From a368b7b40afccb1bf6b4568f2db3c06354aa496b Mon Sep 17 00:00:00 2001 From: Shengliang Guan Date: Sun, 24 Jan 2021 10:24:44 +0800 Subject: [PATCH] TD-1207 change int32_t to socket --- src/common/inc/tdataformat.h | 22 +++++++++---------- src/dnode/src/dnodeCheck.c | 16 +++++++------- src/dnode/src/dnodeTelemetry.c | 2 +- src/os/inc/osFile.h | 2 +- src/os/inc/osSocket.h | 7 ++++-- src/os/inc/osWindows.h | 6 +++++ src/os/src/darwin/darwinFile.c | 2 +- src/os/src/darwin/darwinSocket.c | 2 +- src/os/src/detail/osFile.c | 2 +- src/os/src/detail/osSocket.c | 4 ++-- src/os/src/linux/linuxEnv.c | 4 ---- src/os/src/windows/wFile.c | 2 +- src/os/src/windows/wSocket.c | 4 ++-- src/os/src/windows/wSysinfo.c | 2 +- src/plugins/http/inc/httpContext.h | 2 +- src/plugins/http/inc/httpInt.h | 7 +++--- src/plugins/http/src/httpContext.c | 10 ++++++--- src/plugins/http/src/httpServer.c | 12 +++++----- src/rpc/src/rpcTcp.c | 35 ++++++++++++------------------ src/rpc/src/rpcUdp.c | 2 +- src/sync/inc/syncInt.h | 4 ++-- src/sync/inc/syncTcp.h | 4 ++-- src/sync/src/syncArbitrator.c | 6 ++--- src/sync/src/syncMain.c | 8 +++---- src/sync/src/syncTcp.c | 24 +++++++------------- src/util/inc/tsocket.h | 26 ++++++++++------------ src/util/src/tnettest.c | 21 +++++++++--------- src/util/src/tsocket.c | 30 ++++++++++++------------- 28 files changed, 129 insertions(+), 139 deletions(-) diff --git a/src/common/inc/tdataformat.h b/src/common/inc/tdataformat.h index b6ad99f02a..4999fbb9bf 100644 --- a/src/common/inc/tdataformat.h +++ b/src/common/inc/tdataformat.h @@ -27,23 +27,23 @@ extern "C" { #endif -#define STR_TO_VARSTR(x, str) \ - do { \ - VarDataLenT __len = (int32_t)strlen(str); \ - *(VarDataLenT *)(x) = __len; \ - memcpy(varDataVal(x), (str), __len); \ +#define STR_TO_VARSTR(x, str) \ + do { \ + VarDataLenT __len = (int32_t)strlen(str); \ + *(VarDataLenT *)(x) = __len; \ + memcpy(varDataVal(x), (str), __len); \ } while (0); -#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \ - do { \ +#define STR_WITH_MAXSIZE_TO_VARSTR(x, str, _maxs) \ + do { \ char *_e = stpncpy(varDataVal(x), (str), (_maxs)-VARSTR_HEADER_SIZE); \ - varDataSetLen(x, (_e - (x)-VARSTR_HEADER_SIZE)); \ + varDataSetLen(x, (_e - (x)-VARSTR_HEADER_SIZE)); \ } while (0) #define STR_WITH_SIZE_TO_VARSTR(x, str, _size) \ do { \ - *(VarDataLenT *)(x) = (int32_t)(_size); \ - memcpy(varDataVal(x), (str), (_size)); \ + *(VarDataLenT *)(x) = (int32_t)(_size); \ + memcpy(varDataVal(x), (str), (_size)); \ } while (0); // ----------------- TSDB COLUMN DEFINITION @@ -156,7 +156,7 @@ static FORCE_INLINE int tkeyComparFn(const void *tkey1, const void *tkey2) { * +----------+----------+---------------------------------+---------------------------------+ * | len | sversion | First part | Second part | * +----------+----------+---------------------------------+---------------------------------+ - * + * * NOTE: timestamp in this row structure is TKEY instead of TSKEY */ typedef void *SDataRow; diff --git a/src/dnode/src/dnodeCheck.c b/src/dnode/src/dnodeCheck.c index 2c8dcfdcc9..94d2360950 100644 --- a/src/dnode/src/dnodeCheck.c +++ b/src/dnode/src/dnodeCheck.c @@ -29,11 +29,11 @@ typedef struct { static SCheckItem tsCheckItem[TSDB_CHECK_ITEM_MAX] = {{0}}; int64_t tsMinFreeMemSizeForStart = 0; -static int32_t bindTcpPort(int32_t port) { - int32_t serverSocket; +static int32_t bindTcpPort(int16_t port) { + SOCKET serverSocket; struct sockaddr_in server_addr; - if ((serverSocket = ( int32_t)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { + if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { dError("socket() fail: %s", strerror(errno)); return -1; } @@ -59,11 +59,11 @@ static int32_t bindTcpPort(int32_t port) { return 0; } -static int32_t bindUdpPort(int32_t port) { - int32_t serverSocket; +static int32_t bindUdpPort(int16_t port) { + SOCKET serverSocket; struct sockaddr_in server_addr; - if ((serverSocket = (int32_t)socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { dError("socket() fail: %s", strerror(errno)); return -1; } @@ -85,9 +85,9 @@ static int32_t bindUdpPort(int32_t port) { static int32_t dnodeCheckNetwork() { int32_t ret; - int32_t startPort = tsServerPort; + int16_t startPort = tsServerPort; - for (int32_t port = startPort; port < startPort + 12; port++) { + for (int16_t port = startPort; port < startPort + 12; port++) { ret = bindTcpPort(port); if (0 != ret) { dError("failed to tcp bind port %d, quit", port); diff --git a/src/dnode/src/dnodeTelemetry.c b/src/dnode/src/dnodeTelemetry.c index d1b7127dda..7a9cf45431 100644 --- a/src/dnode/src/dnodeTelemetry.c +++ b/src/dnode/src/dnodeTelemetry.c @@ -200,7 +200,7 @@ static void sendTelemetryReport() { dTrace("failed to get IP address of " TELEMETRY_SERVER ", reason:%s", strerror(errno)); return; } - int32_t fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0); + SOCKET fd = taosOpenTcpClientSocket(ip, TELEMETRY_PORT, 0); if (fd < 0) { dTrace("failed to create socket for telemetry, reason:%s", strerror(errno)); return; diff --git a/src/os/inc/osFile.h b/src/os/inc/osFile.h index d72f9cefb9..19cc78472c 100644 --- a/src/os/inc/osFile.h +++ b/src/os/inc/osFile.h @@ -39,7 +39,7 @@ int32_t taosRenameFile(char *fullPath, char *suffix, char delimiter, char **dstP } // TAOS_OS_FUNC_FILE_SENDIFLE -int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size); +int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t *offset, int64_t size); int64_t taosFSendFile(FILE *outfile, FILE *infile, int64_t *offset, int64_t size); #ifdef TAOS_RANDOM_FILE_FAIL diff --git a/src/os/inc/osSocket.h b/src/os/inc/osSocket.h index 9683adf45a..111fca712c 100644 --- a/src/os/inc/osSocket.h +++ b/src/os/inc/osSocket.h @@ -37,6 +37,9 @@ extern "C" { #ifndef TAOS_OS_DEF_EPOLL #define TAOS_EPOLL_WAIT_TIME 500 + typedef int32_t SOCKET; + typedef SOCKET EpollFd; + #define EpollClose(pollFd) taosCloseSocket(pollFd) #endif #ifdef TAOS_RANDOM_NETWORK_FAIL @@ -57,13 +60,13 @@ extern "C" { #endif // TAOS_OS_FUNC_SOCKET -int32_t taosSetNonblocking(int32_t sock, int32_t on); +int32_t taosSetNonblocking(SOCKET sock, int32_t on); void taosIgnSIGPIPE(); void taosBlockSIGPIPE(); void taosSetMaskSIGPIPE(); // TAOS_OS_FUNC_SOCKET_SETSOCKETOPT -int32_t taosSetSockOpt(int32_t socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen); +int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen); // TAOS_OS_FUNC_SOCKET_INET uint32_t taosInetAddr(char *ipAddr); diff --git a/src/os/inc/osWindows.h b/src/os/inc/osWindows.h index a984a5ba88..1f3b1b02e3 100644 --- a/src/os/inc/osWindows.h +++ b/src/os/inc/osWindows.h @@ -93,6 +93,12 @@ typedef SOCKET eventfd_t; #define TAOS_OS_DEF_EPOLL #define TAOS_EPOLL_WAIT_TIME 100 + typedef SOCKET EpollFd; + #define EpollClose(pollFd) epoll_close(pollFd) + +#ifndef EPOLLWAKEUP + #define EPOLLWAKEUP (1u << 29) +#endif #define TAOS_OS_DEF_ZU #define PRIzu "ld" diff --git a/src/os/src/darwin/darwinFile.c b/src/os/src/darwin/darwinFile.c index 00c1d8532f..1e77cd68d8 100644 --- a/src/os/src/darwin/darwinFile.c +++ b/src/os/src/darwin/darwinFile.c @@ -51,7 +51,7 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co return writeLen; } -int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t* offset, int64_t count) { +int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t* offset, int64_t count) { lseek(sfd, (int32_t)(*offset), 0); int64_t writeLen = 0; uint8_t buffer[_SEND_FILE_STEP_] = { 0 }; diff --git a/src/os/src/darwin/darwinSocket.c b/src/os/src/darwin/darwinSocket.c index 9137d4b125..69a4666d34 100644 --- a/src/os/src/darwin/darwinSocket.c +++ b/src/os/src/darwin/darwinSocket.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" -int taosSetSockOpt(int32_t socketfd, int level, int optname, void *optval, int optlen) { +int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int optlen) { if (level == SOL_SOCKET && optname == SO_SNDBUF) { return 0; } diff --git a/src/os/src/detail/osFile.c b/src/os/src/detail/osFile.c index 9560ce59bb..bb68622731 100644 --- a/src/os/src/detail/osFile.c +++ b/src/os/src/detail/osFile.c @@ -121,7 +121,7 @@ int64_t taosLSeekImp(int32_t fd, int64_t offset, int32_t whence) { #ifndef TAOS_OS_FUNC_FILE_SENDIFLE -int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t size) { +int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t *offset, int64_t size) { int64_t leftbytes = size; int64_t sentbytes; diff --git a/src/os/src/detail/osSocket.c b/src/os/src/detail/osSocket.c index f929b3e717..d03e8086bf 100644 --- a/src/os/src/detail/osSocket.c +++ b/src/os/src/detail/osSocket.c @@ -19,7 +19,7 @@ #ifndef TAOS_OS_FUNC_SOCKET -int32_t taosSetNonblocking(int32_t sock, int32_t on) { +int32_t taosSetNonblocking(SOCKET sock, int32_t on) { int32_t flags = 0; if ((flags = fcntl(sock, F_GETFL, 0)) < 0) { uError("fcntl(F_GETFL) error: %d (%s)\n", errno, strerror(errno)); @@ -67,7 +67,7 @@ void taosSetMaskSIGPIPE() { #ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT -int32_t taosSetSockOpt(int32_t socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { +int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { return setsockopt(socketfd, level, optname, optval, (socklen_t)optlen); } diff --git a/src/os/src/linux/linuxEnv.c b/src/os/src/linux/linuxEnv.c index abcdabcfd5..e3eadbc94b 100644 --- a/src/os/src/linux/linuxEnv.c +++ b/src/os/src/linux/linuxEnv.c @@ -43,7 +43,6 @@ void osInit() { char cmdline[1024]; char* taosGetCmdlineByPID(int pid) { -#if 0 sprintf(cmdline, "/proc/%d/cmdline", pid); FILE* f = fopen(cmdline, "r"); if (f) { @@ -55,7 +54,4 @@ char* taosGetCmdlineByPID(int pid) { fclose(f); } return cmdline; -#else - return ""; -#endif } diff --git a/src/os/src/windows/wFile.c b/src/os/src/windows/wFile.c index e388e5fc67..4ad195dc6f 100644 --- a/src/os/src/windows/wFile.c +++ b/src/os/src/windows/wFile.c @@ -76,7 +76,7 @@ int64_t taosFSendFile(FILE *out_file, FILE *in_file, int64_t *offset, int64_t co return writeLen; } -int64_t taosSendFile(int32_t dfd, int32_t sfd, int64_t *offset, int64_t count) { +int64_t taosSendFile(SOCKET dfd, int32_t sfd, int64_t *offset, int64_t count) { if (offset != NULL) lseek(sfd, (int32_t)(*offset), 0); int64_t writeLen = 0; diff --git a/src/os/src/windows/wSocket.c b/src/os/src/windows/wSocket.c index 679842a7e9..4e6ee15e77 100644 --- a/src/os/src/windows/wSocket.c +++ b/src/os/src/windows/wSocket.c @@ -34,7 +34,7 @@ void taosWinSocketInit() { } } -int32_t taosSetNonblocking(int32_t sock, int32_t on) { +int32_t taosSetNonblocking(SOCKET sock, int32_t on) { u_long mode; if (on) { mode = 1; @@ -50,7 +50,7 @@ void taosIgnSIGPIPE() {} void taosBlockSIGPIPE() {} void taosSetMaskSIGPIPE() {} -int32_t taosSetSockOpt(int32_t socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { +int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { if (level == SOL_SOCKET && optname == TCP_KEEPCNT) { return 0; } diff --git a/src/os/src/windows/wSysinfo.c b/src/os/src/windows/wSysinfo.c index 9a79f5a6bf..082aaaf5d8 100644 --- a/src/os/src/windows/wSysinfo.c +++ b/src/os/src/windows/wSysinfo.c @@ -66,7 +66,7 @@ bool taosGetSysMemory(float *memoryUsedMB) { bool taosGetProcMemory(float *memoryUsedMB) { unsigned bytes_used = 0; -#if defined(_WIN32) && defined(_MSC_VER) +#if defined(_WIN64) && defined(_MSC_VER) PROCESS_MEMORY_COUNTERS pmc; HANDLE cur_proc = GetCurrentProcess(); diff --git a/src/plugins/http/inc/httpContext.h b/src/plugins/http/inc/httpContext.h index 260858c5cc..b016da2dd3 100644 --- a/src/plugins/http/inc/httpContext.h +++ b/src/plugins/http/inc/httpContext.h @@ -22,7 +22,7 @@ bool httpInitContexts(); void httpCleanupContexts(); const char *httpContextStateStr(HttpContextState state); -HttpContext *httpCreateContext(int32_t fd); +HttpContext *httpCreateContext(SOCKET fd); bool httpInitContext(HttpContext *pContext); HttpContext *httpGetContext(void * pContext); void httpReleaseContext(HttpContext *pContext, bool clearRes); diff --git a/src/plugins/http/inc/httpInt.h b/src/plugins/http/inc/httpInt.h index 73e9d30924..634468f3cc 100644 --- a/src/plugins/http/inc/httpInt.h +++ b/src/plugins/http/inc/httpInt.h @@ -16,6 +16,7 @@ #ifndef TDENGINE_HTTP_INT_H #define TDENGINE_HTTP_INT_H +#include "os.h" #include #include "pthread.h" #include "semaphore.h" @@ -140,7 +141,7 @@ typedef enum { typedef struct HttpContext { int32_t refCount; - int32_t fd; + SOCKET fd; uint32_t accessTimes; uint32_t lastAccessTime; int32_t state; @@ -167,7 +168,7 @@ typedef struct HttpThread { HttpContext * pHead; pthread_mutex_t threadMutex; bool stop; - int32_t pollFd; + EpollFd pollFd; int32_t numOfContexts; int32_t threadId; char label[HTTP_LABEL_SIZE]; @@ -180,7 +181,7 @@ typedef struct HttpServer { uint16_t serverPort; int8_t stop; int8_t reserve; - int32_t fd; + SOCKET fd; int32_t numOfThreads; int32_t methodScannerLen; int32_t requestNum; diff --git a/src/plugins/http/src/httpContext.c b/src/plugins/http/src/httpContext.c index db77c479c0..f71a84a5af 100644 --- a/src/plugins/http/src/httpContext.c +++ b/src/plugins/http/src/httpContext.c @@ -35,14 +35,18 @@ static void httpRemoveContextFromEpoll(HttpContext *pContext) { HttpThread *pThread = pContext->pThread; if (pContext->fd >= 0) { epoll_ctl(pThread->pollFd, EPOLL_CTL_DEL, pContext->fd, NULL); - int32_t fd = atomic_val_compare_exchange_32(&pContext->fd, pContext->fd, -1); +#ifdef WINDOWS + SOCKET fd = atomic_val_compare_exchange_32(&pContext->fd, pContext->fd, -1); +#else + SOCKET fd = atomic_val_compare_exchange_64(&pContext->fd, pContext->fd, -1); +#endif taosCloseSocket(fd); } } static void httpDestroyContext(void *data) { HttpContext *pContext = *(HttpContext **)data; - if (pContext->fd > 0) taosClose(pContext->fd); + if (pContext->fd > 0) taosCloseSocket(pContext->fd); HttpThread *pThread = pContext->pThread; httpRemoveContextFromEpoll(pContext); @@ -106,7 +110,7 @@ bool httpAlterContextState(HttpContext *pContext, HttpContextState srcState, Htt return (atomic_val_compare_exchange_32(&pContext->state, srcState, destState) == srcState); } -HttpContext *httpCreateContext(int32_t fd) { +HttpContext *httpCreateContext(SOCKET fd) { HttpContext *pContext = calloc(1, sizeof(HttpContext)); if (pContext == NULL) return NULL; diff --git a/src/plugins/http/src/httpServer.c b/src/plugins/http/src/httpServer.c index c4a03d44ea..932f3e3808 100644 --- a/src/plugins/http/src/httpServer.c +++ b/src/plugins/http/src/httpServer.c @@ -50,7 +50,7 @@ static void httpStopThread(HttpThread *pThread) { taosCloseSocket(fd); } - taosCloseSocket(pThread->pollFd); + EpollClose(pThread->pollFd); pthread_mutex_destroy(&(pThread->threadMutex)); } @@ -152,7 +152,7 @@ static void httpProcessHttpData(void *param) { } static void *httpAcceptHttpConnection(void *arg) { - int32_t connFd = -1; + SOCKET connFd = -1; struct sockaddr_in clientAddr; int32_t threadId = 0; HttpServer * pServer = &tsHttpServer; @@ -175,7 +175,7 @@ static void *httpAcceptHttpConnection(void *arg) { while (1) { socklen_t addrlen = sizeof(clientAddr); - connFd = (int32_t)accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen); + connFd = accept(pServer->fd, (struct sockaddr *)&clientAddr, &addrlen); if (pServer->stop) { httpDebug("http server:%s socket stop, exiting...", pServer->label); break; @@ -227,7 +227,7 @@ static void *httpAcceptHttpConnection(void *arg) { if (epoll_ctl(pThread->pollFd, EPOLL_CTL_ADD, connFd, &event) < 0) { httpError("context:%p, fd:%d, ip:%s, thread:%s, failed to add http fd for epoll, error:%s", pContext, connFd, pContext->ipstr, pThread->label, strerror(errno)); - taosClose(pContext->fd); + taosCloseSocket(pContext->fd); httpReleaseContext(pContext, true); continue; } @@ -265,8 +265,8 @@ bool httpInitConnect() { return false; } - pThread->pollFd = (int32_t)epoll_create(HTTP_MAX_EVENTS); // size does not matter - if (pThread->pollFd < 0) { + pThread->pollFd = (EpollFd)epoll_create(HTTP_MAX_EVENTS); // size does not matter + if (pThread->pollFd <= 0) { httpError("http thread:%s, failed to create HTTP epoll", pThread->label); pthread_mutex_destroy(&(pThread->threadMutex)); return false; diff --git a/src/rpc/src/rpcTcp.c b/src/rpc/src/rpcTcp.c index 1d2dbe281e..acd81e94a4 100644 --- a/src/rpc/src/rpcTcp.c +++ b/src/rpc/src/rpcTcp.c @@ -21,21 +21,14 @@ #include "rpcLog.h" #include "rpcHead.h" #include "rpcTcp.h" -#ifdef WINDOWS -#include "wepoll.h" -#endif - -#ifndef EPOLLWAKEUP - #define EPOLLWAKEUP (1u << 29) -#endif typedef struct SFdObj { void *signature; - int32_t fd; // TCP socket FD - int closedByApp; // 1: already closed by App + SOCKET fd; // TCP socket FD void *thandle; // handle from upper layer, like TAOS uint32_t ip; uint16_t port; + int16_t closedByApp; // 1: already closed by App struct SThreadObj *pThreadObj; struct SFdObj *prev; struct SFdObj *next; @@ -47,7 +40,7 @@ typedef struct SThreadObj { pthread_mutex_t mutex; uint32_t ip; bool stop; - int32_t pollFd; + EpollFd pollFd; int numOfFds; int threadId; char label[TSDB_LABEL_LEN]; @@ -56,7 +49,7 @@ typedef struct SThreadObj { } SThreadObj; typedef struct { - int32_t fd; + SOCKET fd; uint32_t ip; uint16_t port; int8_t stop; @@ -69,7 +62,7 @@ typedef struct { } SServerObj; static void *taosProcessTcpData(void *param); -static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int32_t fd); +static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd); static void taosFreeFdObj(SFdObj *pFdObj); static void taosReportBrokenLink(SFdObj *pFdObj); static void *taosAcceptTcpConnection(void *arg); @@ -134,7 +127,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread break; } - pThreadObj->pollFd = (int32_t)epoll_create(10); // size does not matter + pThreadObj->pollFd = (EpollFd)epoll_create(10); // size does not matter if (pThreadObj->pollFd < 0) { tError("%s failed to create TCP epoll", label); code = -1; @@ -227,7 +220,7 @@ void taosCleanUpTcpServer(void *handle) { } static void *taosAcceptTcpConnection(void *arg) { - int32_t connFd = -1; + SOCKET connFd = -1; struct sockaddr_in caddr; int threadId = 0; SThreadObj *pThreadObj; @@ -238,7 +231,7 @@ static void *taosAcceptTcpConnection(void *arg) { while (1) { socklen_t addrlen = sizeof(caddr); - connFd = (int32_t)accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen); + connFd = accept(pServerObj->fd, (struct sockaddr *)&caddr, &addrlen); if (pServerObj->stop) { tDebug("%s TCP server stop accepting new connections", pServerObj->label); break; @@ -306,7 +299,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * return NULL; } - pThreadObj->pollFd = (int32_t)epoll_create(10); // size does not matter + pThreadObj->pollFd = (EpollFd)epoll_create(10); // size does not matter if (pThreadObj->pollFd < 0) { tError("%s failed to create TCP client epoll", label); free(pThreadObj); @@ -321,7 +314,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void * int code = pthread_create(&(pThreadObj->thread), &thattr, taosProcessTcpData, (void *)(pThreadObj)); pthread_attr_destroy(&thattr); if (code != 0) { - taosCloseSocket(pThreadObj->pollFd); + EpollClose(pThreadObj->pollFd); free(pThreadObj); terrno = TAOS_SYSTEM_ERROR(errno); tError("%s failed to create TCP read data thread(%s)", label, strerror(errno)); @@ -351,8 +344,8 @@ void taosCleanUpTcpClient(void *chandle) { void *taosOpenTcpClientConnection(void *shandle, void *thandle, uint32_t ip, uint16_t port) { SThreadObj * pThreadObj = shandle; - int32_t fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); - if (fd < 0) return NULL; + SOCKET fd = taosOpenTcpClientSocket(ip, port, pThreadObj->ip); + if (fd <= 0) return NULL; struct sockaddr_in sin; uint16_t localPort = 0; @@ -526,7 +519,7 @@ static void *taosProcessTcpData(void *param) { if (pThreadObj->stop) break; } - if (pThreadObj->pollFd >=0) taosCloseSocket(pThreadObj->pollFd); + if (pThreadObj->pollFd >=0) EpollClose(pThreadObj->pollFd); while (pThreadObj->pHead) { SFdObj *pFdObj = pThreadObj->pHead; @@ -541,7 +534,7 @@ static void *taosProcessTcpData(void *param) { return NULL; } -static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, int32_t fd) { +static SFdObj *taosMallocFdObj(SThreadObj *pThreadObj, SOCKET fd) { struct epoll_event event; SFdObj *pFdObj = (SFdObj *)calloc(sizeof(SFdObj), 1); diff --git a/src/rpc/src/rpcUdp.c b/src/rpc/src/rpcUdp.c index 862d9b1f5c..2599bca075 100644 --- a/src/rpc/src/rpcUdp.c +++ b/src/rpc/src/rpcUdp.c @@ -31,7 +31,7 @@ typedef struct { int index; - int32_t fd; + SOCKET fd; uint16_t port; // peer port uint16_t localPort; // local port char label[TSDB_LABEL_LEN]; // copy from udpConnSet; diff --git a/src/sync/inc/syncInt.h b/src/sync/inc/syncInt.h index eef687d647..e43140d4e6 100644 --- a/src/sync/inc/syncInt.h +++ b/src/sync/inc/syncInt.h @@ -82,8 +82,8 @@ typedef struct SsyncPeer { uint64_t sversion; // track the peer version in retrieve process uint64_t lastFileVer; // track the file version while retrieve uint64_t lastWalVer; // track the wal version while retrieve - int32_t syncFd; - int32_t peerFd; // forward FD + SOCKET syncFd; + SOCKET peerFd; // forward FD int32_t numOfRetrieves; // number of retrieves tried int32_t fileChanged; // a flag to indicate file is changed during retrieving process int32_t refCount; diff --git a/src/sync/inc/syncTcp.h b/src/sync/inc/syncTcp.h index d4674fee6b..b322c3440c 100644 --- a/src/sync/inc/syncTcp.h +++ b/src/sync/inc/syncTcp.h @@ -27,12 +27,12 @@ typedef struct { int32_t bufferSize; void (*processBrokenLink)(int64_t handleId); int32_t (*processIncomingMsg)(int64_t handleId, void *buffer); - void (*processIncomingConn)(int32_t fd, uint32_t ip); + void (*processIncomingConn)(SOCKET fd, uint32_t ip); } SPoolInfo; void *syncOpenTcpThreadPool(SPoolInfo *pInfo); void syncCloseTcpThreadPool(void *); -void *syncAllocateTcpConn(void *, int64_t rid, int32_t connFd); +void *syncAllocateTcpConn(void *, int64_t rid, SOCKET connFd); void syncFreeTcpConn(void *); #ifdef __cplusplus diff --git a/src/sync/src/syncArbitrator.c b/src/sync/src/syncArbitrator.c index 24760ce1a0..9fb6b0ddb7 100644 --- a/src/sync/src/syncArbitrator.c +++ b/src/sync/src/syncArbitrator.c @@ -28,7 +28,7 @@ #include "syncTcp.h" static void arbSignalHandler(int32_t signum, void *sigInfo, void *context); -static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); +static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp); static void arbProcessBrokenLink(int64_t rid); static int32_t arbProcessPeerMsg(int64_t rid, void *buffer); static tsem_t tsArbSem; @@ -36,7 +36,7 @@ static void * tsArbTcpPool; typedef struct { char id[TSDB_EP_LEN + 24]; - int32_t nodeFd; + SOCKET nodeFd; void * pConn; } SNodeConn; @@ -106,7 +106,7 @@ int32_t main(int32_t argc, char *argv[]) { return 0; } -static void arbProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { +static void arbProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { char ipstr[24]; tinet_ntoa(ipstr, sourceIp); sDebug("peer TCP connection from ip:%s", ipstr); diff --git a/src/sync/src/syncMain.c b/src/sync/src/syncMain.c index 606809e82b..d698432176 100644 --- a/src/sync/src/syncMain.c +++ b/src/sync/src/syncMain.c @@ -45,7 +45,7 @@ static void syncCheckPeerConnection(void *param, void *tmrId); static int32_t syncSendPeersStatusMsgToPeer(SSyncPeer *pPeer, char ack, int8_t type, uint16_t tranId); static void syncProcessBrokenLink(int64_t rid); static int32_t syncProcessPeerMsg(int64_t rid, void *buffer); -static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp); +static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp); static void syncRemovePeer(SSyncPeer *pPeer); static void syncAddArbitrator(SSyncNode *pNode); static void syncFreeNode(void *); @@ -1114,8 +1114,8 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) { return; } - int32_t connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); - if ((int32_t)connFd < 0) { + SOCKET connFd = taosOpenTcpClientSocket(pPeer->ip, pPeer->port, 0); + if (connFd <= 0) { sDebug("%s, failed to open tcp socket since %s", pPeer->id, strerror(errno)); taosTmrReset(syncCheckPeerConnection, SYNC_CHECK_INTERVAL, (void *)pPeer->rid, tsSyncTmrCtrl, &pPeer->timer); return; @@ -1179,7 +1179,7 @@ static void syncCreateRestoreDataThread(SSyncPeer *pPeer) { } } -static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) { +static void syncProcessIncommingConnection(SOCKET connFd, uint32_t sourceIp) { char ipstr[24]; int32_t i; diff --git a/src/sync/src/syncTcp.c b/src/sync/src/syncTcp.c index 1210caa84c..50c52623f7 100644 --- a/src/sync/src/syncTcp.c +++ b/src/sync/src/syncTcp.c @@ -24,18 +24,10 @@ #include "syncInt.h" #include "syncTcp.h" -#ifdef WINDOWS -#include "wepoll.h" -#endif - -#ifndef EPOLLWAKEUP - #define EPOLLWAKEUP (1u << 29) -#endif - typedef struct SThreadObj { pthread_t thread; bool stop; - int32_t pollFd; + SOCKET pollFd; int32_t numOfFds; struct SPoolObj *pPool; } SThreadObj; @@ -45,14 +37,14 @@ typedef struct SPoolObj { SThreadObj **pThread; pthread_t thread; int32_t nextId; - int32_t acceptFd; // FD for accept new connection + SOCKET acceptFd; // FD for accept new connection int8_t stop; } SPoolObj; typedef struct { SThreadObj *pThread; int64_t handleId; - int32_t fd; + SOCKET fd; int32_t closedByApp; } SConnObj; @@ -128,7 +120,7 @@ void syncCloseTcpThreadPool(void *param) { tfree(pPool); } -void *syncAllocateTcpConn(void *param, int64_t rid, int32_t connFd) { +void *syncAllocateTcpConn(void *param, int64_t rid, SOCKET connFd) { struct epoll_event event; SPoolObj *pPool = param; @@ -249,7 +241,7 @@ static void *syncProcessTcpData(void *param) { sDebug("%p TCP epoll thread exits", pThread); - taosCloseSocket(pThread->pollFd); + EpollClose(pThread->pollFd); tfree(pThread); tfree(buffer); return NULL; @@ -264,13 +256,13 @@ static void *syncAcceptPeerTcpConnection(void *argv) { while (1) { struct sockaddr_in clientAddr; socklen_t addrlen = sizeof(clientAddr); - int32_t connFd = (int32_t)accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen); + SOCKET connFd = accept(pPool->acceptFd, (struct sockaddr *)&clientAddr, &addrlen); if (pPool->stop) { sDebug("%p TCP server accept is stopped", pPool); break; } - if ((int32_t)connFd < 0) { + if (connFd < 0) { if (errno == EINVAL) { sDebug("%p TCP server accept is exiting...", pPool); break; @@ -298,7 +290,7 @@ static SThreadObj *syncGetTcpThread(SPoolObj *pPool) { if (pThread == NULL) return NULL; pThread->pPool = pPool; - pThread->pollFd = (int32_t)epoll_create(10); // size does not matter + pThread->pollFd = (EpollFd)epoll_create(10); // size does not matter if (pThread->pollFd < 0) { tfree(pThread); return NULL; diff --git a/src/util/inc/tsocket.h b/src/util/inc/tsocket.h index fc0d71c4d8..35b591b61e 100644 --- a/src/util/inc/tsocket.h +++ b/src/util/inc/tsocket.h @@ -24,21 +24,17 @@ extern "C" { #include "wepoll.h" #endif -#ifndef EPOLLWAKEUP - #define EPOLLWAKEUP (1u << 29) -#endif - -int32_t taosReadn(int32_t sock, char *buffer, int32_t len); -int32_t taosWriteMsg(int32_t fd, void *ptr, int32_t nbytes); -int32_t taosReadMsg(int32_t fd, void *ptr, int32_t nbytes); -int32_t taosNonblockwrite(int32_t fd, char *ptr, int32_t nbytes); -int32_t taosCopyFds(int32_t sfd, int32_t dfd, int64_t len); -int32_t taosSetNonblocking(int32_t sock, int32_t on); - -int32_t taosOpenUdpSocket(uint32_t localIp, uint16_t localPort); -int32_t taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp); -int32_t taosOpenTcpServerSocket(uint32_t ip, uint16_t port); -int32_t taosKeepTcpAlive(int32_t sockFd); +int32_t taosReadn(SOCKET sock, char *buffer, int32_t len); +int32_t taosWriteMsg(SOCKET fd, void *ptr, int32_t nbytes); +int32_t taosReadMsg(SOCKET fd, void *ptr, int32_t nbytes); +int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes); +int32_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len); +int32_t taosSetNonblocking(SOCKET sock, int32_t on); + +SOCKET taosOpenUdpSocket(uint32_t localIp, uint16_t localPort); +SOCKET taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp); +SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port); +int32_t taosKeepTcpAlive(SOCKET sockFd); int32_t taosGetFqdn(char *); uint32_t taosGetIpv4FromFqdn(const char *); diff --git a/src/util/src/tnettest.c b/src/util/src/tnettest.c index 70c38d36dc..fe6dfb493d 100644 --- a/src/util/src/tnettest.c +++ b/src/util/src/tnettest.c @@ -39,7 +39,7 @@ typedef struct { static void *taosNetBindUdpPort(void *sarg) { STestInfo *pinfo = (STestInfo *)sarg; int32_t port = pinfo->port; - int32_t serverSocket; + SOCKET serverSocket; char buffer[BUFFER_SIZE]; int32_t iDataNum; socklen_t sin_size; @@ -48,7 +48,7 @@ static void *taosNetBindUdpPort(void *sarg) { struct sockaddr_in server_addr; struct sockaddr_in clientAddr; - if ((serverSocket = (int32_t)socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + if ((serverSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { uError("failed to create UDP socket since %s", strerror(errno)); return NULL; } @@ -106,12 +106,12 @@ static void *taosNetBindTcpPort(void *sarg) { STestInfo *pinfo = sarg; int32_t port = pinfo->port; - int32_t serverSocket; + SOCKET serverSocket; int32_t addr_len = sizeof(clientAddr); - int32_t client; + SOCKET client; char buffer[BUFFER_SIZE]; - if ((serverSocket = (int32_t)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { + if ((serverSocket = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) < 0) { uError("failed to create TCP socket since %s", strerror(errno)); return NULL; } @@ -133,7 +133,6 @@ static void *taosNetBindTcpPort(void *sarg) { return NULL; } - if (taosKeepTcpAlive(serverSocket) < 0) { uError("failed to set tcp server keep-alive option since %s", strerror(errno)); taosCloseSocket(serverSocket); @@ -148,7 +147,7 @@ static void *taosNetBindTcpPort(void *sarg) { uInfo("TCP server at port:%d is listening", port); while (1) { - client = (int32_t)accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len); + client = accept(serverSocket, (struct sockaddr *)&clientAddr, (socklen_t *)&addr_len); if (client < 0) { uDebug("TCP: failed to accept at port:%d since %s", port, strerror(errno)); continue; @@ -178,10 +177,10 @@ static void *taosNetBindTcpPort(void *sarg) { } static int32_t taosNetCheckTcpPort(STestInfo *info) { - int32_t clientSocket; + SOCKET clientSocket; char buffer[BUFFER_SIZE] = {0}; - if ((clientSocket = (int32_t)socket(AF_INET, SOCK_STREAM, 0)) < 0) { + if ((clientSocket = socket(AF_INET, SOCK_STREAM, 0)) < 0) { uError("failed to create TCP client socket since %s", strerror(errno)); return -1; } @@ -226,14 +225,14 @@ static int32_t taosNetCheckTcpPort(STestInfo *info) { } static int32_t taosNetCheckUdpPort(STestInfo *info) { - int32_t clientSocket; + SOCKET clientSocket; char buffer[BUFFER_SIZE] = {0}; int32_t iDataNum = 0; int32_t bufSize = 1024000; struct sockaddr_in serverAddr; - if ((clientSocket = (int32_t)socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { + if ((clientSocket = socket(AF_INET, SOCK_DGRAM, IPPROTO_UDP)) < 0) { uError("failed to create udp client socket since %s", strerror(errno)); return -1; } diff --git a/src/util/src/tsocket.c b/src/util/src/tsocket.c index d41c1518b8..3636698162 100644 --- a/src/util/src/tsocket.c +++ b/src/util/src/tsocket.c @@ -102,7 +102,7 @@ uint32_t ip2uint(const char *const ip_addr) { return *((uint32_t *)ip); } -int32_t taosWriteMsg(int32_t fd, void *buf, int32_t nbytes) { +int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) { int32_t nleft, nwritten; char * ptr = (char *)buf; @@ -128,7 +128,7 @@ int32_t taosWriteMsg(int32_t fd, void *buf, int32_t nbytes) { return (nbytes - nleft); } -int32_t taosReadMsg(int32_t fd, void *buf, int32_t nbytes) { +int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) { int32_t nleft, nread; char * ptr = (char *)buf; @@ -159,7 +159,7 @@ int32_t taosReadMsg(int32_t fd, void *buf, int32_t nbytes) { return (nbytes - nleft); } -int32_t taosNonblockwrite(int32_t fd, char *ptr, int32_t nbytes) { +int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) { taosSetNonblocking(fd, 1); int32_t nleft, nwritten, nready; @@ -201,7 +201,7 @@ int32_t taosNonblockwrite(int32_t fd, char *ptr, int32_t nbytes) { return (nbytes - nleft); } -int32_t taosReadn(int32_t fd, char *ptr, int32_t nbytes) { +int32_t taosReadn(SOCKET fd, char *ptr, int32_t nbytes) { int32_t nread, nready, nleft = nbytes; fd_set fset; @@ -239,9 +239,9 @@ int32_t taosReadn(int32_t fd, char *ptr, int32_t nbytes) { return (nbytes - nleft); } -int32_t taosOpenUdpSocket(uint32_t ip, uint16_t port) { +SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) { struct sockaddr_in localAddr; - int32_t sockFd; + SOCKET sockFd; int32_t bufSize = 1024000; uDebug("open udp socket:0x%x:%hu", ip, port); @@ -251,7 +251,7 @@ int32_t taosOpenUdpSocket(uint32_t ip, uint16_t port) { localAddr.sin_addr.s_addr = ip; localAddr.sin_port = (uint16_t)htons(port); - if ((sockFd = (int32_t)socket(AF_INET, SOCK_DGRAM, 0)) <= 2) { + if ((sockFd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) { uError("failed to open udp socket: %d (%s)", errno, strerror(errno)); taosCloseSocketNoCheck(sockFd); return -1; @@ -279,13 +279,13 @@ int32_t taosOpenUdpSocket(uint32_t ip, uint16_t port) { return sockFd; } -int32_t taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) { - int32_t sockFd = 0; +SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) { + SOCKET sockFd = 0; int32_t ret; struct sockaddr_in serverAddr, clientAddr; int32_t bufSize = 1024 * 1024; - sockFd = (int32_t)socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); if (sockFd <= 2) { uError("failed to open the socket: %d (%s)", errno, strerror(errno)); @@ -346,7 +346,7 @@ int32_t taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t cli return sockFd; } -int32_t taosKeepTcpAlive(int32_t sockFd) { +int32_t taosKeepTcpAlive(SOCKET sockFd) { int32_t alive = 1; if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) { uError("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno)); @@ -394,9 +394,9 @@ int32_t taosKeepTcpAlive(int32_t sockFd) { return 0; } -int32_t taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { +SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { struct sockaddr_in serverAdd; - int32_t sockFd; + SOCKET sockFd; int32_t reuse; uDebug("open tcp server socket:0x%x:%hu", ip, port); @@ -406,7 +406,7 @@ int32_t taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { serverAdd.sin_addr.s_addr = ip; serverAdd.sin_port = (uint16_t)htons(port); - if ((sockFd = (int32_t)socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) { + if ((sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) { uError("failed to open TCP socket: %d (%s)", errno, strerror(errno)); taosCloseSocketNoCheck(sockFd); return -1; @@ -449,7 +449,7 @@ void tinet_ntoa(char *ipstr, uint32_t ip) { #define COPY_SIZE 32768 // sendfile shall be used -int32_t taosCopyFds(int32_t sfd, int32_t dfd, int64_t len) { +int32_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) { int64_t leftLen; int32_t readLen, writeLen; char temp[COPY_SIZE]; -- GitLab