rpcUdp.c 7.3 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "rpcUdp.h"
S
slguan 已提交
17
#include "os.h"
dengyihao's avatar
dengyihao 已提交
18 19
#include "rpcHead.h"
#include "rpcLog.h"
20
#include "taosdef.h"
21
#include "taoserror.h"
dengyihao's avatar
dengyihao 已提交
22 23
#include "ttimer.h"
#include "tutil.h"
H
hzcheng 已提交
24 25 26 27 28 29 30

// Limits and tunables of the UDP transport.
// NOTE(review): RPC_MAX_UDP_CONNS, RPC_MAX_UDP_PKTS and RPC_UDP_BUF_TIME are not
// referenced in the visible part of this file - confirm they are still needed.
#define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000
#define RPC_UDP_BUF_TIME 5  // milliseconds
#define RPC_MAX_UDP_SIZE 65480  // bytes; size of each connection's receive buffer and the recvfrom() limit

typedef struct {
dengyihao's avatar
dengyihao 已提交
31 32 33 34 35 36 37 38 39 40 41
  int       index;
  SOCKET    fd;
  uint16_t  port;                   // peer port
  uint16_t  localPort;              // local port
  char      label[TSDB_LABEL_LEN];  // copy from udpConnSet;
  pthread_t thread;
  void *    hash;
  void *    shandle;  // handle passed by upper layer during server initialization
  void *    pSet;
  void *(*processData)(SRecvInfo *pRecv);
  char *buffer;  // buffer to receive data
H
hzcheng 已提交
42 43 44
} SUdpConn;

typedef struct {
dengyihao's avatar
dengyihao 已提交
45 46 47 48 49 50 51 52 53
  int      index;
  int      server;
  uint32_t ip;       // local IP
  uint16_t port;     // local Port
  void *   shandle;  // handle passed by upper layer during server initialization
  int      threads;
  char     label[TSDB_LABEL_LEN];
  void *(*fp)(SRecvInfo *pPacket);
  SUdpConn udpConn[];
H
hzcheng 已提交
54 55
} SUdpConnSet;

56
// Receiver thread entry point; param is the SUdpConn the thread serves.
static void *taosRecvUdpData(void *param);
H
hzcheng 已提交
57

J
jtao1735 已提交
58
void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
dengyihao's avatar
dengyihao 已提交
59
  SUdpConn *   pConn;
J
Jeff Tao 已提交
60
  SUdpConnSet *pSet;
H
hzcheng 已提交
61 62 63 64 65

  int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn);
  pSet = (SUdpConnSet *)malloc((size_t)size);
  if (pSet == NULL) {
    tError("%s failed to allocate UdpConn", label);
66
    terrno = TAOS_SYSTEM_ERROR(errno);
H
hzcheng 已提交
67 68 69 70
    return NULL;
  }

  memset(pSet, 0, (size_t)size);
J
jtao1735 已提交
71
  pSet->ip = ip;
H
hzcheng 已提交
72 73 74
  pSet->port = port;
  pSet->shandle = shandle;
  pSet->fp = fp;
75
  pSet->threads = threads;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76
  tstrncpy(pSet->label, label, sizeof(pSet->label));
H
hzcheng 已提交
77

78 79 80 81
  pthread_attr_t thAttr;
  pthread_attr_init(&thAttr);
  pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

dengyihao's avatar
dengyihao 已提交
82
  int      i;
L
lihui 已提交
83
  uint16_t ownPort;
84
  for (i = 0; i < threads; ++i) {
H
hzcheng 已提交
85
    pConn = pSet->udpConn + i;
L
lihui 已提交
86
    ownPort = (port ? port + i : 0);
H
hzcheng 已提交
87 88
    pConn->fd = taosOpenUdpSocket(ip, ownPort);
    if (pConn->fd < 0) {
J
jtao1735 已提交
89
      tError("%s failed to open UDP socket %x:%hu", label, ip, port);
90
      break;
H
hzcheng 已提交
91 92
    }

93 94 95
    pConn->buffer = malloc(RPC_MAX_UDP_SIZE);
    if (NULL == pConn->buffer) {
      tError("%s failed to malloc recv buffer", label);
96
      break;
97 98
    }

H
hzcheng 已提交
99
    struct sockaddr_in sin;
dengyihao's avatar
dengyihao 已提交
100 101 102
    unsigned int       addrlen = sizeof(sin);
    if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
        addrlen == sizeof(sin)) {
J
jtao1735 已提交
103
      pConn->localPort = (uint16_t)ntohs(sin.sin_port);
H
hzcheng 已提交
104 105
    }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
106
    tstrncpy(pConn->label, label, sizeof(pConn->label));
H
hzcheng 已提交
107 108 109 110
    pConn->shandle = shandle;
    pConn->processData = fp;
    pConn->index = i;
    pConn->pSet = pSet;
111

J
Jeff Tao 已提交
112 113
    int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
    if (code != 0) {
114 115
      tError("%s failed to create thread to process UDP data(%s)", label, strerror(errno));
      break;
116
    }
H
hzcheng 已提交
117 118
  }

119
  pthread_attr_destroy(&thAttr);
H
hzcheng 已提交
120

dengyihao's avatar
dengyihao 已提交
121
  if (i != threads) {
122 123 124 125 126
    terrno = TAOS_SYSTEM_ERROR(errno);
    taosCleanUpUdpConnection(pSet);
    return NULL;
  }

127
  tDebug("%s UDP connection is initialized, ip:%x:%hu threads:%d", label, ip, port, threads);
H
hzcheng 已提交
128 129 130
  return pSet;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
