提交 2c12354c 编写于 作者: wafwerar's avatar wafwerar

[TD-13760]<fix>: libuv replace socket.

上级 b64bfa97
......@@ -87,7 +87,6 @@ int32_t taosRemoveFile(const char *path);
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath);
int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size);
int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size);
void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length);
......
......@@ -16,6 +16,14 @@
#ifndef _TD_OS_SOCKET_H_
#define _TD_OS_SOCKET_H_
// If the error is in a third-party library, place this header file under the third-party library header file.
#ifndef ALLOW_FORBID_FUNC
#define socket SOCKET_FUNC_TAOS_FORBID
#define bind BIND_FUNC_TAOS_FORBID
#define listen LISTEN_FUNC_TAOS_FORBID
// #define accept ACCEPT_FUNC_TAOS_FORBID
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#include "winsock2.h"
#include <WS2tcpip.h>
......@@ -30,6 +38,8 @@
extern "C" {
#endif
#ifndef USE_UV
#define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET;
typedef SOCKET EpollFd;
......@@ -50,7 +60,6 @@ void taosShutDownSocketRD(SOCKET fd);
void taosShutDownSocketWR(SOCKET fd);
int32_t taosSetNonblocking(SOCKET sock, int32_t on);
void taosIgnSIGPIPE();
void taosBlockSIGPIPE();
void taosSetMaskSIGPIPE();
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen);
int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t *optlen);
......@@ -86,6 +95,10 @@ uint32_t taosGetIpv4FromFqdn(const char *);
void tinet_ntoa(char *ipstr, uint32_t ip);
uint32_t ip2uint(const char *const ip_addr);
#endif
void taosBlockSIGPIPE();
#ifdef __cplusplus
}
#endif
......
......@@ -20,8 +20,6 @@
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
......
......@@ -24,8 +24,6 @@
#include "rpcCache.h"
#include "rpcHead.h"
#include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h"
#include "tglobal.h"
#include "thash.h"
......
......@@ -21,6 +21,7 @@
#include "taoserror.h"
#include "tutil.h"
#ifndef USE_UV
typedef struct SFdObj {
void * signature;
SOCKET fd; // TCP socket FD
......@@ -659,3 +660,4 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
tfree(pFdObj);
}
#endif
\ No newline at end of file
......@@ -22,6 +22,8 @@
#include "ttimer.h"
#include "tutil.h"
#ifndef USE_UV
#define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000
#define RPC_UDP_BUF_TIME 5 // mseconds
......@@ -257,3 +259,4 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c
return ret;
}
#endif
\ No newline at end of file
......@@ -56,7 +56,7 @@ typedef struct SSrvMsg {
typedef struct SWorkThrdObj {
pthread_t thread;
uv_pipe_t* pipe;
int fd;
uv_os_fd_t fd;
uv_loop_t* loop;
SAsyncPool* asyncPool;
// uv_async_t* workerAsync; //
......
......@@ -667,17 +667,43 @@ int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t count) {
#else
int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size) {
if (pFileSrc == NULL) {
// int64_t taosSendFile(int fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size) {
// if (pFileSrc == NULL) {
// return 0;
// }
// assert(pFileSrc->fd >= 0);
// int64_t leftbytes = size;
// int64_t sentbytes;
// while (leftbytes > 0) {
// sentbytes = sendfile(fdDst, pFileSrc->fd, offset, leftbytes);
// if (sentbytes == -1) {
// if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
// continue;
// } else {
// return -1;
// }
// } else if (sentbytes == 0) {
// return (int64_t)(size - leftbytes);
// }
// leftbytes -= sentbytes;
// }
// return size;
// }
int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size) {
if (pFileOut == NULL || pFileIn == NULL) {
return 0;
}
assert(pFileSrc->fd >= 0);
assert(pFileIn->fd >= 0 && pFileOut->fd >= 0);
int64_t leftbytes = size;
int64_t sentbytes;
while (leftbytes > 0) {
sentbytes = sendfile(fdDst, pFileSrc->fd, offset, leftbytes);
sentbytes = sendfile(pFileOut->fd, pFileIn->fd, offset, leftbytes);
if (sentbytes == -1) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
continue;
......@@ -694,15 +720,6 @@ int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_
return size;
}
int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size) {
if (pFileOut == NULL || pFileIn == NULL) {
return 0;
}
assert(pFileOut->fd >= 0);
return taosSendFile(pFileOut->fd, pFileIn, offset, size);
}
#endif
void taosFprintfFile(TdFilePtr pFile, const char *format, ...) {
......
......@@ -34,6 +34,24 @@
#include <unistd.h>
#endif
#ifndef USE_UV
// typedef struct TdSocketServer {
// #if SOCKET_WITH_LOCK
// pthread_rwlock_t rwlock;
// #endif
// int refId;
// SocketFd fd;
// } * TdSocketServerPtr, TdSocketServer;
// typedef struct TdSocketConnector {
// #if SOCKET_WITH_LOCK
// pthread_rwlock_t rwlock;
// #endif
// int refId;
// SocketFd fd;
// } * TdSocketConnectorPtr, TdSocketConnector;
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#define taosSend(sockfd, buf, len, flags) send((SOCKET)sockfd, buf, len, flags)
......@@ -115,15 +133,6 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
void taosIgnSIGPIPE() { signal(SIGPIPE, SIG_IGN); }
void taosBlockSIGPIPE() {
sigset_t signal_mask;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGPIPE);
int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
if (rc != 0) {
//printf("failed to block SIGPIPE");
}
}
void taosSetMaskSIGPIPE() {
sigset_t signal_mask;
......@@ -215,7 +224,6 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
}
void taosIgnSIGPIPE() {}
void taosBlockSIGPIPE() {}
void taosSetMaskSIGPIPE() {}
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
......@@ -786,3 +794,21 @@ int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) {
return len;
}
#endif
#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
void taosBlockSIGPIPE() {
sigset_t signal_mask;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGPIPE);
int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
if (rc != 0) {
//printf("failed to block SIGPIPE");
}
}
#else
void taosBlockSIGPIPE() {}
#endif
\ No newline at end of file
......@@ -12,6 +12,12 @@ target_link_libraries(
PUBLIC lz4_static
PUBLIC api cjson
)
if(${BUILD_WITH_UV})
target_link_libraries(
util
PUBLIC uv_a
)
endif(${BUILD_TEST})
if(${BUILD_TEST})
ADD_SUBDIRECTORY(test)
......
......@@ -18,6 +18,67 @@
#include "taoserror.h"
#include "tlog.h"
#ifdef USE_UV
#include <uv.h>
void clientConnCb(uv_connect_t* req, int status) {
if(status < 0) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("Connection error %s\n",uv_strerror(status));
return;
}
// impl later
uv_buf_t* wb = req->data;
if (wb == NULL) {
uv_close((uv_handle_t *)req->handle,NULL);
}
uv_write_t write_req;
uv_write(&write_req, req->handle, wb, 2, NULL);
uv_close((uv_handle_t *)req->handle,NULL);
}
int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) {
uint32_t ipv4 = taosGetIpv4FromFqdn(server);
if (ipv4 == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to get http server:%s ip since %s", server, terrstr());
return -1;
// goto SEND_OVER;
}
char ipv4Buf[128];
tinet_ntoa(ipv4Buf, ipv4);
struct sockaddr_in dest;
uv_ip4_addr(ipv4Buf, port, &dest);
uv_tcp_t socket_tcp;
uv_loop_t *loop = uv_default_loop();
uv_tcp_init(loop, &socket_tcp);
uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t));
char header[4096] = {0};
int32_t headLen = snprintf(header, sizeof(header),
"POST /report HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Length: %d\n\n",
server, contLen);
uv_buf_t wb[2];
wb[0] = uv_buf_init((char*)header, headLen);
wb[1] = uv_buf_init((char*)pCont, contLen);
connect->data = wb;
uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb);
terrno = 0;
uv_run(loop,UV_RUN_DEFAULT);
uv_loop_close(loop);
free(connect);
return terrno;
}
#else
int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) {
int32_t code = -1;
SOCKET fd = 0;
......@@ -73,3 +134,5 @@ SEND_OVER:
return code;
}
#endif
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册