syncTcp.c 8.6 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17 18 19 20 21
#include "os.h"
#include "tulog.h"
#include "tutil.h"
#include "tsocket.h"
#include "taoserror.h"
22 23 24
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"
S
Shengliang Guan 已提交
25
#include "syncTcp.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26

S
TD-1207  
Shengliang Guan 已提交
27 28 29 30 31 32 33 34
#ifdef WINDOWS
#include "wepoll.h"
#endif

#ifndef EPOLLWAKEUP
  #define EPOLLWAKEUP (1u << 29)
#endif

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
35 36 37
typedef struct SThreadObj {
  pthread_t thread;
  bool      stop;
S
Shengliang Guan 已提交
38
  int32_t   pollFd;
39
  int32_t   numOfFds;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
40 41 42 43 44 45 46
  struct SPoolObj *pPool;
} SThreadObj;

typedef struct SPoolObj {
  SPoolInfo    info;
  SThreadObj **pThread;
  pthread_t    thread;
47
  int32_t      nextId;
S
Shengliang Guan 已提交
48
  int32_t      acceptFd;  // FD for accept new connection
S
TD-1207  
Shengliang Guan 已提交
49
  int8_t       stop;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
50 51 52 53
} SPoolObj;

typedef struct {
  SThreadObj *pThread;
S
TD-2524  
Shengliang Guan 已提交
54
  int64_t     handleId;
S
Shengliang Guan 已提交
55
  int32_t     fd;
56
  int32_t     closedByApp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
57 58
} SConnObj;

