提交 c1370a94 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11463-3.0

...@@ -87,7 +87,6 @@ int32_t taosRemoveFile(const char *path); ...@@ -87,7 +87,6 @@ int32_t taosRemoveFile(const char *path);
void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath); void taosGetTmpfilePath(const char *inputTmpDir, const char *fileNamePrefix, char *dstPath);
int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size);
int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size); int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size);
void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length); void *taosMmapReadOnlyFile(TdFilePtr pFile, int64_t length);
......
...@@ -16,6 +16,14 @@ ...@@ -16,6 +16,14 @@
#ifndef _TD_OS_SOCKET_H_ #ifndef _TD_OS_SOCKET_H_
#define _TD_OS_SOCKET_H_ #define _TD_OS_SOCKET_H_
// If the error is in a third-party library, place this header file under the third-party library header file.
#ifndef ALLOW_FORBID_FUNC
#define socket SOCKET_FUNC_TAOS_FORBID
#define bind BIND_FUNC_TAOS_FORBID
#define listen LISTEN_FUNC_TAOS_FORBID
// #define accept ACCEPT_FUNC_TAOS_FORBID
#endif
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#include "winsock2.h" #include "winsock2.h"
#include <WS2tcpip.h> #include <WS2tcpip.h>
...@@ -30,6 +38,8 @@ ...@@ -30,6 +38,8 @@
extern "C" { extern "C" {
#endif #endif
#ifndef USE_UV
#define TAOS_EPOLL_WAIT_TIME 500 #define TAOS_EPOLL_WAIT_TIME 500
typedef int32_t SOCKET; typedef int32_t SOCKET;
typedef SOCKET EpollFd; typedef SOCKET EpollFd;
...@@ -50,7 +60,6 @@ void taosShutDownSocketRD(SOCKET fd); ...@@ -50,7 +60,6 @@ void taosShutDownSocketRD(SOCKET fd);
void taosShutDownSocketWR(SOCKET fd); void taosShutDownSocketWR(SOCKET fd);
int32_t taosSetNonblocking(SOCKET sock, int32_t on); int32_t taosSetNonblocking(SOCKET sock, int32_t on);
void taosIgnSIGPIPE(); void taosIgnSIGPIPE();
void taosBlockSIGPIPE();
void taosSetMaskSIGPIPE(); void taosSetMaskSIGPIPE();
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen); int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen);
int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t *optlen); int32_t taosGetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t *optlen);
...@@ -86,6 +95,10 @@ uint32_t taosGetIpv4FromFqdn(const char *); ...@@ -86,6 +95,10 @@ uint32_t taosGetIpv4FromFqdn(const char *);
void tinet_ntoa(char *ipstr, uint32_t ip); void tinet_ntoa(char *ipstr, uint32_t ip);
uint32_t ip2uint(const char *const ip_addr); uint32_t ip2uint(const char *const ip_addr);
#endif
void taosBlockSIGPIPE();
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -20,8 +20,6 @@ ...@@ -20,8 +20,6 @@
#include "rpcCache.h" #include "rpcCache.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "rpcLog.h" #include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h" #include "taoserror.h"
#include "tglobal.h" #include "tglobal.h"
#include "thash.h" #include "thash.h"
......
...@@ -24,8 +24,6 @@ ...@@ -24,8 +24,6 @@
#include "rpcCache.h" #include "rpcCache.h"
#include "rpcHead.h" #include "rpcHead.h"
#include "rpcLog.h" #include "rpcLog.h"
#include "rpcTcp.h"
#include "rpcUdp.h"
#include "taoserror.h" #include "taoserror.h"
#include "tglobal.h" #include "tglobal.h"
#include "thash.h" #include "thash.h"
......
...@@ -21,6 +21,7 @@ ...@@ -21,6 +21,7 @@
#include "taoserror.h" #include "taoserror.h"
#include "tutil.h" #include "tutil.h"
#ifndef USE_UV
typedef struct SFdObj { typedef struct SFdObj {
void * signature; void * signature;
SOCKET fd; // TCP socket FD SOCKET fd; // TCP socket FD
...@@ -659,3 +660,4 @@ static void taosFreeFdObj(SFdObj *pFdObj) { ...@@ -659,3 +660,4 @@ static void taosFreeFdObj(SFdObj *pFdObj) {
tfree(pFdObj); tfree(pFdObj);
} }
#endif
\ No newline at end of file
...@@ -22,6 +22,8 @@ ...@@ -22,6 +22,8 @@
#include "ttimer.h" #include "ttimer.h"
#include "tutil.h" #include "tutil.h"
#ifndef USE_UV
#define RPC_MAX_UDP_CONNS 256 #define RPC_MAX_UDP_CONNS 256
#define RPC_MAX_UDP_PKTS 1000 #define RPC_MAX_UDP_PKTS 1000
#define RPC_UDP_BUF_TIME 5 // mseconds #define RPC_UDP_BUF_TIME 5 // mseconds
...@@ -257,3 +259,4 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c ...@@ -257,3 +259,4 @@ int taosSendUdpData(uint32_t ip, uint16_t port, void *data, int dataLen, void *c
return ret; return ret;
} }
#endif
\ No newline at end of file
...@@ -56,7 +56,7 @@ typedef struct SSrvMsg { ...@@ -56,7 +56,7 @@ typedef struct SSrvMsg {
typedef struct SWorkThrdObj { typedef struct SWorkThrdObj {
pthread_t thread; pthread_t thread;
uv_pipe_t* pipe; uv_pipe_t* pipe;
int fd; uv_os_fd_t fd;
uv_loop_t* loop; uv_loop_t* loop;
SAsyncPool* asyncPool; SAsyncPool* asyncPool;
// uv_async_t* workerAsync; // // uv_async_t* workerAsync; //
......
...@@ -667,17 +667,43 @@ int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t count) { ...@@ -667,17 +667,43 @@ int64_t taosSendFile(SocketFd dfd, FileFd sfd, int64_t *offset, int64_t count) {
#else #else
int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size) { // int64_t taosSendFile(int fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_t size) {
if (pFileSrc == NULL) { // if (pFileSrc == NULL) {
// return 0;
// }
// assert(pFileSrc->fd >= 0);
// int64_t leftbytes = size;
// int64_t sentbytes;
// while (leftbytes > 0) {
// sentbytes = sendfile(fdDst, pFileSrc->fd, offset, leftbytes);
// if (sentbytes == -1) {
// if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
// continue;
// } else {
// return -1;
// }
// } else if (sentbytes == 0) {
// return (int64_t)(size - leftbytes);
// }
// leftbytes -= sentbytes;
// }
// return size;
// }
int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size) {
if (pFileOut == NULL || pFileIn == NULL) {
return 0; return 0;
} }
assert(pFileSrc->fd >= 0); assert(pFileIn->fd >= 0 && pFileOut->fd >= 0);
int64_t leftbytes = size; int64_t leftbytes = size;
int64_t sentbytes; int64_t sentbytes;
while (leftbytes > 0) { while (leftbytes > 0) {
sentbytes = sendfile(fdDst, pFileSrc->fd, offset, leftbytes); sentbytes = sendfile(pFileOut->fd, pFileIn->fd, offset, leftbytes);
if (sentbytes == -1) { if (sentbytes == -1) {
if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) { if (errno == EINTR || errno == EAGAIN || errno == EWOULDBLOCK) {
continue; continue;
...@@ -694,15 +720,6 @@ int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_ ...@@ -694,15 +720,6 @@ int64_t taosSendFile(SocketFd fdDst, TdFilePtr pFileSrc, int64_t *offset, int64_
return size; return size;
} }
int64_t taosFSendFile(TdFilePtr pFileOut, TdFilePtr pFileIn, int64_t *offset, int64_t size) {
if (pFileOut == NULL || pFileIn == NULL) {
return 0;
}
assert(pFileOut->fd >= 0);
return taosSendFile(pFileOut->fd, pFileIn, offset, size);
}
#endif #endif
void taosFprintfFile(TdFilePtr pFile, const char *format, ...) { void taosFprintfFile(TdFilePtr pFile, const char *format, ...) {
......
...@@ -34,6 +34,24 @@ ...@@ -34,6 +34,24 @@
#include <unistd.h> #include <unistd.h>
#endif #endif
#ifndef USE_UV
// typedef struct TdSocketServer {
// #if SOCKET_WITH_LOCK
// pthread_rwlock_t rwlock;
// #endif
// int refId;
// SocketFd fd;
// } * TdSocketServerPtr, TdSocketServer;
// typedef struct TdSocketConnector {
// #if SOCKET_WITH_LOCK
// pthread_rwlock_t rwlock;
// #endif
// int refId;
// SocketFd fd;
// } * TdSocketConnectorPtr, TdSocketConnector;
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#define taosSend(sockfd, buf, len, flags) send((SOCKET)sockfd, buf, len, flags) #define taosSend(sockfd, buf, len, flags) send((SOCKET)sockfd, buf, len, flags)
...@@ -115,15 +133,6 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) { ...@@ -115,15 +133,6 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
void taosIgnSIGPIPE() { signal(SIGPIPE, SIG_IGN); } void taosIgnSIGPIPE() { signal(SIGPIPE, SIG_IGN); }
void taosBlockSIGPIPE() {
sigset_t signal_mask;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGPIPE);
int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
if (rc != 0) {
//printf("failed to block SIGPIPE");
}
}
void taosSetMaskSIGPIPE() { void taosSetMaskSIGPIPE() {
sigset_t signal_mask; sigset_t signal_mask;
...@@ -215,7 +224,6 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) { ...@@ -215,7 +224,6 @@ int32_t taosSetNonblocking(SOCKET sock, int32_t on) {
} }
void taosIgnSIGPIPE() {} void taosIgnSIGPIPE() {}
void taosBlockSIGPIPE() {}
void taosSetMaskSIGPIPE() {} void taosSetMaskSIGPIPE() {}
int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) { int32_t taosSetSockOpt(SOCKET socketfd, int32_t level, int32_t optname, void *optval, int32_t optlen) {
...@@ -786,3 +794,21 @@ int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) { ...@@ -786,3 +794,21 @@ int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) {
return len; return len;
} }
#endif
#if !(defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32))
void taosBlockSIGPIPE() {
sigset_t signal_mask;
sigemptyset(&signal_mask);
sigaddset(&signal_mask, SIGPIPE);
int32_t rc = pthread_sigmask(SIG_BLOCK, &signal_mask, NULL);
if (rc != 0) {
//printf("failed to block SIGPIPE");
}
}
#else
void taosBlockSIGPIPE() {}
#endif
\ No newline at end of file
...@@ -12,6 +12,12 @@ target_link_libraries( ...@@ -12,6 +12,12 @@ target_link_libraries(
PUBLIC lz4_static PUBLIC lz4_static
PUBLIC api cjson PUBLIC api cjson
) )
if(${BUILD_WITH_UV})
target_link_libraries(
util
PUBLIC uv_a
)
endif(${BUILD_TEST})
if(${BUILD_TEST}) if(${BUILD_TEST})
ADD_SUBDIRECTORY(test) ADD_SUBDIRECTORY(test)
......
...@@ -18,6 +18,67 @@ ...@@ -18,6 +18,67 @@
#include "taoserror.h" #include "taoserror.h"
#include "tlog.h" #include "tlog.h"
#ifdef USE_UV
#include <uv.h>
void clientConnCb(uv_connect_t* req, int status) {
if(status < 0) {
terrno = TAOS_SYSTEM_ERROR(status);
uError("Connection error %s\n",uv_strerror(status));
return;
}
// impl later
uv_buf_t* wb = req->data;
if (wb == NULL) {
uv_close((uv_handle_t *)req->handle,NULL);
}
uv_write_t write_req;
uv_write(&write_req, req->handle, wb, 2, NULL);
uv_close((uv_handle_t *)req->handle,NULL);
}
int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) {
uint32_t ipv4 = taosGetIpv4FromFqdn(server);
if (ipv4 == 0xffffffff) {
terrno = TAOS_SYSTEM_ERROR(errno);
uError("failed to get http server:%s ip since %s", server, terrstr());
return -1;
// goto SEND_OVER;
}
char ipv4Buf[128];
tinet_ntoa(ipv4Buf, ipv4);
struct sockaddr_in dest;
uv_ip4_addr(ipv4Buf, port, &dest);
uv_tcp_t socket_tcp;
uv_loop_t *loop = uv_default_loop();
uv_tcp_init(loop, &socket_tcp);
uv_connect_t* connect = (uv_connect_t*)malloc(sizeof(uv_connect_t));
char header[4096] = {0};
int32_t headLen = snprintf(header, sizeof(header),
"POST /report HTTP/1.1\n"
"Host: %s\n"
"Content-Type: application/json\n"
"Content-Length: %d\n\n",
server, contLen);
uv_buf_t wb[2];
wb[0] = uv_buf_init((char*)header, headLen);
wb[1] = uv_buf_init((char*)pCont, contLen);
connect->data = wb;
uv_tcp_connect(connect, &socket_tcp, (const struct sockaddr*)&dest, clientConnCb);
terrno = 0;
uv_run(loop,UV_RUN_DEFAULT);
uv_loop_close(loop);
free(connect);
return terrno;
}
#else
int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) { int32_t taosSendHttpReport(const char* server, uint16_t port, const char* pCont, int32_t contLen) {
int32_t code = -1; int32_t code = -1;
SOCKET fd = 0; SOCKET fd = 0;
...@@ -73,3 +134,5 @@ SEND_OVER: ...@@ -73,3 +134,5 @@ SEND_OVER:
return code; return code;
} }
#endif
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册