rserver.c 5.5 KB
Newer Older
S
slguan 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

//#define _DEFAULT_SOURCE
#include "os.h"
S
slguan 已提交
18 19
#include "tglobal.h"
#include "rpcLog.h"
S
slguan 已提交
20
#include "trpc.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
21
#include "tqueue.h"
S
slguan 已提交
22

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
23 24 25
int msgSize = 128;
int commit = 0;
int dataFd = -1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26
void *qhandle = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
27
void *qset = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
28

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
29
void processShellMsg() {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
30
  static int num = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
31
  taos_qall  qall;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
32 33
  SRpcMsg   *pRpcMsg, rpcMsg;
  int        type;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
34
  void      *pvnode;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
35 36
 
  qall = taosAllocateQall();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
37 38

  while (1) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
39
    int numOfMsgs = taosReadAllQitemsFromQset(qset, qall, &pvnode);
40
    tDebug("%d shell msgs are received", numOfMsgs);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
41
    if (numOfMsgs <= 0) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
42 43

    for (int i=0; i<numOfMsgs; ++i) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
44
      taosGetQitem(qall, &type, (void **)&pRpcMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
45 46
 
      if (dataFd >=0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
47
        if ( write(dataFd, pRpcMsg->pCont, pRpcMsg->contLen) <0 ) {
48
          tInfo("failed to write data file, reason:%s", strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
49 50 51
        }
      }
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
52

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
53 54
    if (commit >=2) {
      num += numOfMsgs;
S
TD-4088  
Shengliang Guan 已提交
55
      if ( taosFsync(dataFd) < 0 ) {
56
        tInfo("failed to flush data to file, reason:%s", strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
57
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
58

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
59
      if (num % 10000 == 0) {
60
        tInfo("%d request have been written into disk", num);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
61
      }
62
    }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
63 64 65 66
  
    taosResetQitems(qall);
    for (int i=0; i<numOfMsgs; ++i) {

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
67 68 69
      taosGetQitem(qall, &type, (void **)&pRpcMsg);
      rpcFreeCont(pRpcMsg->pCont);

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
70
      memset(&rpcMsg, 0, sizeof(rpcMsg));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
71 72
      rpcMsg.pCont = rpcMallocCont(msgSize);
      rpcMsg.contLen = msgSize;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
73
      rpcMsg.handle = pRpcMsg->handle;
74
      rpcMsg.code = 0;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
75
      rpcSendResponse(&rpcMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76 77

      taosFreeQitem(pRpcMsg);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
78 79 80 81
    }

  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
82
  taosFreeQall(qall);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
83

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
84 85
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105
int retrieveAuthInfo(char *meterId, char *spi, char *encrypt, char *secret, char *ckey) {
  // app shall retrieve the auth info based on meterID from DB or a data file
  // demo code here only for simple demo
  int ret = 0;

  if (strcmp(meterId, "michael") == 0) {
    *spi = 1;
    *encrypt = 0;
    strcpy(secret, "mypassword");
    strcpy(ckey, "key");
  } else if (strcmp(meterId, "jeff") == 0) {
    *spi = 0;
    *encrypt = 0;
  } else {
    ret = -1;  // user not there
  }

  return ret;
}

106
void processRequestMsg(SRpcMsg *pMsg, SRpcEpSet *pEpSet) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
107
  SRpcMsg *pTemp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
108
 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
109 110 111
  pTemp = taosAllocateQitem(sizeof(SRpcMsg));
  memcpy(pTemp, pMsg, sizeof(SRpcMsg));

112
  tDebug("request is received, type:%d, contLen:%d, item:%p", pMsg->msgType, pMsg->contLen, pTemp);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
113
  taosWriteQitem(qhandle, TAOS_QTYPE_RPC, pTemp); 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
114 115
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
116
int main(int argc, char *argv[]) {
S
slguan 已提交
117
  SRpcInit rpcInit;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
118 119
  char     dataName[20] = "server.data";

120 121
  taosBlockSIGPIPE();

S
slguan 已提交
122 123
  memset(&rpcInit, 0, sizeof(rpcInit));
  rpcInit.localPort    = 7000;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
124
  rpcInit.label        = "SER";
S
slguan 已提交
125
  rpcInit.numOfThreads = 1;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
126
  rpcInit.cfp          = processRequestMsg;
S
slguan 已提交
127
  rpcInit.sessions     = 1000;
128
  rpcInit.idleTime     = tsShellActivityTimer*1500; 
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
129
  rpcInit.afp          = retrieveAuthInfo;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130 131

  for (int i=1; i<argc; ++i) {
132
    if (strcmp(argv[i], "-p")==0 && i < argc-1) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
133
      rpcInit.localPort = atoi(argv[++i]);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
134
    } else if (strcmp(argv[i], "-t")==0 && i < argc-1) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
135 136 137 138 139 140 141 142 143 144 145
      rpcInit.numOfThreads = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-m")==0 && i < argc-1) {
      msgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-s")==0 && i < argc-1) {
      rpcInit.sessions = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-o")==0 && i < argc-1) {
      tsCompressMsgSize = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-w")==0 && i < argc-1) {
      commit = atoi(argv[++i]);
    } else if (strcmp(argv[i], "-d")==0 && i < argc-1) {
      rpcDebugFlag = atoi(argv[++i]);
146
      dDebugFlag = rpcDebugFlag;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
147
      uDebugFlag = rpcDebugFlag;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
148 149 150
    } else {
      printf("\nusage: %s [options] \n", argv[0]);
      printf("  [-p port]: server port number, default is:%d\n", rpcInit.localPort);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
151
      printf("  [-t threads]: number of rpc threads, default is:%d\n", rpcInit.numOfThreads);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152 153 154 155 156 157 158 159 160 161
      printf("  [-s sessions]: number of sessions, default is:%d\n", rpcInit.sessions);
      printf("  [-m msgSize]: message body size, default is:%d\n", msgSize);
      printf("  [-o compSize]: compression message size, default is:%d\n", tsCompressMsgSize);
      printf("  [-w write]: write received data to file(0, 1, 2), default is:%d\n", commit);
      printf("  [-d debugFlag]: debug flag, default:%d\n", rpcDebugFlag);
      printf("  [-h help]: print out this help\n\n");
      exit(0);
    }
  } 

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
162
  tsAsyncLog = 0;
163
  rpcInit.connType = TAOS_CONN_SERVER;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
164
  taosInitLog("server.log", 100000, 10);
S
slguan 已提交
165

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
166 167
  void *pRpc = rpcOpen(&rpcInit);
  if (pRpc == NULL) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
168
    tError("failed to start RPC server");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
169
    return -1;
S
slguan 已提交
170 171
  }

172
  tInfo("RPC server is running, ctrl-c to exit");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
173

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
174
  if (commit) {
175
    dataFd = open(dataName, O_APPEND | O_CREAT | O_WRONLY, S_IRWXU | S_IRWXG | S_IRWXO);  
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
176
    if (dataFd<0) 
177
      tInfo("failed to open data file, reason:%s", strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
178 179
  }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
180
  qhandle = taosOpenQueue(sizeof(SRpcMsg));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
181 182
  qset = taosOpenQset();
  taosAddIntoQset(qset, qhandle, NULL);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
183

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
184
  processShellMsg();
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
185

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
186 187 188 189
  if (dataFd >= 0) {
    close(dataFd);
    remove(dataName);
  }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190

S
slguan 已提交
191 192
  return 0;
}