rpcUdp.c 7.3 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "rpcUdp.h"
S
slguan 已提交
17
#include "os.h"
dengyihao's avatar
dengyihao 已提交
18 19
#include "rpcHead.h"
#include "rpcLog.h"
20
#include "taosdef.h"
21
#include "taoserror.h"
dengyihao's avatar
dengyihao 已提交
22 23
#include "ttimer.h"
#include "tutil.h"
H
hzcheng 已提交
24

25 26
#ifndef USE_UV

H
hzcheng 已提交
27 28 29 30 31 32
#define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000
#define RPC_UDP_BUF_TIME 5  // mseconds
#define RPC_MAX_UDP_SIZE 65480

typedef struct {
dengyihao's avatar
dengyihao 已提交
33 34 35 36 37 38 39 40 41 42 43
  int       index;
  SOCKET    fd;
  uint16_t  port;                   // peer port
  uint16_t  localPort;              // local port
  char      label[TSDB_LABEL_LEN];  // copy from udpConnSet;
  pthread_t thread;
  void *    hash;
  void *    shandle;  // handle passed by upper layer during server initialization
  void *    pSet;
  void *(*processData)(SRecvInfo *pRecv);
  char *buffer;  // buffer to receive data
H
hzcheng 已提交
44 45 46
} SUdpConn;

typedef struct {
dengyihao's avatar
dengyihao 已提交
47 48 49 50 51 52 53 54 55
  int      index;
  int      server;
  uint32_t ip;       // local IP
  uint16_t port;     // local Port
  void *   shandle;  // handle passed by upper layer during server initialization
  int      threads;
  char     label[TSDB_LABEL_LEN];
  void *(*fp)(SRecvInfo *pPacket);
  SUdpConn udpConn[];
H
hzcheng 已提交
56 57
} SUdpConnSet;

58
static void *taosRecvUdpData(void *param);
H
hzcheng 已提交
59

J
jtao1735 已提交
60
void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
dengyihao's avatar
dengyihao 已提交
61
  SUdpConn *   pConn;
J
Jeff Tao 已提交
62
  SUdpConnSet *pSet;
H
hzcheng 已提交
63 64 65 66 67

  int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn);
  pSet = (SUdpConnSet *)malloc((size_t)size);
  if (pSet == NULL) {
    tError("%s failed to allocate UdpConn", label);
68
    terrno = TAOS_SYSTEM_ERROR(errno);
H
hzcheng 已提交
69 70 71 72
    return NULL;
  }

  memset(pSet, 0, (size_t)size);
J
jtao1735 已提交
73
  pSet->ip = ip;
H
hzcheng 已提交
74 75 76
  pSet->port = port;
  pSet->shandle = shandle;
  pSet->fp = fp;
77
  pSet->threads = threads;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
78
  tstrncpy(pSet->label, label, sizeof(pSet->label));
H
hzcheng 已提交
79

80 81 82 83
  pthread_attr_t thAttr;
  pthread_attr_init(&thAttr);
  pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE);

dengyihao's avatar
dengyihao 已提交
84
  int      i;
L
lihui 已提交
85
  uint16_t ownPort;
