rserver.c 4.0 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

//#define _DEFAULT_SOURCE
#include "os.h"
#include "tlog.h"
#include "trpc.h"
#include <stdint.h>

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44
/* Size in bytes of the response body allocated and sent for every request (-m). */
int msgSize = 128;

/* Descriptor of the data file; remains -1 until main() opens the file. */
int dataFd = -1;

/* Write mode (-w): non-zero makes main() open the data file, and a value
 * of 2 or more additionally fsyncs it after each request. */
int commit = 0;

/*
 * RPC request callback (installed through SRpcInit.cfp in main()).
 *
 * When the data file is open, the request body is appended to it and, if
 * commit >= 2, flushed to disk with fsync(). A response of msgSize bytes is
 * then sent back and the request buffer is released.
 *
 * type    - message type of the request (only logged)
 * pCont   - request body; released with rpcFreeCont() before returning
 * contLen - length of pCont in bytes
 * thandle - handle handed back to rpcSendResponse()
 * code    - not used by this handler
 */
void processRequestMsg(char type, void *pCont, int contLen, void *thandle, int32_t code) {
  static int num = 0;  /* requests flushed to disk so far */

  (void)code;
  tTrace("request is received, type:%d, contLen:%d", type, contLen);

  /* Only touch the file when it was opened successfully; otherwise write()
   * and fsync() would just fail with EBADF on every request. */
  if (dataFd >= 0) {
    if (contLen > 0) {
      ssize_t written = write(dataFd, pCont, (size_t)contLen);
      if (written < 0) {
        tPrint("failed to write data to file, reason:%s", strerror(errno));
      } else if (written != (ssize_t)contLen) {
        tPrint("only %d of %d bytes were written to file", (int)written, contLen);
      }
    }

    if (commit >= 2) {
      ++num;
      if (fsync(dataFd) < 0) {
        tPrint("failed to flush data to file, reason:%s", strerror(errno));
      }

      if (num % 10000 == 0) {
        tPrint("%d request have been written into disk", num);
      }
    }
  }

  void *rsp = rpcMallocCont(msgSize);
  rpcSendResponse(thandle, 1, rsp, msgSize);

  rpcFreeCont(pCont);
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
61
int main(int argc, char *argv[]) {
S
slguan 已提交
62
  SRpcInit rpcInit;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
63 64
  char     dataName[20] = "server.data";

S
slguan 已提交
65 66 67
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localIp      = "0.0.0.0";
  rpcInit.localPort    = 7000;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68
  rpcInit.label        = "SER";
S
slguan 已提交
69
  rpcInit.numOfThreads = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
70
  rpcInit.cfp          = processRequestMsg;
S
slguan 已提交
71 72
  rpcInit.sessions     = 1000;
  rpcInit.idleTime     = 2000;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
73 74

  for (int i=1; i<argc; ++i) {
75
    if (strcmp(argv[i], "-p")==0 && i < argc-1) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76 77 78
      rpcInit.localPort = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-i")==0 && i < argc-1) {
      strcpy(rpcInit.localIp, argv[++i]); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
79
    } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
80 81 82 83 84 85 86 87 88 89 90 91 92 93 94
      rpcInit.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
      msgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
      rpcInit.sessions = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
      tsCompressMsgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-w")==0 && i < argc-1) {
      commit = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
      rpcDebugFlag = atoi(argv[++i]);
    } else {
      printf("\nusage: %s [options] \n", argv[0]);
      printf("  [-i ip]: server IP address, default is:%s\n", rpcInit.localIp);
      printf("  [-p port]: server port number, default is:%d\n", rpcInit.localPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
95
      printf("  [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
96 97 98 99 100 101 102 103 104 105
      printf("  [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
      printf("  [-m msgSize]: message body size, default is:%d\n", msgSize);
      printf("  [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
      printf("  [-w write]: write received data to file(0, 1, 2), default is:%d\n", commit);
      printf("  [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
      printf("  [-h help]: print out this help\n\n");
      exit(0);
    }
  } 

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
106
  tsAsyncLog = 0;
107
  rpcInit.connType = TAOS_CONN_SERVER;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
108 109

  taosInitLog("server.log", 100000, 10);
S
slguan 已提交
110

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
111 112
  void *pRpc = rpcOpen(&rpcInit);
  if (pRpc == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
113
    tError("failed to start RPC server");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
114
    return -1;
S
slguan 已提交
115 116
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
117
  tPrint("RPC server is running, ctrl-c to exit");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
118

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
119 120 121 122 123 124 125 126 127 128
  if (commit) {
    dataFd = open(dataName, O_APPEND | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);  
    if (dataFd<0) 
      tPrint("failed to open data file, reason:%s", strerror(errno));
  }

  // loop forever
  while(1) {
    sleep(1);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
129

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130 131 132 133
  if (dataFd >= 0) {
    close(dataFd);
    remove(dataName);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
134

S
slguan 已提交
135 136 137 138
  return 0;
}