svrBench.c 7.0 KB
Newer Older
dengyihao's avatar
dengyihao 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

//#define _DEFAULT_SOURCE
#include "os.h"
#include "tglobal.h"
#include "tqueue.h"
dengyihao's avatar
dengyihao 已提交
20
#include "transLog.h"
dengyihao's avatar
dengyihao 已提交
21 22 23 24
#include "trpc.h"

int         msgSize = 128;
int         commit = 0;
25
TdFilePtr   pDataFile = NULL;
dengyihao's avatar
dengyihao 已提交
26
STaosQueue *qhandle = NULL;
dengyihao's avatar
dengyihao 已提交
27
STaosQset  *qset = NULL;
dengyihao's avatar
dengyihao 已提交
28

dengyihao's avatar
dengyihao 已提交
29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
int32_t balance = 0;

typedef struct {
  int32_t      numOfThread;
  STaosQueue **qhandle;
  STaosQset  **qset;

} MultiThreadQhandle;

typedef struct TThread {
  TdThread thread;
  int      idx;
} TThread;

MultiThreadQhandle *multiQ = NULL;

G
bench  
gccgdb1234 已提交
45
void initLogEnv() {
dengyihao's avatar
dengyihao 已提交
46 47
  const char   *logDir = "/tmp/trans_svr";
  const char   *defaultLogFileNamePrefix = "taoslog";
G
bench  
gccgdb1234 已提交
48 49
  const int32_t maxLogFileNum = 10000;
  tsAsyncLog = 0;
dengyihao's avatar
dengyihao 已提交
50
  // idxDebugFlag = 143;
G
bench  
gccgdb1234 已提交
51 52
  strcpy(tsLogDir, logDir);
  taosRemoveDir(tsLogDir);
dengyihao's avatar
dengyihao 已提交
53 54
  taosMkDir(tsLogDir);

G
bench  
gccgdb1234 已提交
55
  if (taosInitLog(defaultLogFileNamePrefix, maxLogFileNum) < 0) {
dengyihao's avatar
dengyihao 已提交
56
    printf("failed to open log file in directory:%s\n", tsLogDir);
G
bench  
gccgdb1234 已提交
57 58
  }
}
dengyihao's avatar
dengyihao 已提交
59 60 61 62
void *processShellMsg(void *arg) {
  TThread *thread = (TThread *)arg;

  int32_t    idx = thread->idx;
dengyihao's avatar
dengyihao 已提交
63 64
  static int num = 0;
  STaosQall *qall;
dengyihao's avatar
dengyihao 已提交
65
  SRpcMsg   *pRpcMsg, rpcMsg;
dengyihao's avatar
dengyihao 已提交
66
  int        type;
67
  SQueueInfo qinfo = {0};
dengyihao's avatar
dengyihao 已提交
68 69 70 71

  qall = taosAllocateQall();

  while (1) {
dengyihao's avatar
dengyihao 已提交
72
    int numOfMsgs = taosReadAllQitemsFromQset(multiQ->qset[idx], qall, &qinfo);
dengyihao's avatar
dengyihao 已提交
73 74 75 76 77 78
    tDebug("%d shell msgs are received", numOfMsgs);
    if (numOfMsgs <= 0) break;

    for (int i = 0; i < numOfMsgs; ++i) {
      taosGetQitem(qall, (void **)&pRpcMsg);

79 80
      if (pDataFile != NULL) {
        if (taosWriteFile(pDataFile, pRpcMsg->pCont, pRpcMsg->contLen) < 0) {
dengyihao's avatar
dengyihao 已提交
81 82 83 84 85 86 87
          tInfo("failed to write data file, reason:%s", strerror(errno));
        }
      }
    }

    if (commit >= 2) {
      num += numOfMsgs;
88
      // if (taosFsync(pDataFile) < 0) {
dengyihao's avatar
dengyihao 已提交
89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104
      //  tInfo("failed to flush data to file, reason:%s", strerror(errno));
      //}

      if (num % 10000 == 0) {
        tInfo("%d request have been written into disk", num);
      }
    }

    taosResetQitems(qall);
    for (int i = 0; i < numOfMsgs; ++i) {
      taosGetQitem(qall, (void **)&pRpcMsg);
      rpcFreeCont(pRpcMsg->pCont);

      memset(&rpcMsg, 0, sizeof(rpcMsg));
      rpcMsg.pCont = rpcMallocCont(msgSize);
      rpcMsg.contLen = msgSize;
S
Shengliang Guan 已提交
105
      rpcMsg.info = pRpcMsg->info;
dengyihao's avatar
dengyihao 已提交
106 107 108
      rpcMsg.code = 0;
      rpcSendResponse(&rpcMsg);

S
Shengliang Guan 已提交
109
      void *handle = pRpcMsg->info.handle;
dengyihao's avatar
dengyihao 已提交
110
      taosFreeQitem(pRpcMsg);
dengyihao's avatar
dengyihao 已提交
111 112 113 114 115 116 117 118
      //{
      //  SRpcMsg nRpcMsg = {0};
      //  nRpcMsg.pCont = rpcMallocCont(msgSize);
      //  nRpcMsg.contLen = msgSize;
      //  nRpcMsg.info.handle = handle;
      //  nRpcMsg.code = TSDB_CODE_CTG_NOT_READY;
      //  rpcSendResponse(&nRpcMsg);
      //}
dengyihao's avatar
dengyihao 已提交
119
    }
120 121

    taosUpdateItemSize(qinfo.queue, numOfMsgs);
dengyihao's avatar
dengyihao 已提交
122 123 124
  }

  taosFreeQall(qall);
dengyihao's avatar
dengyihao 已提交
125
  return NULL;
dengyihao's avatar
dengyihao 已提交
126 127 128 129 130
}

