rpcUdp.c 7.5 KB
Newer Older
H
hzcheng 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

dengyihao's avatar
dengyihao 已提交
16
#include "rpcUdp.h"
S
slguan 已提交
17
#include "os.h"
dengyihao's avatar
dengyihao 已提交
18 19
#include "rpcHead.h"
#include "rpcLog.h"
20
#include "taosdef.h"
21
#include "taoserror.h"
dengyihao's avatar
dengyihao 已提交
22 23
#include "ttimer.h"
#include "tutil.h"
H
hzcheng 已提交
24

25 26
#ifndef USE_UV

H
hzcheng 已提交
27 28 29 30 31 32
// Limits and timing constants for the UDP transport.
// NOTE(review): only RPC_MAX_UDP_SIZE is referenced in the visible part of this
// file; the other three may be used elsewhere or be leftovers -- confirm.
#define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000
#define RPC_UDP_BUF_TIME 5  // milliseconds
#define RPC_MAX_UDP_SIZE 65480  // size in bytes of each connection's receive buffer

typedef struct {
wafwerar's avatar
wafwerar 已提交
33 34 35 36 37
  int         index;
  TdSocketPtr pSocket;
  uint16_t    port;                   // peer port
  uint16_t    localPort;              // local port
  char        label[TSDB_LABEL_LEN];  // copy from udpConnSet;
wafwerar's avatar
wafwerar 已提交
38
  TdThread   thread;
wafwerar's avatar
wafwerar 已提交
39 40 41 42 43
  void       *hash;
  void       *shandle;  // handle passed by upper layer during server initialization
  void       *pSet;
  void       *(*processData)(SRecvInfo *pRecv);
  char       *buffer;  // buffer to receive data
H
hzcheng 已提交
44 45 46
} SUdpConn;

typedef struct {
dengyihao's avatar
dengyihao 已提交
47 48 49 50 51 52 53 54 55
  int      index;
  int      server;
  uint32_t ip;       // local IP
  uint16_t port;     // local Port
  void *   shandle;  // handle passed by upper layer during server initialization
  int      threads;
  char     label[TSDB_LABEL_LEN];
  void *(*fp)(SRecvInfo *pPacket);
  SUdpConn udpConn[];
H
hzcheng 已提交
56 57
} SUdpConnSet;

58
// Thread entry point of each connection's receive loop; defined later in this
// file and declared here because taosInitUdpConnection passes it to thread creation.
static void *taosRecvUdpData(void *param);
H
hzcheng 已提交
59