131
void taosStopUdpConnection(void *handle) {
H
hzcheng 已提交
132
  SUdpConnSet *pSet = (SUdpConnSet *)handle;
dengyihao's avatar
dengyihao 已提交
133
  SUdpConn *   pConn;
H
hzcheng 已提交
134 135 136 137 138

  if (pSet == NULL) return;

  for (int i = 0; i < pSet->threads; ++i) {
    pConn = pSet->udpConn + i;
dengyihao's avatar
dengyihao 已提交
139 140
    if (pConn->fd >= 0) shutdown(pConn->fd, SHUT_RDWR);
    if (pConn->fd >= 0) taosCloseSocket(pConn->fd);
141
    pConn->fd = -1;
142
  }
H
hzcheng 已提交
143

144 145
  for (int i = 0; i < pSet->threads; ++i) {
    pConn = pSet->udpConn + i;
146 147 148
    if (taosCheckPthreadValid(pConn->thread)) {
      pthread_join(pConn->thread, NULL);
    }
S
TD-1848  
Shengliang Guan 已提交
149
    tfree(pConn->buffer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
150 151 152
    // tTrace("%s UDP thread is closed, index:%d", pConn->label, i);
  }

153
  tDebug("%s UDP is stopped", pSet->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
154 155 156 157
}

void taosCleanUpUdpConnection(void *handle) {
  SUdpConnSet *pSet = (SUdpConnSet *)handle;
dengyihao's avatar
dengyihao 已提交
158
  SUdpConn *   pConn;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
159 160 161 162 163

  if (pSet == NULL) return;

  for (int i = 0; i < pSet->threads; ++i) {
    pConn = pSet->udpConn + i;
dengyihao's avatar
dengyihao 已提交
164
    if (pConn->fd >= 0) taosCloseSocket(pConn->fd);
H
hzcheng 已提交
165 166
  }

167
  tDebug("%s UDP is cleaned up", pSet->label);
S
TD-1848  
Shengliang Guan 已提交
168
  tfree(pSet);
H
hzcheng 已提交
169 170
}

J
jtao1735 已提交
171
void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
H
hzcheng 已提交
172 173 174 175 176 177 178
  SUdpConnSet *pSet = (SUdpConnSet *)shandle;

  pSet->index = (pSet->index + 1) % pSet->threads;

  SUdpConn *pConn = pSet->udpConn + pSet->index;
  pConn->port = port;

179
  tDebug("%s UDP connection is setup, ip:%x:%hu localPort:%hu", pConn->label, ip, port, pConn->localPort);
H
hzcheng 已提交
180 181 182 183

  return pConn;
}

184
static void *taosRecvUdpData(void *param) {
dengyihao's avatar
dengyihao 已提交
185
  SUdpConn *         pConn = param;
186
  struct sockaddr_in sourceAdd;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
187
  ssize_t            dataLen;
188 189 190 191 192 193
  unsigned int       addLen;
  uint16_t           port;
  SRecvInfo          recvInfo;

  memset(&sourceAdd, 0, sizeof(sourceAdd));
  addLen = sizeof(sourceAdd);
194
  tDebug("%s UDP thread is created, index:%d", pConn->label, pConn->index);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
195
  char *msg = pConn->buffer;
196

197 198
  setThreadName("recvUdpData");

199
  while (1) {
200
    dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
S
TD-1207  
Shengliang Guan 已提交
201 202 203 204
    if (dataLen <= 0) {
      tDebug("%s UDP socket was closed, exiting(%s), dataLen:%d fd:%d", pConn->label, strerror(errno), (int32_t)dataLen,
             pConn->fd);

205
      // for windows usage, remote shutdown also returns - 1 in windows client
S
TD-1207  
Shengliang Guan 已提交
206
      if (pConn->fd == -1) {
207 208 209 210
        break;
      } else {
        continue;
      }
211 212
    }

J
jtao1735 已提交
213
    port = ntohs(sourceAdd.sin_port);
214 215

    if (dataLen < sizeof(SRpcHead)) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
216
      tError("%s recvfrom failed(%s)", pConn->label, strerror(errno));
217 218 219
      continue;
    }

S
TD-1762  
Shengliang Guan 已提交
220
    int32_t size = dataLen + tsRpcOverhead;
dengyihao's avatar
dengyihao 已提交
221
    char *  tmsg = malloc(size);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
222
    if (NULL == tmsg) {
S
TD-1530  
Shengliang Guan 已提交
223
      tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
224
      continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
225
    } else {
S
TD-1762  
Shengliang Guan 已提交
226
      tTrace("UDP malloc mem:%p size:%d", tmsg, size);
227 228
    }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229 230 231 232 233 234 235 236 237 238 239
    tmsg += tsRpcOverhead;  // overhead for SRpcReqContext
    memcpy(tmsg, msg, dataLen);
    recvInfo.msg = tmsg;
    recvInfo.msgLen = dataLen;
    recvInfo.ip = sourceAdd.sin_addr.s_addr;
    recvInfo.port = port;
    recvInfo.shandle = pConn->shandle;
    recvInfo.thandle = NULL;
    recvInfo.chandle = pConn;
    recvInfo.connType = 0;
    (*(pConn->processData))(&recvInfo);
240 241 242 243 244 245 246 247
  }

  return NULL;
}

int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) {
  SUdpConn *pConn = (SUdpConn *)chandle;

248
  if (pConn == NULL) return -1;
249

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
250 251 252 253 254
  struct sockaddr_in destAdd;
  memset(&destAdd, 0, sizeof(destAdd));
  destAdd.sin_family = AF_INET;
  destAdd.sin_addr.s_addr = ip;
  destAdd.sin_port = htons(port);
H
hzcheng 已提交
255

S
Shengliang Guan 已提交
256
  int ret = (int)taosSendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
H
hzcheng 已提交
257

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
258
  return ret;
H
hzcheng 已提交
259
}