86
  for (i = 0; i < threads; ++i) {
H
hzcheng 已提交
87
    pConn = pSet->udpConn + i;
L
lihui 已提交
88
    ownPort = (port ? port + i : 0);
H
hzcheng 已提交
89 90
    pConn->fd = taosOpenUdpSocket(ip, ownPort);
    if (pConn->fd < 0) {
J
jtao1735 已提交
91
      tError("%s failed to open UDP socket %x:%hu", label, ip, port);
92
      break;
H
hzcheng 已提交
93 94
    }

95 96 97
    pConn->buffer = malloc(RPC_MAX_UDP_SIZE);
    if (NULL == pConn->buffer) {
      tError("%s failed to malloc recv buffer", label);
98
      break;
99 100
    }

H
hzcheng 已提交
101
    struct sockaddr_in sin;
dengyihao's avatar
dengyihao 已提交
102 103 104
    unsigned int       addrlen = sizeof(sin);
    if (getsockname(pConn->fd, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
        addrlen == sizeof(sin)) {
J
jtao1735 已提交
105
      pConn->localPort = (uint16_t)ntohs(sin.sin_port);
H
hzcheng 已提交
106 107
    }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
108
    tstrncpy(pConn->label, label, sizeof(pConn->label));
H
hzcheng 已提交
109 110 111 112
    pConn->shandle = shandle;
    pConn->processData = fp;
    pConn->index = i;
    pConn->pSet = pSet;
113

J
Jeff Tao 已提交
114 115
    int code = pthread_create(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
    if (code != 0) {
116 117
      tError("%s failed to create thread to process UDP data(%s)", label, strerror(errno));
      break;
118
    }
H
hzcheng 已提交
119 120
  }

121
  pthread_attr_destroy(&thAttr);
H
hzcheng 已提交
122

dengyihao's avatar
dengyihao 已提交
123
  if (i != threads) {
124 125 126 127 128
    terrno = TAOS_SYSTEM_ERROR(errno);
    taosCleanUpUdpConnection(pSet);
    return NULL;
  }

129
  tDebug("%s UDP connection is initialized, ip:%x:%hu threads:%d", label, ip, port, threads);
H
hzcheng 已提交
130 131 132
  return pSet;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
133
void taosStopUdpConnection(void *handle) {
H
hzcheng 已提交
134
  SUdpConnSet *pSet = (SUdpConnSet *)handle;
dengyihao's avatar
dengyihao 已提交
135
  SUdpConn *   pConn;
H
hzcheng 已提交
136 137 138 139 140

  if (pSet == NULL) return;

  for (int i = 0; i < pSet->threads; ++i) {
    pConn = pSet->udpConn + i;
dengyihao's avatar
dengyihao 已提交
141 142
    if (pConn->fd >= 0) shutdown(pConn->fd, SHUT_RDWR);
    if (pConn->fd >= 0) taosCloseSocket(pConn->fd);
143
    pConn->fd = -1;
144
  }
H
hzcheng 已提交
145

146 147
  for (int i = 0; i < pSet->threads; ++i) {
    pConn = pSet->udpConn + i;
148 149 150
    if (taosCheckPthreadValid(pConn->thread)) {
      pthread_join(pConn->thread, NULL);
    }
S
TD-1848  
Shengliang Guan 已提交
151
    tfree(pConn->buffer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152 153 154
    // tTrace("%s UDP thread is closed, index:%d", pConn->label, i);
  }

155
  tDebug("%s UDP is stopped", pSet->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
156 157 158 159
}

void taosCleanUpUdpConnection(void *handle) {
  SUdpConnSet *pSet = (SUdpConnSet *)handle;
dengyihao's avatar
dengyihao 已提交
160
  SUdpConn *   pConn;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
161 162 163 164 165

  if (pSet == NULL) return;

  for (int i = 0; i < pSet->threads; ++i) {
    pConn = pSet->udpConn + i;
dengyihao's avatar
dengyihao 已提交
166
    if (pConn->fd >= 0) taosCloseSocket(pConn->fd);
H
hzcheng 已提交
167 168
  }

169
  tDebug("%s UDP is cleaned up", pSet->label);
S
TD-1848  
Shengliang Guan 已提交
170
  tfree(pSet);
H
hzcheng 已提交
171 172
}

J
jtao1735 已提交
173
void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
H
hzcheng 已提交
174 175 176 177 178 179 180
  SUdpConnSet *pSet = (SUdpConnSet *)shandle;

  pSet->index = (pSet->index + 1) % pSet->threads;

  SUdpConn *pConn = pSet->udpConn + pSet->index;
  pConn->port = port;

181
  tDebug("%s UDP connection is setup, ip:%x:%hu localPort:%hu", pConn->label, ip, port, pConn->localPort);
H
hzcheng 已提交
182 183 184 185

  return pConn;
}

186
static void *taosRecvUdpData(void *param) {
dengyihao's avatar
dengyihao 已提交
187
  SUdpConn *         pConn = param;
188
  struct sockaddr_in sourceAdd;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
189
  ssize_t            dataLen;
190 191 192 193 194 195
  unsigned int       addLen;
  uint16_t           port;
  SRecvInfo          recvInfo;

  memset(&sourceAdd, 0, sizeof(sourceAdd));
  addLen = sizeof(sourceAdd);
196
  tDebug("%s UDP thread is created, index:%d", pConn->label, pConn->index);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
197
  char *msg = pConn->buffer;
198

199 200
  setThreadName("recvUdpData");

201
  while (1) {
202
    dataLen = recvfrom(pConn->fd, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
S
TD-1207  
Shengliang Guan 已提交
203 204 205 206
    if (dataLen <= 0) {
      tDebug("%s UDP socket was closed, exiting(%s), dataLen:%d fd:%d", pConn->label, strerror(errno), (int32_t)dataLen,
             pConn->fd);

207
      // for windows usage, remote shutdown also returns - 1 in windows client
S
TD-1207  
Shengliang Guan 已提交
208
      if (pConn->fd == -1) {
209 210 211 212
        break;
      } else {
        continue;
      }
213 214
    }

J
jtao1735 已提交
215
    port = ntohs(sourceAdd.sin_port);
216 217

    if (dataLen < sizeof(SRpcHead)) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218
      tError("%s recvfrom failed(%s)", pConn->label, strerror(errno));
219 220 221
      continue;
    }

S
TD-1762  
Shengliang Guan 已提交
222
    int32_t size = dataLen + tsRpcOverhead;
dengyihao's avatar
dengyihao 已提交
223
    char *  tmsg = malloc(size);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
224
    if (NULL == tmsg) {
S
TD-1530  
Shengliang Guan 已提交
225
      tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
226
      continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
227
    } else {
S
TD-1762  
Shengliang Guan 已提交
228
      tTrace("UDP malloc mem:%p size:%d", tmsg, size);
229 230
    }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
231 232 233 234 235 236 237 238 239 240 241
    tmsg += tsRpcOverhead;  // overhead for SRpcReqContext
    memcpy(tmsg, msg, dataLen);
    recvInfo.msg = tmsg;
    recvInfo.msgLen = dataLen;
    recvInfo.ip = sourceAdd.sin_addr.s_addr;
    recvInfo.port = port;
    recvInfo.shandle = pConn->shandle;
    recvInfo.thandle = NULL;
    recvInfo.chandle = pConn;
    recvInfo.connType = 0;
    (*(pConn->processData))(&recvInfo);
242 243 244 245 246 247 248 249
  }

  return NULL;
}

int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) {
  SUdpConn *pConn = (SUdpConn *)chandle;

250
  if (pConn == NULL) return -1;
251

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
252 253 254 255 256
  struct sockaddr_in destAdd;
  memset(&destAdd, 0, sizeof(destAdd));
  destAdd.sin_family = AF_INET;
  destAdd.sin_addr.s_addr = ip;
  destAdd.sin_port = htons(port);
H
hzcheng 已提交
257

S
Shengliang Guan 已提交
258
  int ret = (int)taosSendto(pConn->fd, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
H
hzcheng 已提交
259

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
260
  return ret;
H
hzcheng 已提交
261
}
262
#endif