// rserver.c
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

//#define _DEFAULT_SOURCE
#include "os.h"
#include "tlog.h"
#include "trpc.h"
#include <stdint.h>

/* Size in bytes of the response body allocated and sent for every request; set with -m. */
int msgSize = 128;
/* Write mode set with -w: non-zero makes main() open the data file, >= 2 also fsyncs after each request. */
int commit = 0;
/* File descriptor of the data file that request bodies are written to; negative means no file is open. */
int dataFd = -1;

/*
 * RPC request callback, registered through rpcInit.cfp in main().
 *
 * When the data file is open (dataFd >= 0) the request body is written to it,
 * and when commit >= 2 the file is additionally fsync'ed after every request.
 * A response of msgSize bytes is then sent back and the request buffer is
 * released.
 *
 * type:    message type of the request, only used for tracing
 * pCont:   request body; released with rpcFreeCont() before returning
 * contLen: length of the request body in bytes
 * thandle: handle handed back to rpcSendResponse()
 * code:    not used by this handler
 */
void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) {
  static int num = 0;  // number of requests flushed to disk so far
  tTrace("request is received, type:%d, contLen:%d", type, contLen);

  if (dataFd >= 0) {
    if (pCont != NULL && contLen > 0) {
      // write() may fail or write fewer bytes than requested; report both
      ssize_t written = write(dataFd, pCont, (size_t)contLen);
      if (written < 0) {
        tPrint("failed to write data to file, reason:%s", strerror(errno));
      } else if (written != (ssize_t)contLen) {
        tPrint("incomplete write to data file, %d of %d bytes written", (int)written, contLen);
      }
    }

    // only flush and count when a file is actually open, otherwise fsync()
    // would fail with EBADF on every single request
    if (commit >= 2) {
      ++num;
      if (fsync(dataFd) < 0) {
        tPrint("failed to flush data to file, reason:%s", strerror(errno));
      }

      if (num % 10000 == 0) {
        tPrint("%d request have been written into disk", num);
      }
    }
  }

  void *rsp = rpcMallocCont(msgSize);
  rpcSendResponse(thandle, 1, rsp, msgSize);

  /* Disabled example: answer with a redirect instead of a normal response.
  SRpcIpSet ipSet;
  ipSet.numOfIps = 1;
  ipSet.index = 0;
  ipSet.port = 7000;
  ipSet.ip[0] = inet_addr("192.168.0.2");

  rpcSendRedirectRsp(ahandle, &ipSet);
  */

  rpcFreeCont(pCont);
}

int main(int argc, char *argv[]) {
S
slguan 已提交
62
  SRpcInit rpcInit;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
63 64
  char     dataName[20] = "server.data";

S
slguan 已提交
65 66 67
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localIp      = "0.0.0.0";
  rpcInit.localPort    = 7000;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68
  rpcInit.label        = "SER";
S
slguan 已提交
69
  rpcInit.numOfThreads = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
70
  rpcInit.cfp          = processRequestMsg;
S
slguan 已提交
71 72
  rpcInit.sessions     = 1000;
  rpcInit.idleTime     = 2000;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
73 74

  for (int i=1; i<argc; ++i) {
75
    if (strcmp(argv[i], "-p")==0 && i < argc-1) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
      rpcInit.localPort = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-i")==0 && i < argc-1) {
      strcpy(rpcInit.localIp, argv[++i]); 
    } else if (strcmp(argv[i], "-n")==0 && i < argc-1) {
      rpcInit.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
      msgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
      rpcInit.sessions = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
      tsCompressMsgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-w")==0 && i < argc-1) {
      commit = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
      rpcDebugFlag = atoi(argv[++i]);
    } else {
      printf("\nusage: %s [options] \n", argv[0]);
      printf("  [-i ip]: server IP address, default is:%s\n", rpcInit.localIp);
      printf("  [-p port]: server port number, default is:%d\n", rpcInit.localPort);
      printf("  [-t threads]: number of threads, default is:%d\n", rpcInit.numOfThreads);
      printf("  [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
      printf("  [-m msgSize]: message body size, default is:%d\n", msgSize);
      printf("  [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
      printf("  [-w write]: write received data to file(0, 1, 2), default is:%d\n", commit);
      printf("  [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
      printf("  [-h help]: print out this help\n\n");
      exit(0);
    }
  } 

106
  rpcInit.connType = TAOS_CONN_SERVER;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
107 108

  taosInitLog("server.log", 100000, 10);
S
slguan 已提交
109

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
110 111
  void *pRpc = rpcOpen(&rpcInit);
  if (pRpc == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
112
    tError("failed to start RPC server");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
113
    return -1;
S
slguan 已提交
114 115
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
116
  tPrint("RPC server is running, ctrl-c to exit");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
117

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
118 119 120 121 122 123 124 125 126 127
  if (commit) {
    dataFd = open(dataName, O_APPEND | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);  
    if (dataFd<0) 
      tPrint("failed to open data file, reason:%s", strerror(errno));
  }

  // loop forever
  while(1) {
    sleep(1);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
128

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
129 130 131 132
  if (dataFd >= 0) {
    close(dataFd);
    remove(dataName);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
133

S
slguan 已提交
134 135 136 137
  return 0;
}