J
jtao1735 已提交
60
void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads, void *fp, void *shandle) {
dengyihao's avatar
dengyihao 已提交
61
  SUdpConn *   pConn;
J
Jeff Tao 已提交
62
  SUdpConnSet *pSet;
H
hzcheng 已提交
63 64

  int size = (int)sizeof(SUdpConnSet) + threads * (int)sizeof(SUdpConn);
wafwerar's avatar
wafwerar 已提交
65
  pSet = (SUdpConnSet *)taosMemoryMalloc((size_t)size);
H
hzcheng 已提交
66 67
  if (pSet == NULL) {
    tError("%s failed to allocate UdpConn", label);
68
    terrno = TAOS_SYSTEM_ERROR(errno);
H
hzcheng 已提交
69 70 71 72
    return NULL;
  }

  memset(pSet, 0, (size_t)size);
J
jtao1735 已提交
73
  pSet->ip = ip;
H
hzcheng 已提交
74 75 76
  pSet->port = port;
  pSet->shandle = shandle;
  pSet->fp = fp;
77
  pSet->threads = threads;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
78
  tstrncpy(pSet->label, label, sizeof(pSet->label));
H
hzcheng 已提交
79

wafwerar's avatar
wafwerar 已提交
80 81 82
  TdThreadAttr thAttr;
  taosThreadAttrInit(&thAttr);
  taosThreadAttrSetDetachState(&thAttr, PTHREAD_CREATE_JOINABLE);
83

dengyihao's avatar
dengyihao 已提交
84
  int      i;
L
lihui 已提交
85
  uint16_t ownPort;
86
  for (i = 0; i < threads; ++i) {
H
hzcheng 已提交
87
    pConn = pSet->udpConn + i;
L
lihui 已提交
88
    ownPort = (port ? port + i : 0);
wafwerar's avatar
wafwerar 已提交
89 90
    pConn->pSocket = taosOpenUdpSocket(ip, ownPort);
    if (pConn->pSocket == NULL) {
J
jtao1735 已提交
91
      tError("%s failed to open UDP socket %x:%hu", label, ip, port);
92
      break;
H
hzcheng 已提交
93 94
    }

wafwerar's avatar
wafwerar 已提交
95
    pConn->buffer = taosMemoryMalloc(RPC_MAX_UDP_SIZE);
96 97
    if (NULL == pConn->buffer) {
      tError("%s failed to malloc recv buffer", label);
98
      break;
99 100
    }

H
hzcheng 已提交
101
    struct sockaddr_in sin;
dengyihao's avatar
dengyihao 已提交
102
    unsigned int       addrlen = sizeof(sin);
wafwerar's avatar
wafwerar 已提交
103
    if (taosGetSocketName(pConn->pSocket, (struct sockaddr *)&sin, &addrlen) == 0 && sin.sin_family == AF_INET &&
dengyihao's avatar
dengyihao 已提交
104
        addrlen == sizeof(sin)) {
J
jtao1735 已提交
105
      pConn->localPort = (uint16_t)ntohs(sin.sin_port);
H
hzcheng 已提交
106 107
    }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
108
    tstrncpy(pConn->label, label, sizeof(pConn->label));
H
hzcheng 已提交
109 110 111 112
    pConn->shandle = shandle;
    pConn->processData = fp;
    pConn->index = i;
    pConn->pSet = pSet;
113

wafwerar's avatar
wafwerar 已提交
114
    int code = taosThreadCreate(&pConn->thread, &thAttr, taosRecvUdpData, pConn);
J
Jeff Tao 已提交
115
    if (code != 0) {
116 117
      tError("%s failed to create thread to process UDP data(%s)", label, strerror(errno));
      break;
118
    }
H
hzcheng 已提交
119 120
  }

wafwerar's avatar
wafwerar 已提交
121
  taosThreadAttrDestroy(&thAttr);
H
hzcheng 已提交
122

dengyihao's avatar
dengyihao 已提交
123
  if (i != threads) {
124 125 126 127 128
    terrno = TAOS_SYSTEM_ERROR(errno);
    taosCleanUpUdpConnection(pSet);
    return NULL;
  }

129
  tDebug("%s UDP connection is initialized, ip:%x:%hu threads:%d", label, ip, port, threads);
H
hzcheng 已提交
130 131 132
  return pSet;
}

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
133
void taosStopUdpConnection(void *handle) {
H
hzcheng 已提交
134
  SUdpConnSet *pSet = (SUdpConnSet *)handle;
dengyihao's avatar
dengyihao 已提交
135
  SUdpConn *   pConn;
H
hzcheng 已提交
136 137 138 139 140

  if (pSet == NULL) return;

  for (int i = 0; i < pSet->threads; ++i) {
    pConn = pSet->udpConn + i;
wafwerar's avatar
wafwerar 已提交
141 142 143
    if (pConn->pSocket != NULL) taosShutDownSocketRDWR(pConn->pSocket);
    if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket);
    pConn->pSocket = NULL;
144
  }
H
hzcheng 已提交
145

146 147
  for (int i = 0; i < pSet->threads; ++i) {
    pConn = pSet->udpConn + i;
148
    if (taosCheckPthreadValid(pConn->thread)) {
wafwerar's avatar
wafwerar 已提交
149
      taosThreadJoin(pConn->thread, NULL);
150
    }
wafwerar's avatar
wafwerar 已提交
151
    taosMemoryFreeClear(pConn->buffer);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
152 153 154
    // tTrace("%s UDP thread is closed, index:%d", pConn->label, i);
  }

155
  tDebug("%s UDP is stopped", pSet->label);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
156 157 158 159
}

void taosCleanUpUdpConnection(void *handle) {
  SUdpConnSet *pSet = (SUdpConnSet *)handle;
dengyihao's avatar
dengyihao 已提交
160
  SUdpConn *   pConn;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
161 162 163 164 165

  if (pSet == NULL) return;

  for (int i = 0; i < pSet->threads; ++i) {
    pConn = pSet->udpConn + i;
wafwerar's avatar
wafwerar 已提交
166
    if (pConn->pSocket != NULL) taosCloseSocket(&pConn->pSocket);
H
hzcheng 已提交
167 168
  }

169
  tDebug("%s UDP is cleaned up", pSet->label);
wafwerar's avatar
wafwerar 已提交
170
  taosMemoryFreeClear(pSet);
H
hzcheng 已提交
171 172
}

