syncTcp.c 8.5 KB
Newer Older
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
/*
 * Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
 *
 * This program is free software: you can use, redistribute, and/or modify
 * it under the terms of the GNU Affero General Public License, version 3
 * or later ("AGPL"), as published by the Free Software Foundation.
 *
 * This program is distributed in the hope that it will be useful, but WITHOUT
 * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
 * FITNESS FOR A PARTICULAR PURPOSE.
 *
 * You should have received a copy of the GNU Affero General Public License
 * along with this program. If not, see <http://www.gnu.org/licenses/>.
 */

16
#define _DEFAULT_SOURCE
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
17 18 19 20 21
#include "os.h"
#include "tulog.h"
#include "tutil.h"
#include "tsocket.h"
#include "taoserror.h"
22 23 24
#include "twal.h"
#include "tsync.h"
#include "syncInt.h"
S
Shengliang Guan 已提交
25
#include "syncTcp.h"
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
26 27 28 29

typedef struct SThreadObj {
  pthread_t thread;
  bool      stop;
30
  SOCKET    pollFd;
31
  int32_t   numOfFds;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
32 33 34 35 36 37 38
  struct SPoolObj *pPool;
} SThreadObj;

typedef struct SPoolObj {
  SPoolInfo    info;
  SThreadObj **pThread;
  pthread_t    thread;
39
  int32_t      nextId;
40
  SOCKET       acceptFd;  // FD for accept new connection
S
TD-1207  
Shengliang Guan 已提交
41
  int8_t       stop;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
42 43 44 45
} SPoolObj;

typedef struct {
  SThreadObj *pThread;
S
TD-2524  
Shengliang Guan 已提交
46
  int64_t     handleId;
47
  SOCKET      fd;
48
  int32_t     closedByApp;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
49 50
} SConnObj;