void processRequestMsg(void *pParent, SRpcMsg *pMsg, SEpSet *pEpSet) {
  SRpcMsg *pTemp;

131
  pTemp = taosAllocateQitem(sizeof(SRpcMsg), DEF_QITEM);
dengyihao's avatar
dengyihao 已提交
132 133
  memcpy(pTemp, pMsg, sizeof(SRpcMsg));

dengyihao's avatar
dengyihao 已提交
134
  int32_t idx = balance % multiQ->numOfThread;
dengyihao's avatar
dengyihao 已提交
135
  tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
dengyihao's avatar
dengyihao 已提交
136 137 138
  taosWriteQitem(multiQ->qhandle[idx], pTemp);
  balance++;
  if (balance >= multiQ->numOfThread) balance = 0;
dengyihao's avatar
dengyihao 已提交
139 140 141 142 143 144 145 146 147 148
}

int main(int argc, char *argv[]) {
  SRpcInit rpcInit;
  char     dataName[20] = "server.data";

  taosBlockSIGPIPE();

  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort = 7000;
dengyihao's avatar
dengyihao 已提交
149
  memcpy(rpcInit.localFqdn, "localhost", strlen("localhost"));
dengyihao's avatar
dengyihao 已提交
150 151 152 153
  rpcInit.label = "SER";
  rpcInit.numOfThreads = 1;
  rpcInit.cfp = processRequestMsg;
  rpcInit.idleTime = 2 * 1500;
dengyihao's avatar
dengyihao 已提交
154
  rpcDebugFlag = 131;
dengyihao's avatar
dengyihao 已提交
155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173

  for (int i = 1; i < argc; ++i) {
    if (strcmp(argv[i], "-p") == 0 && i < argc - 1) {
      rpcInit.localPort = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-t") == 0 && i < argc - 1) {
      rpcInit.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-m") == 0 && i < argc - 1) {
      msgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-s") == 0 && i < argc - 1) {
      rpcInit.sessions = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-o") == 0 && i < argc - 1) {
      tsCompressMsgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-w") == 0 && i < argc - 1) {
      commit = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-d") == 0 && i < argc - 1) {
      rpcDebugFlag = atoi(argv[++i]);
      dDebugFlag = rpcDebugFlag;
      uDebugFlag = rpcDebugFlag;
    } else {
174
      printf("\nusage:%s [options] \n", argv[0]);
dengyihao's avatar
dengyihao 已提交
175 176 177 178 179 180 181 182 183 184 185 186 187
      printf("  [-p port]: server port number, default is:%d\n", rpcInit.localPort);
      printf("  [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
      printf("  [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
      printf("  [-m msgSize]: message body size, default is:%d\n", msgSize);
      printf("  [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
      printf("  [-w write]: write received data to file(0, 1, 2), default is:%d\n", commit);
      printf("  [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
      printf("  [-h help]: print out this help\n\n");
      exit(0);
    }
  }

  rpcInit.connType = TAOS_CONN_SERVER;
dengyihao's avatar
dengyihao 已提交
188

G
bench  
gccgdb1234 已提交
189
  initLogEnv();
dengyihao's avatar
dengyihao 已提交
190 191 192 193 194 195

  void *pRpc = rpcOpen(&rpcInit);
  if (pRpc == NULL) {
    tError("failed to start RPC server");
    return -1;
  }
wafwerar's avatar
wafwerar 已提交
196
  // taosSsleep(5);
dengyihao's avatar
dengyihao 已提交
197 198 199 200

  tInfo("RPC server is running, ctrl-c to exit");

  if (commit) {
201
    pDataFile = taosOpenFile(dataName, TD_FILE_APPEND | TD_FILE_CREATE | TD_FILE_WRITE);
202
    if (pDataFile == NULL) tInfo("failed to open data file, reason:%s", strerror(errno));
dengyihao's avatar
dengyihao 已提交
203 204
  }

dengyihao's avatar
dengyihao 已提交
205 206 207 208 209 210 211 212 213 214 215 216 217 218 219 220 221 222 223 224 225
  int32_t numOfAthread = 5;
  multiQ = taosMemoryMalloc(sizeof(numOfAthread));
  multiQ->numOfThread = numOfAthread;
  multiQ->qhandle = (STaosQueue **)taosMemoryMalloc(sizeof(STaosQueue *) * numOfAthread);
  multiQ->qset = (STaosQset **)taosMemoryMalloc(sizeof(STaosQset *) * numOfAthread);

  for (int i = 0; i < numOfAthread; i++) {
    multiQ->qhandle[i] = taosOpenQueue();
    multiQ->qset[i] = taosOpenQset();
    taosAddIntoQset(multiQ->qset[i], multiQ->qhandle[i], NULL);
  }
  TThread *threads = taosMemoryMalloc(sizeof(TThread) * numOfAthread);
  for (int i = 0; i < numOfAthread; i++) {
    threads[i].idx = i;
    taosThreadCreate(&(threads[i].thread), NULL, processShellMsg, (void *)&threads[i]);
  }
  // qhandle = taosOpenQueue();
  // qset = taosOpenQset();
  // taosAddIntoQset(qset, qhandle, NULL);

  // processShellMsg();
dengyihao's avatar
dengyihao 已提交
226

227 228
  if (pDataFile != NULL) {
    taosCloseFile(&pDataFile);
229
    taosRemoveFile(dataName);
dengyihao's avatar
dengyihao 已提交
230
  }
dengyihao's avatar
dengyihao 已提交
231 232
  int ch = getchar();
  UNUSED(ch);
dengyihao's avatar
dengyihao 已提交
233 234 235

  return 0;
}