S
Shengliang Guan 已提交
59 60 61 62
static void *syncAcceptPeerTcpConnection(void *argv);
static void *syncProcessTcpData(void *param);
static void  syncStopPoolThread(SThreadObj *pThread);
static SThreadObj *syncGetTcpThread(SPoolObj *pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
63

S
Shengliang Guan 已提交
64
void *syncOpenTcpThreadPool(SPoolInfo *pInfo) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
65 66 67 68
  pthread_attr_t thattr;

  SPoolObj *pPool = calloc(sizeof(SPoolObj), 1);
  if (pPool == NULL) {
69
    sError("failed to alloc pool for TCP server since no enough memory");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
70 71 72 73
    return NULL;
  }

  pPool->info = *pInfo;
S
TD-1617  
Shengliang Guan 已提交
74

75
  pPool->pThread = calloc(sizeof(SThreadObj *), pInfo->numOfThreads);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
76
  if (pPool->pThread == NULL) {
77 78
    sError("failed to alloc pool thread for TCP server since no enough memory");
    tfree(pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
79 80 81 82 83
    return NULL;
  }

  pPool->acceptFd = taosOpenTcpServerSocket(pInfo->serverIp, pInfo->port);
  if (pPool->acceptFd < 0) {
84 85 86
    tfree(pPool->pThread);
    tfree(pPool);
    sError("failed to create TCP server socket, port:%d (%s)", pInfo->port, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
87 88 89 90 91
    return NULL;
  }

  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
S
Shengliang Guan 已提交
92
  if (pthread_create(&(pPool->thread), &thattr, (void *)syncAcceptPeerTcpConnection, pPool) != 0) {
93
    sError("failed to create accept thread for TCP server since %s", strerror(errno));
S
TD-1207  
Shengliang Guan 已提交
94
    taosCloseSocket(pPool->acceptFd);
95 96
    tfree(pPool->pThread);
    tfree(pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
97 98 99 100 101
    return NULL;
  }

  pthread_attr_destroy(&thattr);

102
  sDebug("%p TCP pool is created", pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
103 104 105
  return pPool;
}

S
Shengliang Guan 已提交
106
void syncCloseTcpThreadPool(void *param) {
107
  SPoolObj *  pPool = param;
S
TD-1617  
Shengliang Guan 已提交
108
  SThreadObj *pThread;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
109

S
TD-1207  
Shengliang Guan 已提交
110 111 112 113 114
  pPool->stop = 1;

#ifdef WINDOWS
  closesocket(pPool->acceptFd);
#else
S
TD-1617  
Shengliang Guan 已提交
115
  shutdown(pPool->acceptFd, SHUT_RD);
S
TD-1207  
Shengliang Guan 已提交
116 117
#endif

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
118 119
  pthread_join(pPool->thread, NULL);

120
  for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
121
    pThread = pPool->pThread[i];
S
Shengliang Guan 已提交
122
    if (pThread) syncStopPoolThread(pThread);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
123 124
  }

125
  sDebug("%p TCP pool is closed", pPool);
S
TD-1617  
Shengliang Guan 已提交
126

S
TD-1848  
Shengliang Guan 已提交
127
  tfree(pPool->pThread);
128
  tfree(pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
129 130
}

S
Shengliang Guan 已提交
131
void *syncAllocateTcpConn(void *param, int64_t rid, int32_t connFd) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
132
  struct epoll_event event;
133
  SPoolObj *pPool = param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
134

135
  SConnObj *pConn = calloc(sizeof(SConnObj), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
136
  if (pConn == NULL) {
S
TD-1617  
Shengliang Guan 已提交
137
    terrno = TAOS_SYSTEM_ERROR(errno);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
138 139 140
    return NULL;
  }

S
Shengliang Guan 已提交
141
  SThreadObj *pThread = syncGetTcpThread(pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
142
  if (pThread == NULL) {
143
    tfree(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
144 145 146 147 148
    return NULL;
  }

  pConn->fd = connFd;
  pConn->pThread = pThread;
S
TD-2524  
Shengliang Guan 已提交
149
  pConn->handleId = rid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
150 151 152 153 154 155
  pConn->closedByApp = 0;

  event.events = EPOLLIN | EPOLLRDHUP;
  event.data.ptr = pConn;

  if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
156
    sError("failed to add fd:%d since %s", connFd, strerror(errno));
S
TD-1617  
Shengliang Guan 已提交
157
    terrno = TAOS_SYSTEM_ERROR(errno);
158
    tfree(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
159 160 161
    pConn = NULL;
  } else {
    pThread->numOfFds++;
162
    sDebug("%p fd:%d is added to epoll thread, num:%d", pThread, connFd, pThread->numOfFds);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
163 164 165 166 167
  }

  return pConn;
}

S
Shengliang Guan 已提交
168
void syncFreeTcpConn(void *param) {
S
TD-2157  
Shengliang Guan 已提交
169
  SConnObj *  pConn = param;
S
TD-1617  
Shengliang Guan 已提交
170
  SThreadObj *pThread = pConn->pThread;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
171

172
  sDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
173 174 175 176 177 178
  pConn->closedByApp = 1;
  shutdown(pConn->fd, SHUT_WR);
}

static void taosProcessBrokenLink(SConnObj *pConn) {
  SThreadObj *pThread = pConn->pThread;
S
TD-1617  
Shengliang Guan 已提交
179 180 181
  SPoolObj *  pPool = pThread->pPool;
  SPoolInfo * pInfo = &pPool->info;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
182
  if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR);
S
TD-2524  
Shengliang Guan 已提交
183
  (*pInfo->processBrokenLink)(pConn->handleId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
184 185 186

  pThread->numOfFds--;
  epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL);
187
  sDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds);
S
TD-1207  
Shengliang Guan 已提交
188
  taosCloseSocket(pConn->fd);
189
  tfree(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
190 191 192 193
}

#define maxEvents 10

S
Shengliang Guan 已提交
194
static void *syncProcessTcpData(void *param) {
S
TD-1617  
Shengliang Guan 已提交
195 196 197 198
  SThreadObj *pThread = (SThreadObj *)param;
  SPoolObj *  pPool = pThread->pPool;
  SPoolInfo * pInfo = &pPool->info;
  SConnObj *  pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
199 200 201 202 203 204
  struct epoll_event events[maxEvents];

  void *buffer = malloc(pInfo->bufferSize);
  taosBlockSIGPIPE();

  while (1) {
S
TD-1617  
Shengliang Guan 已提交
205
    if (pThread->stop) break;
206
    int32_t fdNum = epoll_wait(pThread->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
207
    if (pThread->stop) {
208
      sDebug("%p TCP epoll thread is exiting...", pThread);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
209 210 211
      break;
    }

S
TD-1617  
Shengliang Guan 已提交
212
    if (fdNum < 0) {
213
      sError("epoll_wait failed since %s", strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
214 215 216
      continue;
    }

217
    for (int32_t i = 0; i < fdNum; ++i) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
218 219 220 221
      pConn = events[i].data.ptr;
      assert(pConn);

      if (events[i].events & EPOLLERR) {
S
TD-2211  
Shengliang Guan 已提交
222
        sDebug("conn is broken since EPOLLERR");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
223 224 225 226 227
        taosProcessBrokenLink(pConn);
        continue;
      }

      if (events[i].events & EPOLLHUP) {
S
TD-2211  
Shengliang Guan 已提交
228
        sDebug("conn is broken since EPOLLHUP");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
229 230 231 232 233
        taosProcessBrokenLink(pConn);
        continue;
      }

      if (events[i].events & EPOLLRDHUP) {
S
TD-2211  
Shengliang Guan 已提交
234
        sDebug("conn is broken since EPOLLRDHUP");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
235 236 237 238 239
        taosProcessBrokenLink(pConn);
        continue;
      }

      if (pConn->closedByApp == 0) {
S
TD-2524  
Shengliang Guan 已提交
240
        if ((*pInfo->processIncomingMsg)(pConn->handleId, buffer) < 0) {
S
Shengliang Guan 已提交
241
          syncFreeTcpConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
242 243
          continue;
        }
S
TD-1617  
Shengliang Guan 已提交
244
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
245
    }
陶建辉(Jeff)'s avatar
TD-1645  
陶建辉(Jeff) 已提交
246 247

    if (pThread->stop) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
248 249
  }

250
  sDebug("%p TCP epoll thread exits", pThread);
S
TD-1617  
Shengliang Guan 已提交
251

S
TD-1207  
Shengliang Guan 已提交
252
  taosCloseSocket(pThread->pollFd);
253 254
  tfree(pThread);
  tfree(buffer);
S
TD-1617  
Shengliang Guan 已提交
255
  return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
256 257
}

S
Shengliang Guan 已提交
258
static void *syncAcceptPeerTcpConnection(void *argv) {
S
TD-1617  
Shengliang Guan 已提交
259 260
  SPoolObj * pPool = (SPoolObj *)argv;
  SPoolInfo *pInfo = &pPool->info;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
261 262 263 264 265 266

  taosBlockSIGPIPE();

  while (1) {
    struct sockaddr_in clientAddr;
    socklen_t addrlen = sizeof(clientAddr);
S
Shengliang Guan 已提交
267
    int32_t connFd = (int32_t)accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen);
S
TD-1207  
Shengliang Guan 已提交
268 269 270 271 272
    if (pPool->stop) {
      sDebug("%p TCP server accept is stopped", pPool);
      break;
    }

S
Shengliang Guan 已提交
273
    if ((int32_t)connFd < 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
274
      if (errno == EINVAL) {
275
        sDebug("%p TCP server accept is exiting...", pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
276 277
        break;
      } else {
278
        sError("TCP accept failure since %s", strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
279 280 281 282
        continue;
      }
    }

283
    // sDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
284 285 286 287
    taosKeepTcpAlive(connFd);
    (*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr);
  }

S
TD-1207  
Shengliang Guan 已提交
288
  taosCloseSocket(pPool->acceptFd);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
289 290 291
  return NULL;
}

S
Shengliang Guan 已提交
292
static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
293 294 295 296
  SThreadObj *pThread = pPool->pThread[pPool->nextId];

  if (pThread) return pThread;

S
TD-1617  
Shengliang Guan 已提交
297
  pThread = (SThreadObj *)calloc(1, sizeof(SThreadObj));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
298 299 300
  if (pThread == NULL) return NULL;

  pThread->pPool = pPool;
S
Shengliang Guan 已提交
301
  pThread->pollFd = (int32_t)epoll_create(10);  // size does not matter
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
302
  if (pThread->pollFd < 0) {
303
    tfree(pThread);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
304 305 306 307 308 309
    return NULL;
  }

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
S
Shengliang Guan 已提交
310
  int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)syncProcessTcpData, pThread);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
311 312 313
  pthread_attr_destroy(&thattr);

  if (ret != 0) {
S
TD-1207  
Shengliang Guan 已提交
314
    taosCloseSocket(pThread->pollFd);
315
    tfree(pThread);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
316 317 318
    return NULL;
  }

319
  sDebug("%p TCP epoll thread is created", pThread);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
320 321 322 323 324 325 326
  pPool->pThread[pPool->nextId] = pThread;
  pPool->nextId++;
  pPool->nextId = pPool->nextId % pPool->info.numOfThreads;

  return pThread;
}

S
Shengliang Guan 已提交
327
static void syncStopPoolThread(SThreadObj *pThread) {
dengyihao's avatar
dengyihao 已提交
328
  pthread_t thread = pThread->thread;
329 330 331
  if (!taosCheckPthreadValid(thread)) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
332
  pThread->stop = true;
dengyihao's avatar
dengyihao 已提交
333
  if (taosComparePthread(thread, pthread_self())) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
334 335 336 337 338
    pthread_detach(pthread_self());
    return;
  }
  pthread_join(thread, NULL);
}