From 2c12354c2c064d8bbc551c72a260fe11ac5026c8 Mon Sep 17 00:00:00 2001 From: afwerar <1296468573@qq.com> Date: Mon, 7 Mar 2022 02:07:23 +0800 Subject: [PATCH] [TD-13760]: libuv replace socket. --- include/os/osFile.h | 1 - include/os/osSocket.h | 15 +++++- source/libs/transport/inc/transComm.h | 2 - source/libs/transport/inc/transportInt.h | 2 - source/libs/transport/src/rpcTcp.c | 2 + source/libs/transport/src/rpcUdp.c | 3 ++ source/libs/transport/src/transSrv.c | 2 +- source/os/src/osFile.c | 45 +++++++++++------ source/os/src/osSocket.c | 46 +++++++++++++---- source/util/CMakeLists.txt | 6 +++ source/util/src/thttp.c | 63 ++++++++++++++++++++++++ 11 files changed, 156 insertions(+), 31 deletions(-) diff --git a/include/os/osFile.h b/include/os/osFile.h index 6ddf1e33c6..703ba196ef 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -87,7 +87,6 @@ int32_t taosRemoveFile(const char *path); void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath); -int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size); int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size); void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length); diff --git a/include/os/osSocket.h b/include/os/osSocket.h index 395874a88c..cbecb380e2 100644 --- a/include/os/osSocket.h +++ b/include/os/osSocket.h @@ -16,6 +16,14 @@ #ifndef _TD_OS_SOCKET_H_ #define _TD_OS_SOCKET_H_ +// If the error is in a third-party library, place this header file under the third-party library header file. +#ifndef ALLOW_FORBID_FUNC + #define socket SOCKET_FUNC_TAOS_FORBID + #define bind BIND_FUNC_TAOS_FORBID + #define listen LISTEN_FUNC_TAOS_FORBID + // #define accept ACCEPT_FUNC_TAOS_FORBID +#endif + #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #include "winsock2.h" #include @@ -30,6 +38,8 @@ extern "C" { #endif +#ifndef USE_UV + #define TAOS_EPOLL_WAIT_TIME 500 typedef int32_t SOCKET; typedef SOCKET EpollFd; @@ -50,7 +60,6 @@ void taosShutDownSocketRD(SOCKET fd); void taosShutDownSocketWR(SOCKET fd); int32_t taosSetNonblocking(SOCKET sock, int32_t on); void taosIgnSIGPIPE(); -void taosBlockSIGPIPE(); void taosSetMaskSIGPIPE(); int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen); int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t *optlen); @@ -86,6 +95,10 @@ uint32_t taosGetIpv4FromFqdn(const char *); void tinet_ntoa(char *ipstr, uint32_t ip); uint32_t ip2uint(const char *const ip_addr); +#endif + +void taosBlockSIGPIPE(); + #ifdef __cplusplus } #endif diff --git a/source/libs/transport/inc/transComm.h b/source/libs/transport/inc/transComm.h index d4d9bff56c..985d2f2f2f 100644 --- a/source/libs/transport/inc/transComm.h +++ b/source/libs/transport/inc/transComm.h @@ -20,8 +20,6 @@ #include "rpcCache.h" #include "rpcHead.h" #include "rpcLog.h" -#include "rpcTcp.h" -#include "rpcUdp.h" #include "taoserror.h" #include "tglobal.h" #include "thash.h" diff --git a/source/libs/transport/inc/transportInt.h b/source/libs/transport/inc/transportInt.h index d080db753d..73137487eb 100644 --- a/source/libs/transport/inc/transportInt.h +++ b/source/libs/transport/inc/transportInt.h @@ -24,8 +24,6 @@ #include "rpcCache.h" #include "rpcHead.h" #include "rpcLog.h" -#include "rpcTcp.h" -#include "rpcUdp.h" #include "taoserror.h" #include "tglobal.h" #include "thash.h" diff --git a/source/libs/transport/src/rpcTcp.c b/source/libs/transport/src/rpcTcp.c index 56dd8cce25..d95ac3d36d 100644 --- a/source/libs/transport/src/rpcTcp.c +++ b/source/libs/transport/src/rpcTcp.c @@ -21,6 +21,7 @@ #include "taoserror.h" #include "tutil.h" +#ifndef USE_UV typedef struct SFdObj { void * signature; SOCKET fd; // TCP socket FD @@ -659,3 +660,4 @@ static void taosFreeFdObj(SFdObj *pFdObj) { tfree(pFdObj); } +#endif \ No newline at end of file diff --git a/source/libs/transport/src/rpcUdp.c b/source/libs/transport/src/rpcUdp.c index b57cf57c55..3640414a4c 100644 --- a/source/libs/transport/src/rpcUdp.c +++ b/source/libs/transport/src/rpcUdp.c @@ -22,6 +22,8 @@ #include "ttimer.h" #include "tutil.h" +#ifndef USE_UV + #define RPC_MAX_UDP_CONNS 256 #define RPC_MAX_UDP_PKTS 1000 #define RPC_UDP_BUF_TIME 5 // mseconds @@ -257,3 +259,4 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c return ret; } +#endif \ No newline at end of file diff --git a/source/libs/transport/src/transSrv.c b/source/libs/transport/src/transSrv.c index c7b6ca2a2c..ce78d83bdf 100644 --- a/source/libs/transport/src/transSrv.c +++ b/source/libs/transport/src/transSrv.c @@ -56,7 +56,7 @@ typedef struct SSrvMsg { typedef struct SWorkThrdObj { pthread_t thread; uv_pipe_t* pipe; - int fd; + uv_os_fd_t fd; uv_loop_t* loop; SAsyncPool* asyncPool; // uv_async_t* workerAsync; // diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 652e0b5182..acafbf6da8 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -667,17 +667,43 @@ int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t count) { #else -int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size) { - if (pFileSrc == NULL) { +// int64_t taosSendFile(int fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size) { +// if (pFileSrc == NULL) { +// return 0; +// } +// assert(pFileSrc->fd >= 0); + +// int64_t leftbytes = size; +// int64_t sentbytes; + +// while (leftbytes > 0) { +// sentbytes = sendfile(fdDst, pFileSrc->fd, offset, leftbytes); +// if (sentbytes == -1) { +// if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { +// continue; +// } else { +// return -1; +// } +// } else if (sentbytes == 0) { +// return (int64_t)(size - leftbytes); +// } + +// leftbytes -= sentbytes; +// } + +// return size; +// } + +int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size) { + if (pFileOut == NULL || pFileIn == NULL) { return 0; } - assert(pFileSrc->fd >= 0); - + assert(pFileIn->fd >= 0 && pFileOut->fd >= 0); int64_t leftbytes = size; int64_t sentbytes; while (leftbytes > 0) { - sentbytes = sendfile(fdDst, pFileSrc->fd, offset, leftbytes); + sentbytes = sendfile(pFileOut->fd, pFileIn->fd, offset, leftbytes); if (sentbytes == -1) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { continue; @@ -694,15 +720,6 @@ int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_ return size; } -int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size) { - if (pFileOut == NULL || pFileIn == NULL) { - return 0; - } - assert(pFileOut->fd >= 0); - - return taosSendFile(pFileOut->fd, pFileIn, offset, size); -} - #endif void taosFprintfFile(TdFilePtr pFile, const char *format, ...) { diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index 07d30276b7..f27ad3a1e0 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -34,6 +34,24 @@ #include #endif +#ifndef USE_UV + +// typedef struct TdSocketServer { +// #if SOCKET_WITH_LOCK +// pthread_rwlock_t rwlock; +// #endif +// int refId; +// SocketFd fd; +// } * TdSocketServerPtr, TdSocketServer; + +// typedef struct TdSocketConnector { +// #if SOCKET_WITH_LOCK +// pthread_rwlock_t rwlock; +// #endif +// int refId; +// SocketFd fd; +// } * TdSocketConnectorPtr, TdSocketConnector; + #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #define taosSend(sockfd, buf, len, flags) send((SOCKET)sockfd, buf, len, flags) @@ -115,15 +133,6 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) { void taosIgnSIGPIPE() { signal(SIGPIPE, SIG_IGN); } -void taosBlockSIGPIPE() { - sigset_t signal_mask; - sigemptyset(&signal_mask); - sigaddset(&signal_mask, SIGPIPE); - int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); - if (rc != 0) { - //printf("failed to block SIGPIPE"); - } -} void taosSetMaskSIGPIPE() { sigset_t signal_mask; @@ -215,7 +224,6 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) { } void taosIgnSIGPIPE() {} -void taosBlockSIGPIPE() {} void taosSetMaskSIGPIPE() {} int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { @@ -786,3 +794,21 @@ int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) { return len; } + +#endif + + + +#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)) +void taosBlockSIGPIPE() { + sigset_t signal_mask; + sigemptyset(&signal_mask); + sigaddset(&signal_mask, SIGPIPE); + int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL); + if (rc != 0) { + //printf("failed to block SIGPIPE"); + } +} +#else +void taosBlockSIGPIPE() {} +#endif \ No newline at end of file diff --git a/source/util/CMakeLists.txt b/source/util/CMakeLists.txt index 7a47639e75..6effdff712 100644 --- a/source/util/CMakeLists.txt +++ b/source/util/CMakeLists.txt @@ -12,6 +12,12 @@ target_link_libraries( PUBLIC lz4_static PUBLIC api cjson ) +if(${BUILD_WITH_UV}) + target_link_libraries( + util + PUBLIC uv_a + ) +endif(${BUILD_TEST}) if(${BUILD_TEST}) ADD_SUBDIRECTORY(test) diff --git a/source/util/src/thttp.c b/source/util/src/thttp.c index 0737f67ed1..593f3c43c2 100644 --- a/source/util/src/thttp.c +++ b/source/util/src/thttp.c @@ -18,6 +18,67 @@ #include "taoserror.h" #include "tlog.h" +#ifdef USE_UV + +#include + +void clientConnCb(uv_connect_t* req, int status) { + if(status < 0) { + terrno = TAOS_SYSTEM_ERROR(status); + uError("Connection error %s\n",uv_strerror(status)); + return; + } + + // impl later + uv_buf_t* wb = req->data; + if (wb == NULL) { + uv_close((uv_handle_t *)req->handle,NULL); + } + uv_write_t write_req; + uv_write(&write_req, req->handle, wb, 2, NULL); + uv_close((uv_handle_t *)req->handle,NULL); +} + +int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) { + uint32_t ipv4 = taosGetIpv4FromFqdn(server); + if (ipv4 == 0xffffffff) { + terrno = TAOS_SYSTEM_ERROR(errno); + uError("failed to get http server:%s ip since %s", server, terrstr()); + return -1; + // goto SEND_OVER; + } + char ipv4Buf[128]; + tinet_ntoa(ipv4Buf, ipv4); + + struct sockaddr_in dest; + uv_ip4_addr(ipv4Buf, port, &dest); + + uv_tcp_t socket_tcp; + uv_loop_t *loop = uv_default_loop(); + uv_tcp_init(loop, &socket_tcp); + uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t)); + + char header[4096] = {0}; + int32_t headLen = snprintf(header, sizeof(header), + "POST /report HTTP/1.1\n" + "Host: %s\n" + "Content-Type: application/json\n" + "Content-Length: %d\n\n", + server, contLen); + uv_buf_t wb[2]; + wb[0] = uv_buf_init((char*)header, headLen); + wb[1] = uv_buf_init((char*)pCont, contLen); + + connect->data = wb; + uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb); + terrno = 0; + uv_run(loop,UV_RUN_DEFAULT); + uv_loop_close(loop); + free(connect); + return terrno; +} + +#else int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) { int32_t code = -1; SOCKET fd = 0; @@ -73,3 +134,5 @@ SEND_OVER: return code; } + +#endif \ No newline at end of file -- GitLab