J
jtao1735 已提交
173
void *taosOpenUdpConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) {
H
hzcheng 已提交
174 175 176 177 178 179 180
  SUdpConnSet *pSet = (SUdpConnSet *)shandle;

  pSet->index = (pSet->index + 1) % pSet->threads;

  SUdpConn *pConn = pSet->udpConn + pSet->index;
  pConn->port = port;

181
  tDebug("%s UDP connection is setup, ip:%x:%hu localPort:%hu", pConn->label, ip, port, pConn->localPort);
H
hzcheng 已提交
182 183 184 185

  return pConn;
}

186
static void *taosRecvUdpData(void *param) {
dengyihao's avatar
dengyihao 已提交
187
  SUdpConn *         pConn = param;
188
  struct sockaddr_in sourceAdd;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
189
  ssize_t            dataLen;
190 191 192 193 194 195
  unsigned int       addLen;
  uint16_t           port;
  SRecvInfo          recvInfo;

  memset(&sourceAdd, 0, sizeof(sourceAdd));
  addLen = sizeof(sourceAdd);
196
  tDebug("%s UDP thread is created, index:%d", pConn->label, pConn->index);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
197
  char *msg = pConn->buffer;
198

199 200
  setThreadName("recvUdpData");

201
  while (1) {
wafwerar's avatar
wafwerar 已提交
202
    dataLen = taosReadFromSocket(pConn->pSocket, pConn->buffer, RPC_MAX_UDP_SIZE, 0, (struct sockaddr *)&sourceAdd, &addLen);
S
TD-1207  
Shengliang Guan 已提交
203
    if (dataLen <= 0) {
wafwerar's avatar
wafwerar 已提交
204
      tDebug("%s UDP socket was closed, exiting(%s), dataLen:%d", pConn->label, strerror(errno), (int32_t)dataLen);
S
TD-1207  
Shengliang Guan 已提交
205

206
      // for windows usage, remote shutdown also returns - 1 in windows client
wafwerar's avatar
wafwerar 已提交
207
      if (pConn->pSocket == NULL) {
208 209 210 211
        break;
      } else {
        continue;
      }
212 213
    }

J
jtao1735 已提交
214
    port = ntohs(sourceAdd.sin_port);
215 216

    if (dataLen < sizeof(SRpcHead)) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
217
      tError("%s recvfrom failed(%s)", pConn->label, strerror(errno));
218 219 220
      continue;
    }

S
TD-1762  
Shengliang Guan 已提交
221
    int32_t size = dataLen + tsRpcOverhead;
wafwerar's avatar
wafwerar 已提交
222
    char *  tmsg = taosMemoryMalloc(size);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
223
    if (NULL == tmsg) {
S
TD-1530  
Shengliang Guan 已提交
224
      tError("%s failed to allocate memory, size:%" PRId64, pConn->label, (int64_t)dataLen);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
225
      continue;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
226
    } else {
S
TD-1762  
Shengliang Guan 已提交
227
      tTrace("UDP malloc mem:%p size:%d", tmsg, size);
228 229
    }

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
230 231 232 233 234 235 236 237 238 239 240
    tmsg += tsRpcOverhead;  // overhead for SRpcReqContext
    memcpy(tmsg, msg, dataLen);
    recvInfo.msg = tmsg;
    recvInfo.msgLen = dataLen;
    recvInfo.ip = sourceAdd.sin_addr.s_addr;
    recvInfo.port = port;
    recvInfo.shandle = pConn->shandle;
    recvInfo.thandle = NULL;
    recvInfo.chandle = pConn;
    recvInfo.connType = 0;
    (*(pConn->processData))(&recvInfo);
241 242 243 244 245 246 247 248
  }

  return NULL;
}

int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *chandle) {
  SUdpConn *pConn = (SUdpConn *)chandle;

249
  if (pConn == NULL) return -1;
250

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
251 252 253 254 255
  struct sockaddr_in destAdd;
  memset(&destAdd, 0, sizeof(destAdd));
  destAdd.sin_family = AF_INET;
  destAdd.sin_addr.s_addr = ip;
  destAdd.sin_port = htons(port);
H
hzcheng 已提交
256

wafwerar's avatar
wafwerar 已提交
257
  int ret = taosSendto(pConn->pSocket, data, (size_t)dataLen, 0, (struct sockaddr *)&destAdd, sizeof(destAdd));
H
hzcheng 已提交
258

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
259
  return ret;
H
hzcheng 已提交
260
}
261
#endif