S
Shengliang Guan 已提交
51 52 53 54
static void *syncAcceptPeerTcpConnection(void *argv);
static void *syncProcessTcpData(void *param);
static void  syncStopPoolThread(SThreadObj *pThread);
static SThreadObj *syncGetTcpThread(SPoolObj *pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
55

S
Shengliang Guan 已提交
56
void *syncOpenTcpThreadPool(SPoolInfo *pInfo) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
57 58 59 60
  pthread_attr_t thattr;

  SPoolObj *pPool = calloc(sizeof(SPoolObj), 1);
  if (pPool == NULL) {
61
    sError("failed to alloc pool for TCP server since no enough memory");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
62 63 64 65
    return NULL;
  }

  pPool->info = *pInfo;
S
TD-1617  
Shengliang Guan 已提交
66

67
  pPool->pThread = calloc(sizeof(SThreadObj *), pInfo->numOfThreads);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
68
  if (pPool->pThread == NULL) {
69 70
    sError("failed to alloc pool thread for TCP server since no enough memory");
    tfree(pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
71 72 73 74 75
    return NULL;
  }

  pPool->acceptFd = taosOpenTcpServerSocket(pInfo->serverIp, pInfo->port);
  if (pPool->acceptFd < 0) {
76 77 78
    tfree(pPool->pThread);
    tfree(pPool);
    sError("failed to create TCP server socket, port:%d (%s)", pInfo->port, strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
79 80 81 82 83
    return NULL;
  }

  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
S
Shengliang Guan 已提交
84
  if (pthread_create(&(pPool->thread), &thattr, (void *)syncAcceptPeerTcpConnection, pPool) != 0) {
85
    sError("failed to create accept thread for TCP server since %s", strerror(errno));
S
TD-1207  
Shengliang Guan 已提交
86
    taosCloseSocket(pPool->acceptFd);
87 88
    tfree(pPool->pThread);
    tfree(pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
89 90 91 92 93
    return NULL;
  }

  pthread_attr_destroy(&thattr);

94
  sDebug("%p TCP pool is created", pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
95 96 97
  return pPool;
}

S
Shengliang Guan 已提交
98
void syncCloseTcpThreadPool(void *param) {
99
  SPoolObj *  pPool = param;
S
TD-1617  
Shengliang Guan 已提交
100
  SThreadObj *pThread;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
101

S
TD-1207  
Shengliang Guan 已提交
102 103 104 105 106
  pPool->stop = 1;

#ifdef WINDOWS
  closesocket(pPool->acceptFd);
#else
S
TD-1617  
Shengliang Guan 已提交
107
  shutdown(pPool->acceptFd, SHUT_RD);
S
TD-1207  
Shengliang Guan 已提交
108 109
#endif

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
110 111
  pthread_join(pPool->thread, NULL);

112
  for (int32_t i = 0; i < pPool->info.numOfThreads; ++i) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
113
    pThread = pPool->pThread[i];
S
Shengliang Guan 已提交
114
    if (pThread) syncStopPoolThread(pThread);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
115 116
  }

117
  sDebug("%p TCP pool is closed", pPool);
S
TD-1617  
Shengliang Guan 已提交
118

S
TD-1848  
Shengliang Guan 已提交
119
  tfree(pPool->pThread);
120
  tfree(pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
121 122
}

123
void *syncAllocateTcpConn(void *param, int64_t rid, SOCKET connFd) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
124
  struct epoll_event event;
125
  SPoolObj *pPool = param;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
126

127
  SConnObj *pConn = calloc(sizeof(SConnObj), 1);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
128
  if (pConn == NULL) {
S
TD-1617  
Shengliang Guan 已提交
129
    terrno = TAOS_SYSTEM_ERROR(errno);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
130 131 132
    return NULL;
  }

S
Shengliang Guan 已提交
133
  SThreadObj *pThread = syncGetTcpThread(pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
134
  if (pThread == NULL) {
135
    tfree(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
136 137 138 139 140
    return NULL;
  }

  pConn->fd = connFd;
  pConn->pThread = pThread;
S
TD-2524  
Shengliang Guan 已提交
141
  pConn->handleId = rid;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
142 143 144 145 146 147
  pConn->closedByApp = 0;

  event.events = EPOLLIN | EPOLLRDHUP;
  event.data.ptr = pConn;

  if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) {
148
    sError("failed to add fd:%d since %s", connFd, strerror(errno));
S
TD-1617  
Shengliang Guan 已提交
149
    terrno = TAOS_SYSTEM_ERROR(errno);
150
    tfree(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
151 152 153
    pConn = NULL;
  } else {
    pThread->numOfFds++;
154
    sDebug("%p fd:%d is added to epoll thread, num:%d", pThread, connFd, pThread->numOfFds);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
155 156 157 158 159
  }

  return pConn;
}

S
Shengliang Guan 已提交
160
void syncFreeTcpConn(void *param) {
S
TD-2157  
Shengliang Guan 已提交
161
  SConnObj *  pConn = param;
S
TD-1617  
Shengliang Guan 已提交
162
  SThreadObj *pThread = pConn->pThread;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
163

164
  sDebug("%p TCP connection will be closed, fd:%d", pThread, pConn->fd);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
165 166 167 168 169 170
  pConn->closedByApp = 1;
  shutdown(pConn->fd, SHUT_WR);
}

static void taosProcessBrokenLink(SConnObj *pConn) {
  SThreadObj *pThread = pConn->pThread;
S
TD-1617  
Shengliang Guan 已提交
171 172 173
  SPoolObj *  pPool = pThread->pPool;
  SPoolInfo * pInfo = &pPool->info;

陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
174
  if (pConn->closedByApp == 0) shutdown(pConn->fd, SHUT_WR);
S
TD-2524  
Shengliang Guan 已提交
175
  (*pInfo->processBrokenLink)(pConn->handleId);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
176 177 178

  pThread->numOfFds--;
  epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pConn->fd, NULL);
179
  sDebug("%p fd:%d is removed from epoll thread, num:%d", pThread, pConn->fd, pThread->numOfFds);
S
TD-1207  
Shengliang Guan 已提交
180
  taosCloseSocket(pConn->fd);
181
  tfree(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
182 183 184 185
}

#define maxEvents 10

S
Shengliang Guan 已提交
186
static void *syncProcessTcpData(void *param) {
S
TD-1617  
Shengliang Guan 已提交
187 188 189 190
  SThreadObj *pThread = (SThreadObj *)param;
  SPoolObj *  pPool = pThread->pPool;
  SPoolInfo * pInfo = &pPool->info;
  SConnObj *  pConn = NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
191 192 193 194 195 196
  struct epoll_event events[maxEvents];

  void *buffer = malloc(pInfo->bufferSize);
  taosBlockSIGPIPE();

  while (1) {
S
TD-1617  
Shengliang Guan 已提交
197
    if (pThread->stop) break;
198
    int32_t fdNum = epoll_wait(pThread->pollFd, events, maxEvents, TAOS_EPOLL_WAIT_TIME);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
199
    if (pThread->stop) {
200
      sDebug("%p TCP epoll thread is exiting...", pThread);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
201 202 203
      break;
    }

S
TD-1617  
Shengliang Guan 已提交
204
    if (fdNum < 0) {
205
      sError("epoll_wait failed since %s", strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
206 207 208
      continue;
    }

209
    for (int32_t i = 0; i < fdNum; ++i) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
210 211 212 213
      pConn = events[i].data.ptr;
      assert(pConn);

      if (events[i].events & EPOLLERR) {
S
TD-2211  
Shengliang Guan 已提交
214
        sDebug("conn is broken since EPOLLERR");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
215 216 217 218 219
        taosProcessBrokenLink(pConn);
        continue;
      }

      if (events[i].events & EPOLLHUP) {
S
TD-2211  
Shengliang Guan 已提交
220
        sDebug("conn is broken since EPOLLHUP");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
221 222 223 224 225
        taosProcessBrokenLink(pConn);
        continue;
      }

      if (events[i].events & EPOLLRDHUP) {
S
TD-2211  
Shengliang Guan 已提交
226
        sDebug("conn is broken since EPOLLRDHUP");
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
227 228 229 230 231
        taosProcessBrokenLink(pConn);
        continue;
      }

      if (pConn->closedByApp == 0) {
S
TD-2524  
Shengliang Guan 已提交
232
        if ((*pInfo->processIncomingMsg)(pConn->handleId, buffer) < 0) {
S
Shengliang Guan 已提交
233
          syncFreeTcpConn(pConn);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
234 235
          continue;
        }
S
TD-1617  
Shengliang Guan 已提交
236
      }
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
237
    }
陶建辉(Jeff)'s avatar
TD-1645  
陶建辉(Jeff) 已提交
238 239

    if (pThread->stop) break;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
240 241
  }

242
  sDebug("%p TCP epoll thread exits", pThread);
S
TD-1617  
Shengliang Guan 已提交
243

244
  EpollClose(pThread->pollFd);
245 246
  tfree(pThread);
  tfree(buffer);
S
TD-1617  
Shengliang Guan 已提交
247
  return NULL;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
248 249
}

S
Shengliang Guan 已提交
250
static void *syncAcceptPeerTcpConnection(void *argv) {
S
TD-1617  
Shengliang Guan 已提交
251 252
  SPoolObj * pPool = (SPoolObj *)argv;
  SPoolInfo *pInfo = &pPool->info;
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
253 254 255 256 257 258

  taosBlockSIGPIPE();

  while (1) {
    struct sockaddr_in clientAddr;
    socklen_t addrlen = sizeof(clientAddr);
259
    SOCKET connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen);
S
TD-1207  
Shengliang Guan 已提交
260 261 262 263 264
    if (pPool->stop) {
      sDebug("%p TCP server accept is stopped", pPool);
      break;
    }

265
    if (connFd < 0) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
266
      if (errno == EINVAL) {
267
        sDebug("%p TCP server accept is exiting...", pPool);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
268 269
        break;
      } else {
270
        sError("TCP accept failure since %s", strerror(errno));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
271 272 273 274
        continue;
      }
    }

275
    // sDebug("TCP connection from: 0x%x:%d", clientAddr.sin_addr.s_addr, clientAddr.sin_port);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
276 277 278 279
    taosKeepTcpAlive(connFd);
    (*pInfo->processIncomingConn)(connFd, clientAddr.sin_addr.s_addr);
  }

S
TD-1207  
Shengliang Guan 已提交
280
  taosCloseSocket(pPool->acceptFd);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
281 282 283
  return NULL;
}

S
Shengliang Guan 已提交
284
static SThreadObj *syncGetTcpThread(SPoolObj *pPool) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
285 286 287 288
  SThreadObj *pThread = pPool->pThread[pPool->nextId];

  if (pThread) return pThread;

S
TD-1617  
Shengliang Guan 已提交
289
  pThread = (SThreadObj *)calloc(1, sizeof(SThreadObj));
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
290 291 292
  if (pThread == NULL) return NULL;

  pThread->pPool = pPool;
293
  pThread->pollFd = (EpollFd)epoll_create(10);  // size does not matter
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
294
  if (pThread->pollFd < 0) {
295
    tfree(pThread);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
296 297 298 299 300 301
    return NULL;
  }

  pthread_attr_t thattr;
  pthread_attr_init(&thattr);
  pthread_attr_setdetachstate(&thattr, PTHREAD_CREATE_JOINABLE);
S
Shengliang Guan 已提交
302
  int32_t ret = pthread_create(&(pThread->thread), &thattr, (void *)syncProcessTcpData, pThread);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
303 304 305
  pthread_attr_destroy(&thattr);

  if (ret != 0) {
S
TD-1207  
Shengliang Guan 已提交
306
    taosCloseSocket(pThread->pollFd);
307
    tfree(pThread);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
308 309 310
    return NULL;
  }

311
  sDebug("%p TCP epoll thread is created", pThread);
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
312 313 314 315 316 317 318
  pPool->pThread[pPool->nextId] = pThread;
  pPool->nextId++;
  pPool->nextId = pPool->nextId % pPool->info.numOfThreads;

  return pThread;
}

S
Shengliang Guan 已提交
319
static void syncStopPoolThread(SThreadObj *pThread) {
dengyihao's avatar
dengyihao 已提交
320
  pthread_t thread = pThread->thread;
321 322 323
  if (!taosCheckPthreadValid(thread)) {
    return;
  }
dengyihao's avatar
dengyihao 已提交
324
  pThread->stop = true;
dengyihao's avatar
dengyihao 已提交
325
  if (taosComparePthread(thread, pthread_self())) {
陶建辉(Jeff)'s avatar
陶建辉(Jeff) 已提交
326 327 328 329 330
    pthread_detach(pthread_self());
    return;
  }
  pthread_join(thread, NULL);
}