diff --git a/include/util/ttoken.h b/include/common/ttoken.h similarity index 100% rename from include/util/ttoken.h rename to include/common/ttoken.h diff --git a/src/inc/ttokendef.h b/include/common/ttokendef.h similarity index 100% rename from src/inc/ttokendef.h rename to include/common/ttokendef.h diff --git a/include/os/osDir.h b/include/os/osDir.h index 74286edbe80ac1212aead2a5fe0b7d1fd80a05f0..32733753a8a1de2dcdd5fe1a0244533aae4dc19c 100644 --- a/include/os/osDir.h +++ b/include/os/osDir.h @@ -22,7 +22,7 @@ extern "C" { void taosRemoveDir(char *dirname); bool taosDirExist(char *dirname); -bool taosMkDir(char *dirname, mode_t mode); +bool taosMkDir(char *dirname); void taosRemoveOldFiles(char *dirname, int32_t keepDays); bool taosExpandDir(char *dirname, char *outname, int32_t maxlen); bool taosRealPath(char *dirname, int32_t maxlen); diff --git a/include/os/osFile.h b/include/os/osFile.h index 66c3ae9cbd625cdce25d79bf9bb81faa43ada259..83f90d4ba286457ed0ac6fee378557bc66be97b6 100644 --- a/include/os/osFile.h +++ b/include/os/osFile.h @@ -46,8 +46,10 @@ int32_t taosFStatFile(FileFd fd, int64_t *size, int32_t *mtime); FileFd taosOpenFileWrite(const char *path); FileFd taosOpenFileCreateWrite(const char *path); -FileFd taosOpenFileTruncCreateWrite(const char *path); +FileFd taosOpenFileCreateWriteTrunc(const char *path); +FileFd taosOpenFileCreateWriteAppend(const char *path); FileFd taosOpenFileRead(const char *path); +FileFd taosOpenFileReadWrite(const char *path); int64_t taosLSeekFile(FileFd fd, int64_t offset, int32_t whence); int32_t taosFtruncateFile(FileFd fd, int64_t length); diff --git a/include/os/osSocket.h b/include/os/osSocket.h index 11a6486ff6fe08ea020f14aa352f8c4ec4420db3..c503e667e6e3dc78cae1640e3b99c80e9631ae00 100644 --- a/include/os/osSocket.h +++ b/include/os/osSocket.h @@ -85,6 +85,23 @@ const char *taosInetNtoa(struct in_addr ipInt); #define htobe64 htonll #endif +int32_t taosReadn(SOCKET sock, char *buffer, int32_t len); +int32_t taosWriteMsg(SOCKET fd, void *ptr, int32_t nbytes); +int32_t taosReadMsg(SOCKET fd, void *ptr, int32_t nbytes); +int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes); +int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len); +int32_t taosSetNonblocking(SOCKET sock, int32_t on); + +SOCKET taosOpenUdpSocket(uint32_t localIp, uint16_t localPort); +SOCKET taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp); +SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port); +int32_t taosKeepTcpAlive(SOCKET sockFd); + +int32_t taosGetFqdn(char *); +uint32_t taosGetIpv4FromFqdn(const char *); +void tinet_ntoa(char *ipstr, uint32_t ip); +uint32_t ip2uint(const char *const ip_addr); + #ifdef __cplusplus } #endif diff --git a/include/os/osTimer.h b/include/os/osTimer.h index 2afe018c433eee13e05de70305f344c81f7c7b9c..4b5db895a2f4d190138c1c79c873336f163382e2 100644 --- a/include/os/osTimer.h +++ b/include/os/osTimer.h @@ -24,6 +24,8 @@ extern "C" { int32_t taosInitTimer(void (*callback)(int32_t), int32_t ms); void taosUninitTimer(); +int64_t taosGetMonotonicMs(); +const char *taosMonotonicInit(); #ifdef __cplusplus } diff --git a/include/util/tcoding.h b/include/util/tcoding.h index ccc3c26124da1fbf5a8b9e3c4627d7b69a6520d8..a11768a1b08c1655238c87e918281d16c5f9084c 100644 --- a/include/util/tcoding.h +++ b/include/util/tcoding.h @@ -19,7 +19,7 @@ extern "C" { #endif -#include "os.h " +#include "os.h" #define ENCODE_LIMIT (((uint8_t)1) << 7) #define ZIGZAGE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode diff --git a/include/util/tfile.h b/include/util/tfile.h index 066040170e44c24539e29b5de5acd438e8b9b9d0..155c127eff065d8a9b83bf60e5e54ad3b5db1100 100644 --- a/include/util/tfile.h +++ b/include/util/tfile.h @@ -28,8 +28,10 @@ void tfCleanup(); // the same syntax as UNIX standard open/close/read/write // but FD is int64_t and will never be reused -int64_t tfOpen(const char *pathname, int32_t flags); -int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode); +int64_t tfOpenReadWrite(const char *pathname); +int64_t tfOpenCreateWrite(const char *pathname); +int64_t tfOpenCreateWriteAppend(const char *pathname); + int64_t tfClose(int64_t tfd); int64_t tfWrite(int64_t tfd, void *buf, int64_t count); int64_t tfRead(int64_t tfd, void *buf, int64_t count); diff --git a/include/util/tskiplist.h b/include/util/tskiplist.h index d9dc001ccd072e9803d7241bd7e0e3e029a77b3e..9aa225078b4e4733e7aa0b5cc0dc30c2130981b7 100644 --- a/include/util/tskiplist.h +++ b/include/util/tskiplist.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "os.h" -#include "taosdef.h" +#include "tdef.h" #include "tarray.h" #include "tfunctional.h" diff --git a/include/util/tthread.h b/include/util/tthread.h index 7443ad706dcbef529d857fe823cddd0cc1efbdd3..879c73ef89162a009dcf26d033f87858d7b08bf6 100644 --- a/include/util/tthread.h +++ b/include/util/tthread.h @@ -21,7 +21,7 @@ extern "C" { #endif #include "os.h" -#include "taosdef.h" +#include "tdef.h" // create new thread pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param); diff --git a/source/common/src/texpr.c b/source/common/src/texpr.c index 6823de631fe65a27f527be011def4819dd77c73a..01e1f71c2f099bde9886af755a9ce69ffaf6eaf9 100644 --- a/source/common/src/texpr.c +++ b/source/common/src/texpr.c @@ -23,7 +23,9 @@ #include "tarray.h" #include "tbuffer.h" #include "tcompare.h" -#include "tsdb.h" +#include "tname.h" +#include "hash.h" +// #include "tsdb.h" #include "tskiplist.h" #include "texpr.h" #include "tarithoperator.h" diff --git a/source/util/src/ttokenizer.c b/source/common/src/ttokenizer.c similarity index 100% rename from source/util/src/ttokenizer.c rename to source/common/src/ttokenizer.c diff --git a/source/common/src/tvariant.c b/source/common/src/tvariant.c index ca3bb956a2fef4fa98450181b4378025013bb735..c12650ee8b767621e22334fd2adb2363fe4731bd 100644 --- a/source/common/src/tvariant.c +++ b/source/common/src/tvariant.c @@ -17,6 +17,7 @@ #include "hash.h" #include "taos.h" #include "taosdef.h" +#include "ttime.h" #include "ttoken.h" #include "ttokendef.h" #include "ttype.h" diff --git a/source/os/src/osDir.c b/source/os/src/osDir.c index 9bca1e6c2e88aaa4d11407f4dc7ce28481468c43..0526dffe9e799616a0d1b091e2765b02807e64cd 100644 --- a/source/os/src/osDir.c +++ b/source/os/src/osDir.c @@ -60,7 +60,7 @@ void taosRemoveDir(char *dirname) { bool taosDirExist(char *dirname) { return access(dirname, F_OK) == 0; } -bool taosMkDir(char *dirname, mode_t mode) { +bool taosMkDir(char *dirname) { int32_t code = mkdir(dirname, 0755); if (code < 0 && errno == EEXIST) { return true; diff --git a/source/os/src/osFile.c b/source/os/src/osFile.c index 3726830635df727fedcdcf761e72c682a289922e..b5d30d7c25d462ef87a7bf5799dba3a1552df44e 100644 --- a/source/os/src/osFile.c +++ b/source/os/src/osFile.c @@ -16,6 +16,22 @@ #include "os.h" #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) +#include + +#if defined(_MSDOS) +#define open _open +#endif + +#if defined(_WIN32) +extern int openA(const char *, int, ...); /* MsvcLibX ANSI version of open */ +extern int openU(const char *, int, ...); /* MsvcLibX UTF-8 version of open */ +#if defined(_UTF8_SOURCE) || defined(_BSD_SOURCE) || defined(_GNU_SOURCE) +#define open openU +#else /* _ANSI_SOURCE */ +#define open openA +#endif /* defined(_UTF8_SOURCE) */ +#endif /* defined(_WIN32) */ + #else #include #include @@ -478,26 +494,44 @@ int32_t taosOpenFileWrite(const char *path) { #endif } -FileFd taosOpenFileRead(const char *path) { - #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) +int32_t taosOpenFileCreateWrite(const char *path) { +#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) return 0; #else - return open(path, O_RDONLY); + return open(path, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); #endif } -int32_t taosOpenFileCreateWrite(const char *path) { +int32_t taosOpenFileCreateWriteTrunc(const char *path) { #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) return 0; #else - return open(path, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + return open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); #endif } -int32_t taosOpenFileTruncCreateWrite(const char *path) { +int32_t taosOpenFileCreateWriteAppend(const char *path) { #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) return 0; #else - return open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); + return open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); #endif -} \ No newline at end of file +} + +FileFd taosOpenFileRead(const char *path) { + #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return 0; +#else + return open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO); +#endif +} + +FileFd taosOpenFileReadWrite(const char *path) { + #if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32) + return 0; +#else + return open(path, O_RDWR, S_IRWXU | S_IRWXG | S_IRWXO); +#endif +} + + diff --git a/source/os/src/osSocket.c b/source/os/src/osSocket.c index d2f3c120db80c778a74b53eed4d9d7c9e5027707..0b62df4524e614a45d03683d71e1f1e9d2c917cc 100644 --- a/source/os/src/osSocket.c +++ b/source/os/src/osSocket.c @@ -224,4 +224,525 @@ uint64_t htonll(uint64_t val) { return (((uint64_t)htonl(val)) << 32) + htonl(va #endif -#endif \ No newline at end of file +#endif + + + +#ifndef SIGPIPE + #define SIGPIPE EPIPE +#endif + +#define TCP_CONN_TIMEOUT 3000 // conn timeout + +int32_t taosGetFqdn(char *fqdn) { + char hostname[1024]; + hostname[1023] = '\0'; + if (gethostname(hostname, 1023) == -1) { + printf("failed to get hostname, reason:%s", strerror(errno)); + return -1; + } + + struct addrinfo hints = {0}; + struct addrinfo *result = NULL; +#ifdef __APPLE__ + // on macosx, hostname -f has the form of xxx.local + // which will block getaddrinfo for a few seconds if AI_CANONNAME is set + // thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return + // immediately + hints.ai_family = AF_INET; +#else // __APPLE__ + hints.ai_flags = AI_CANONNAME; +#endif // __APPLE__ + int32_t ret = getaddrinfo(hostname, NULL, &hints, &result); + if (!result) { + printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); + return -1; + } + +#ifdef __APPLE__ + // refer to comments above + strcpy(fqdn, hostname); +#else // __APPLE__ + strcpy(fqdn, result->ai_canonname); +#endif // __APPLE__ + freeaddrinfo(result); + return 0; +} + +uint32_t taosGetIpv4FromFqdn(const char *fqdn) { + struct addrinfo hints = {0}; + hints.ai_family = AF_INET; + hints.ai_socktype = SOCK_STREAM; + + struct addrinfo *result = NULL; + + int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result); + if (result) { + struct sockaddr * sa = result->ai_addr; + struct sockaddr_in *si = (struct sockaddr_in *)sa; + struct in_addr ia = si->sin_addr; + uint32_t ip = ia.s_addr; + freeaddrinfo(result); + return ip; + } else { +#ifdef EAI_SYSTEM + if (ret == EAI_SYSTEM) { + printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno)); + } else { + printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret)); + } +#else + printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret)); +#endif + return 0xFFFFFFFF; + } +} + +// Function converting an IP address string to an uint32_t. +uint32_t ip2uint(const char *const ip_addr) { + char ip_addr_cpy[20]; + char ip[5]; + + tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy)); + + char *s_start, *s_end; + s_start = ip_addr_cpy; + s_end = ip_addr_cpy; + + int32_t k; + + for (k = 0; *s_start != '\0'; s_start = s_end) { + for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) { + } + if (*s_end == '.') { + *s_end = '\0'; + s_end++; + } + ip[k++] = (char)atoi(s_start); + } + + ip[k] = '\0'; + + return *((uint32_t *)ip); +} + +int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) { + int32_t nleft, nwritten; + char * ptr = (char *)buf; + + nleft = nbytes; + + while (nleft > 0) { + nwritten = (int32_t)taosWriteSocket(fd, (char *)ptr, (size_t)nleft); + if (nwritten <= 0) { + if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */) + continue; + else + return -1; + } else { + nleft -= nwritten; + ptr += nwritten; + } + + if (errno == SIGPIPE || errno == EPIPE) { + return -1; + } + } + + return (nbytes - nleft); +} + +int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) { + int32_t nleft, nread; + char * ptr = (char *)buf; + + nleft = nbytes; + + if (fd < 0) return -1; + + while (nleft > 0) { + nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft); + if (nread == 0) { + break; + } else if (nread < 0) { + if (errno == EINTR/* || errno == EAGAIN || errno == EWOULDBLOCK*/) { + continue; + } else { + return -1; + } + } else { + nleft -= nread; + ptr += nread; + } + + if (errno == SIGPIPE || errno == EPIPE) { + return -1; + } + } + + return (nbytes - nleft); +} + +int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) { + taosSetNonblocking(fd, 1); + + int32_t nleft, nwritten, nready; + fd_set fset; + struct timeval tv; + + nleft = nbytes; + while (nleft > 0) { + tv.tv_sec = 30; + tv.tv_usec = 0; + FD_ZERO(&fset); + FD_SET(fd, &fset); + if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) { + errno = ETIMEDOUT; + printf("fd %d timeout, no enough space to write", fd); + break; + + } else if (nready < 0) { + if (errno == EINTR) continue; + + printf("select error, %d (%s)", errno, strerror(errno)); + return -1; + } + + nwritten = (int32_t)taosSend(fd, ptr, (size_t)nleft, MSG_NOSIGNAL); + if (nwritten <= 0) { + if (errno == EAGAIN || errno == EINTR) continue; + + printf("write error, %d (%s)", errno, strerror(errno)); + return -1; + } + + nleft -= nwritten; + ptr += nwritten; + } + + taosSetNonblocking(fd, 0); + + return (nbytes - nleft); +} + +int32_t taosReadn(SOCKET fd, char *ptr, int32_t nbytes) { + int32_t nread, nready, nleft = nbytes; + + fd_set fset; + struct timeval tv; + + while (nleft > 0) { + tv.tv_sec = 30; + tv.tv_usec = 0; + FD_ZERO(&fset); + FD_SET(fd, &fset); + if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) { + errno = ETIMEDOUT; + printf("fd %d timeout\n", fd); + break; + } else if (nready < 0) { + if (errno == EINTR) continue; + printf("select error, %d (%s)", errno, strerror(errno)); + return -1; + } + + if ((nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft)) < 0) { + if (errno == EINTR) continue; + printf("read error, %d (%s)", errno, strerror(errno)); + return -1; + + } else if (nread == 0) { + printf("fd %d EOF", fd); + break; // EOF + } + + nleft -= nread; + ptr += nread; + } + + return (nbytes - nleft); +} + +SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) { + struct sockaddr_in localAddr; + SOCKET sockFd; + int32_t bufSize = 1024000; + + printf("open udp socket:0x%x:%hu", ip, port); + + memset((char *)&localAddr, 0, sizeof(localAddr)); + localAddr.sin_family = AF_INET; + localAddr.sin_addr.s_addr = ip; + localAddr.sin_port = (uint16_t)htons(port); + + if ((sockFd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) { + printf("failed to open udp socket: %d (%s)", errno, strerror(errno)); + taosCloseSocketNoCheck(sockFd); + return -1; + } + + if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + printf("failed to set the send buffer size for UDP socket\n"); + taosCloseSocket(sockFd); + return -1; + } + + if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + printf("failed to set the receive buffer size for UDP socket\n"); + taosCloseSocket(sockFd); + return -1; + } + + /* bind socket to local address */ + if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { + printf("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port); + taosCloseSocket(sockFd); + return -1; + } + + return sockFd; +} + +SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) { + SOCKET sockFd = 0; + int32_t ret; + struct sockaddr_in serverAddr, clientAddr; + int32_t bufSize = 1024 * 1024; + + sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); + + if (sockFd <= 2) { + printf("failed to open the socket: %d (%s)", errno, strerror(errno)); + taosCloseSocketNoCheck(sockFd); + return -1; + } + + /* set REUSEADDR option, so the portnumber can be re-used */ + int32_t reuse = 1; + if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { + printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); + taosCloseSocket(sockFd); + return -1; + } + + if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + printf("failed to set the send buffer size for TCP socket\n"); + taosCloseSocket(sockFd); + return -1; + } + + if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { + printf("failed to set the receive buffer size for TCP socket\n"); + taosCloseSocket(sockFd); + return -1; + } + + if (clientIp != 0) { + memset((char *)&clientAddr, 0, sizeof(clientAddr)); + clientAddr.sin_family = AF_INET; + clientAddr.sin_addr.s_addr = clientIp; + clientAddr.sin_port = 0; + + /* bind socket to client address */ + if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) { + printf("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort, + strerror(errno)); + taosCloseSocket(sockFd); + return -1; + } + } + + memset((char *)&serverAddr, 0, sizeof(serverAddr)); + serverAddr.sin_family = AF_INET; + serverAddr.sin_addr.s_addr = destIp; + serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort); + +#ifdef _TD_LINUX + taosSetNonblocking(sockFd, 1); + ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); + if (ret == -1) { + if (errno == EHOSTUNREACH) { + printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno)); + taosCloseSocket(sockFd); + return -1; + } else if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) { + struct pollfd wfd[1]; + + wfd[0].fd = sockFd; + wfd[0].events = POLLOUT; + + int res = poll(wfd, 1, TCP_CONN_TIMEOUT); + if (res == -1 || res == 0) { + printf("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort); + taosCloseSocket(sockFd); // + return -1; + } + int optVal = -1, optLen = sizeof(int); + if ((0 != taosGetSockOpt(sockFd, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) { + printf("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort); + taosCloseSocket(sockFd); // + return -1; + } + ret = 0; + } else { // Other error + printf("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort); + taosCloseSocket(sockFd); // + return -1; + } + } + taosSetNonblocking(sockFd, 0); + +#else + ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); +#endif + + if (ret != 0) { + printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno)); + taosCloseSocket(sockFd); + sockFd = -1; + } else { + taosKeepTcpAlive(sockFd); + } + + return sockFd; +} + +int32_t taosKeepTcpAlive(SOCKET sockFd) { + int32_t alive = 1; + if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) { + printf("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno)); + taosCloseSocket(sockFd); + return -1; + } + +#ifndef __APPLE__ + // all fails on macosx + int32_t probes = 3; + if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) { + printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno)); + taosCloseSocket(sockFd); + return -1; + } + + int32_t alivetime = 10; + if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) { + printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno)); + taosCloseSocket(sockFd); + return -1; + } + + int32_t interval = 3; + if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) { + printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno)); + taosCloseSocket(sockFd); + return -1; + } +#endif // __APPLE__ + + int32_t nodelay = 1; + if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) { + printf("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno)); + taosCloseSocket(sockFd); + return -1; + } + + struct linger linger = {0}; + linger.l_onoff = 1; + linger.l_linger = 3; + if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) { + printf("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno)); + taosCloseSocket(sockFd); + return -1; + } + + return 0; +} + +SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { + struct sockaddr_in serverAdd; + SOCKET sockFd; + int32_t reuse; + + printf("open tcp server socket:0x%x:%hu", ip, port); + + bzero((char *)&serverAdd, sizeof(serverAdd)); + serverAdd.sin_family = AF_INET; + serverAdd.sin_addr.s_addr = ip; + serverAdd.sin_port = (uint16_t)htons(port); + + if ((sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) { + printf("failed to open TCP socket: %d (%s)", errno, strerror(errno)); + taosCloseSocketNoCheck(sockFd); + return -1; + } + + /* set REUSEADDR option, so the portnumber can be re-used */ + reuse = 1; + if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { + printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); + taosCloseSocket(sockFd); + return -1; + } + + /* bind socket to server address */ + if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) { + printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); + taosCloseSocket(sockFd); + return -1; + } + + if (taosKeepTcpAlive(sockFd) < 0) { + printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno)); + taosCloseSocket(sockFd); + return -1; + } + + if (listen(sockFd, 1024) < 0) { + printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); + taosCloseSocket(sockFd); + return -1; + } + + return sockFd; +} + +void tinet_ntoa(char *ipstr, uint32_t ip) { + sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24); +} + +#define COPY_SIZE 32768 +// sendfile shall be used + +int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) { + int64_t leftLen; + int64_t readLen, writeLen; + char temp[COPY_SIZE]; + + leftLen = len; + + while (leftLen > 0) { + if (leftLen < COPY_SIZE) + readLen = leftLen; + else + readLen = COPY_SIZE; // 4K + + int64_t retLen = taosReadMsg(sfd, temp, (int32_t)readLen); + if (readLen != retLen) { + printf("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", + readLen, retLen, len, leftLen, strerror(errno)); + return -1; + } + + writeLen = taosWriteMsg(dfd, temp, (int32_t)readLen); + + if (readLen != writeLen) { + printf("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", + readLen, writeLen, len, leftLen, strerror(errno)); + return -1; + } + + leftLen -= readLen; + } + + return len; +} diff --git a/source/os/src/osTime.c b/source/os/src/osTime.c index 2deac7e216f117577967341a56c7ce22f8cde655..bf0585e86d7401c1a71c374cd0c32fb679576759 100644 --- a/source/os/src/osTime.c +++ b/source/os/src/osTime.c @@ -57,6 +57,7 @@ struct tm *localtime_r(const time_t *timep, struct tm *result) { */ #include +// #include "monotonic.h" FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) { return gettimeofday(tv, NULL); diff --git a/source/os/src/osTimer.c b/source/os/src/osTimer.c index d0114044f3d25e2682eb8b162774ee095af2a1f4..2f18d76db9bef6157636033e18c10d79a1e4610c 100644 --- a/source/os/src/osTimer.c +++ b/source/os/src/osTimer.c @@ -222,4 +222,20 @@ void taosUninitTimer() { pthread_join(timerThread, NULL); } +int64_t taosGetMonotonicMs() { +#if 0 + return getMonotonicUs() / 1000; +#else + return taosGetTimestampMs(); +#endif +} + +const char *taosMonotonicInit() { +#if 0 + return monotonicInit(); +#else + return NULL; +#endif +} + #endif diff --git a/source/server/dnode/src/dnodeMain.c b/source/server/dnode/src/dnodeMain.c index 668b844f6cdad8f590ade78f93f3a049f60aec38..8836b633e47d2630961efbb9ca3ae90807085b31 100644 --- a/source/server/dnode/src/dnodeMain.c +++ b/source/server/dnode/src/dnodeMain.c @@ -27,14 +27,6 @@ #include "dnodeMain.h" #include "mnode.h" -static int32_t dnodeCreateDir(const char *dir) { - if (!taosMkDir(dir, 0755) && errno != EEXIST) { - return -1; - } - - return 0; -} - static void dnodeCheckDataDirOpenned(char *dir) { #if 0 char filepath[256] = {0}; @@ -87,7 +79,7 @@ int32_t dnodeInitMain(Dnode *dnode, DnMain **out) { taosSetCoreDump(tsEnableCoreFile); #endif - if (dnodeCreateDir(tsLogDir) < 0) { + if (!taosMkDir(tsLogDir)) { printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno)); return -1; } @@ -134,7 +126,7 @@ int32_t dnodeInitStorage(Dnode *dnode, void **m) { #endif // storage module init - if (tsDiskCfgNum == 1 && dnodeCreateDir(tsDataDir) < 0) { + if (tsDiskCfgNum == 1 && !taosMkDir(tsDataDir)) { dError("failed to create dir:%s since %s", tsDataDir, strerror(errno)); return -1; } @@ -151,12 +143,12 @@ int32_t dnodeInitStorage(Dnode *dnode, void **m) { sprintf(tsVnodeDir, "%s/vnode", tsDataDir); sprintf(tsDnodeDir, "%s/dnode", tsDataDir); - if (dnodeCreateDir(tsMnodeDir) < 0) { + if (!taosMkDir(tsMnodeDir)) { dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno)); return -1; } - if (dnodeCreateDir(tsDnodeDir) < 0) { + if (!taosMkDir(tsDnodeDir)) { dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno)); return -1; } diff --git a/source/util/src/tconfig.c b/source/util/src/tconfig.c index e7c0758b6193709826fc7a3e761a8663de828caa..b39e619ba2305d0d2a678b081f62f1e155fa6377 100644 --- a/source/util/src/tconfig.c +++ b/source/util/src/tconfig.c @@ -155,7 +155,7 @@ static bool taosReadDirectoryConfig(SGlobalCfg *cfg, char *input_value) { taosExpandDir(input_value, option, cfg->ptrLength); taosRealPath(option, cfg->ptrLength); - if (!taosMkDir(option, 0755)) { + if (!taosMkDir(option)) { uError("config option:%s, input value:%s, directory not exist, create fail:%s", cfg->option, input_value, strerror(errno)); return false; diff --git a/source/util/src/tfile.c b/source/util/src/tfile.c index 455c885e753e35724d27ec223f86ebf04751286f..f4e1e5c11508b4ffc39f5c3155b46d2cbd974029 100644 --- a/source/util/src/tfile.c +++ b/source/util/src/tfile.c @@ -53,13 +53,18 @@ static int64_t tfOpenImp(int32_t fd) { return rid; } -int64_t tfOpen(const char *pathname, int32_t flags) { - int32_t fd = open(pathname, flags | O_BINARY); +int64_t tfOpenReadWrite(const char *pathname, int32_t flags) { + int32_t fd = taosOpenFileReadWrite(pathname); return tfOpenImp(fd); } -int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode) { - int32_t fd = open(pathname, flags | O_BINARY, mode); +int64_t tfOpenCreateWrite(const char *pathname, int32_t flags, mode_t mode) { + int32_t fd = taosOpenFileCreateWrite(pathname); + return tfOpenImp(fd); +} + +int64_t tfOpenCreateWriteAppend(const char *pathname, int32_t flags, mode_t mode) { + int32_t fd = taosOpenFileCreateWriteAppend(pathname); return tfOpenImp(fd); } @@ -73,7 +78,7 @@ int64_t tfWrite(int64_t tfd, void *buf, int64_t count) { int32_t fd = (int32_t)(uintptr_t)p; - int64_t ret = taosWrite(fd, buf, count); + int64_t ret = taosWriteFile(fd, buf, count); if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); taosReleaseRef(tsFileRsetId, tfd); @@ -86,7 +91,7 @@ int64_t tfRead(int64_t tfd, void *buf, int64_t count) { int32_t fd = (int32_t)(uintptr_t)p; - int64_t ret = taosRead(fd, buf, count); + int64_t ret = taosReadFile(fd, buf, count); if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno); taosReleaseRef(tsFileRsetId, tfd); @@ -98,7 +103,7 @@ int32_t tfFsync(int64_t tfd) { if (p == NULL) return -1; int32_t fd = (int32_t)(uintptr_t)p; - int32_t code = taosFsync(fd); + int32_t code = taosFsyncFile(fd); taosReleaseRef(tsFileRsetId, tfd); return code; @@ -117,7 +122,7 @@ int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence) { if (p == NULL) return -1; int32_t fd = (int32_t)(uintptr_t)p; - int64_t ret = taosLSeek(fd, offset, whence); + int64_t ret = taosLSeekFile(fd, offset, whence); taosReleaseRef(tsFileRsetId, tfd); return ret; @@ -128,7 +133,7 @@ int32_t tfFtruncate(int64_t tfd, int64_t length) { if (p == NULL) return -1; int32_t fd = (int32_t)(uintptr_t)p; - int32_t code = taosFtruncate(fd, length); + int32_t code = taosFtruncateFile(fd, length); taosReleaseRef(tsFileRsetId, tfd); return code; diff --git a/source/util/src/thashutil.c b/source/util/src/thashutil.c index 4a0208a3d0bf22f21b5f6a05513f435664e746af..7f1262e265d2d088ef60e61b52c478151fda1c73 100644 --- a/source/util/src/thashutil.c +++ b/source/util/src/thashutil.c @@ -15,8 +15,9 @@ #include "os.h" #include "hashfunc.h" -#include "tutil.h" #include "tcompare.h" +#include "tdef.h" +#include "tutil.h" #define ROTL32(x, r) ((x) << (r) | (x) >> (32u - (r))) diff --git a/source/util/src/tlog.c b/source/util/src/tlog.c index 63dd1ef606f2839636c19476280330e3a90ff2ff..0f81f596042c5b504f57e6e9687040ee8106ccec 100644 --- a/source/util/src/tlog.c +++ b/source/util/src/tlog.c @@ -201,7 +201,7 @@ static void *taosThreadToOpenNewFile(void *param) { taosUmaskFile(0); - int32_t fd = taosOpenFileTruncCreateWrite(name); + int32_t fd = taosOpenFileCreateWriteTrunc(name); if (fd < 0) { tsLogObj.openInProgress = 0; tsLogObj.lines = tsLogObj.maxLines - 1000; @@ -731,7 +731,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) { goto cmp_end; } - int32_t fd = taosOpenFileTruncCreateWrite(destFileName); + int32_t fd = taosOpenFileCreateWriteTrunc(destFileName); if (fd < 0) { ret = -2; goto cmp_end; diff --git a/source/util/src/tlosertree.c b/source/util/src/tlosertree.c index 0f104c4b63a36880a79ad564a0f837f9b09e7819..7be2ca74613c44fcc4155f48b908cd45e6a37e3d 100644 --- a/source/util/src/tlosertree.c +++ b/source/util/src/tlosertree.c @@ -15,7 +15,7 @@ #include "os.h" #include "tlosertree.h" -#include "taosmsg.h" +// #include "taosmsg.h" #include "tulog.h" // set initial value for loser tree @@ -46,7 +46,7 @@ uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* pa *pTree = (SLoserTreeInfo*)calloc(1, sizeof(SLoserTreeInfo) + sizeof(SLoserTreeNode) * totalEntries); if ((*pTree) == NULL) { uError("allocate memory for loser-tree failed. reason:%s", strerror(errno)); - return TSDB_CODE_QRY_OUT_OF_MEMORY; + return -1; } (*pTree)->pNode = (SLoserTreeNode*)(((char*)(*pTree)) + sizeof(SLoserTreeInfo)); @@ -74,7 +74,7 @@ uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* pa printf("initialize local reducer completed!\n"); #endif - return TSDB_CODE_SUCCESS; + return 0; } void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) { diff --git a/source/util/src/tmd5.c b/source/util/src/tmd5.c index a1fdcc6a0539ad8f7bd804ab039e0e639d20ad3e..9cc4b3b9d5fc91a4795b4c473c9fe326f9c09358 100644 --- a/source/util/src/tmd5.c +++ b/source/util/src/tmd5.c @@ -35,7 +35,6 @@ #include "os.h" #include "tmd5.h" -#include "taosdef.h" /* forward declaration */ static void Transform(uint32_t *buf, uint32_t *in); diff --git a/source/util/src/tnote.c b/source/util/src/tnote.c index b691abc5b9f6f828edcc46ec3a5989baa083f443..e4a326ba8f903906c210eef6bcfeaa2e5faf8e63 100644 --- a/source/util/src/tnote.c +++ b/source/util/src/tnote.c @@ -16,7 +16,7 @@ #define _DEFAULT_SOURCE #include "os.h" #include "tutil.h" -#include "tglobal.h" +#include "tdef.h" #include "tnote.h" SNoteObj tsHttpNote; @@ -43,6 +43,7 @@ static void taosInitNote(int32_t numOfLines, int32_t maxNotes, SNoteObj *pNote, void taosInitNotes() { char name[TSDB_FILENAME_LEN * 2] = {0}; +#if 0 if (tsTscEnableRecordSql) { snprintf(name, TSDB_FILENAME_LEN * 2, "%s/tscsql-%d", tsLogDir, taosGetPId()); taosInitNote(tsNumOfLogLines, 1, &tsTscNote, name); @@ -57,13 +58,14 @@ void taosInitNotes() { snprintf(name, TSDB_FILENAME_LEN * 2, "%s/taosinfo", tsLogDir); taosInitNote(tsNumOfLogLines, 1, &tsInfoNote, name); } +#endif } static bool taosLockNote(int32_t fd, SNoteObj *pNote) { if (fd < 0) return false; if (pNote->fileNum > 1) { - int32_t ret = (int32_t)(flock(fd, LOCK_EX | LOCK_NB)); + int32_t ret = (int32_t)taosLockFile(fd); if (ret == 0) { return true; } @@ -76,7 +78,7 @@ static void taosUnLockNote(int32_t fd, SNoteObj *pNote) { if (fd < 0) return; if (pNote->fileNum > 1) { - flock(fd, LOCK_UN | LOCK_NB); + taosUnLockFile(fd); } } @@ -90,9 +92,9 @@ static void *taosThreadToOpenNewNote(void *param) { pNote->lines = 0; sprintf(name, "%s.%d", pNote->name, pNote->flag); - umask(0); + taosUmaskFile(0); - int32_t fd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO); + int32_t fd = taosOpenFileCreateWriteTrunc(name); if (fd < 0) { return NULL; } @@ -132,7 +134,7 @@ static int32_t taosOpenNewNote(SNoteObj *pNote) { } static bool taosCheckNoteIsOpen(char *noteName, SNoteObj *pNote) { - int32_t fd = open(noteName, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + int32_t fd = taosOpenFileCreateWrite(noteName); if (fd < 0) { fprintf(stderr, "failed to open note:%s reason:%s\n", noteName, strerror(errno)); return true; @@ -174,7 +176,7 @@ static void taosGetNoteName(char *fn, SNoteObj *pNote) { static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxNoteNum, SNoteObj *pNote) { char name[NOTE_FILE_NAME_LEN * 2] = {0}; int32_t size; - struct stat logstat0, logstat1; + int32_t logstat0_mtime, logstat1_mtime; pNote->maxLines = maxLines; pNote->fileNum = maxNoteNum; @@ -184,13 +186,13 @@ static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxN strcpy(name, fn); strcat(name, ".0"); } - bool log0Exist = stat(name, &logstat0) >= 0; + bool log0Exist = taosStatFile(name, NULL, &logstat0_mtime) >= 0; if (strlen(fn) < NOTE_FILE_NAME_LEN + 50 - 2) { strcpy(name, fn); strcat(name, ".1"); } - bool log1Exist = stat(name, &logstat1) >= 0; + bool log1Exist = taosStatFile(name, NULL, &logstat1_mtime) >= 0; if (!log0Exist && !log1Exist) { pNote->flag = 0; @@ -199,15 +201,15 @@ static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxN } else if (!log0Exist) { pNote->flag = 1; } else { - pNote->flag = (logstat0.st_mtime > logstat1.st_mtime) ? 0 : 1; + pNote->flag = (logstat0_mtime > logstat1_mtime) ? 0 : 1; } char noteName[NOTE_FILE_NAME_LEN * 2] = {0}; sprintf(noteName, "%s.%d", pNote->name, pNote->flag); pthread_mutex_init(&pNote->mutex, NULL); - umask(0); - pNote->fd = open(noteName, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + taosUmaskFile(0); + pNote->fd = taosOpenFileCreateWrite(noteName); if (pNote->fd < 0) { fprintf(stderr, "failed to open note file:%s reason:%s\n", noteName, strerror(errno)); @@ -216,12 +218,12 @@ static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxN taosLockNote(pNote->fd, pNote); // only an estimate for number of lines - struct stat filestat; - if (fstat(pNote->fd, &filestat) < 0) { + int64_t filestat_size; + if (taosFStatFile(pNote->fd, &filestat_size, NULL) < 0) { fprintf(stderr, "failed to fstat note file:%s reason:%s\n", noteName, strerror(errno)); return -1; } - size = (int32_t)filestat.st_size; + size = (int32_t)filestat_size; pNote->lines = size / 60; lseek(pNote->fd, 0, SEEK_END); @@ -231,7 +233,7 @@ static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxN void taosNotePrintBuffer(SNoteObj *pNote, char *buffer, int32_t len) { if (pNote->fd <= 0) return; - taosWrite(pNote->fd, buffer, len); + taosWriteFile(pNote->fd, buffer, len); if (pNote->maxLines > 0) { pNote->lines++; @@ -247,7 +249,7 @@ void taosNotePrint(SNoteObj *pNote, const char *const format, ...) { struct timeval timeSecs; time_t curTime; - gettimeofday(&timeSecs, NULL); + taosGetTimeOfDay(&timeSecs); curTime = timeSecs.tv_sec; ptm = localtime_r(&curTime, &Tm); len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %08" PRId64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour, diff --git a/source/util/src/tsched.c b/source/util/src/tsched.c index b86ebb38bcd6446b56357f9667636403e14d688c..915eaa8d4ffc75addb98035eeb27046056f194bc 100644 --- a/source/util/src/tsched.c +++ b/source/util/src/tsched.c @@ -14,7 +14,7 @@ */ #include "os.h" -#include "taosdef.h" +#include "tdef.h" #include "tutil.h" #include "tulog.h" #include "tsched.h" diff --git a/source/util/src/tskiplist.c b/source/util/src/tskiplist.c index 98fd9c094cba3e779c9f203fdacc548a3bda5ef4..4905b2723ed46fe51eb4f0c30b425e822b6c155e 100644 --- a/source/util/src/tskiplist.c +++ b/source/util/src/tskiplist.c @@ -16,7 +16,6 @@ #include "tskiplist.h" #include "os.h" #include "tcompare.h" -#include "tdataformat.h" #include "tulog.h" #include "tutil.h" diff --git a/source/util/src/tsocket.c b/source/util/src/tsocket.c deleted file mode 100644 index 8d69a87e77bff594e7a99b2a63d4d849214eebe9..0000000000000000000000000000000000000000 --- a/source/util/src/tsocket.c +++ /dev/null @@ -1,539 +0,0 @@ -/* - * Copyright (c) 2019 TAOS Data, Inc. - * - * This program is free software: you can use, redistribute, and/or modify - * it under the terms of the GNU Affero General Public License, version 3 - * or later ("AGPL"), as published by the Free Software Foundation. - * - * This program is distributed in the hope that it will be useful, but WITHOUT - * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or - * FITNESS FOR A PARTICULAR PURPOSE. - * - * You should have received a copy of the GNU Affero General Public License - * along with this program. If not, see . - */ - -#include "os.h" -#include "tulog.h" -#include "tsocket.h" -#include "taoserror.h" - -#ifndef SIGPIPE - #define SIGPIPE EPIPE -#endif - -#define TCP_CONN_TIMEOUT 3000 // conn timeout - -int32_t taosGetFqdn(char *fqdn) { - char hostname[1024]; - hostname[1023] = '\0'; - if (gethostname(hostname, 1023) == -1) { - uError("failed to get hostname, reason:%s", strerror(errno)); - return -1; - } - - struct addrinfo hints = {0}; - struct addrinfo *result = NULL; -#ifdef __APPLE__ - // on macosx, hostname -f has the form of xxx.local - // which will block getaddrinfo for a few seconds if AI_CANONNAME is set - // thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return - // immediately - hints.ai_family = AF_INET; -#else // __APPLE__ - hints.ai_flags = AI_CANONNAME; -#endif // __APPLE__ - int32_t ret = getaddrinfo(hostname, NULL, &hints, &result); - if (!result) { - uError("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret)); - return -1; - } - -#ifdef __APPLE__ - // refer to comments above - strcpy(fqdn, hostname); -#else // __APPLE__ - strcpy(fqdn, result->ai_canonname); -#endif // __APPLE__ - freeaddrinfo(result); - return 0; -} - -uint32_t taosGetIpv4FromFqdn(const char *fqdn) { - struct addrinfo hints = {0}; - hints.ai_family = AF_INET; - hints.ai_socktype = SOCK_STREAM; - - struct addrinfo *result = NULL; - - int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result); - if (result) { - struct sockaddr * sa = result->ai_addr; - struct sockaddr_in *si = (struct sockaddr_in *)sa; - struct in_addr ia = si->sin_addr; - uint32_t ip = ia.s_addr; - freeaddrinfo(result); - return ip; - } else { -#ifdef EAI_SYSTEM - if (ret == EAI_SYSTEM) { - uError("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno)); - terrno = TAOS_SYSTEM_ERROR(errno); - } else { - uError("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret)); - } -#else - uError("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret)); -#endif - return 0xFFFFFFFF; - } -} - -// Function converting an IP address string to an uint32_t. -uint32_t ip2uint(const char *const ip_addr) { - char ip_addr_cpy[20]; - char ip[5]; - - tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy)); - - char *s_start, *s_end; - s_start = ip_addr_cpy; - s_end = ip_addr_cpy; - - int32_t k; - - for (k = 0; *s_start != '\0'; s_start = s_end) { - for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) { - } - if (*s_end == '.') { - *s_end = '\0'; - s_end++; - } - ip[k++] = (char)atoi(s_start); - } - - ip[k] = '\0'; - - return *((uint32_t *)ip); -} - -int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) { - int32_t nleft, nwritten; - char * ptr = (char *)buf; - - nleft = nbytes; - - while (nleft > 0) { - nwritten = (int32_t)taosWriteSocket(fd, (char *)ptr, (size_t)nleft); - if (nwritten <= 0) { - if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */) - continue; - else - return -1; - } else { - nleft -= nwritten; - ptr += nwritten; - } - - if (errno == SIGPIPE || errno == EPIPE) { - return -1; - } - } - - return (nbytes - nleft); -} - -int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) { - int32_t nleft, nread; - char * ptr = (char *)buf; - - nleft = nbytes; - - if (fd < 0) return -1; - - while (nleft > 0) { - nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft); - if (nread == 0) { - break; - } else if (nread < 0) { - if (errno == EINTR/* || errno == EAGAIN || errno == EWOULDBLOCK*/) { - continue; - } else { - return -1; - } - } else { - nleft -= nread; - ptr += nread; - } - - if (errno == SIGPIPE || errno == EPIPE) { - return -1; - } - } - - return (nbytes - nleft); -} - -int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) { - taosSetNonblocking(fd, 1); - - int32_t nleft, nwritten, nready; - fd_set fset; - struct timeval tv; - - nleft = nbytes; - while (nleft > 0) { - tv.tv_sec = 30; - tv.tv_usec = 0; - FD_ZERO(&fset); - FD_SET(fd, &fset); - if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) { - errno = ETIMEDOUT; - uError("fd %d timeout, no enough space to write", fd); - break; - - } else if (nready < 0) { - if (errno == EINTR) continue; - - uError("select error, %d (%s)", errno, strerror(errno)); - return -1; - } - - nwritten = (int32_t)taosSend(fd, ptr, (size_t)nleft, MSG_NOSIGNAL); - if (nwritten <= 0) { - if (errno == EAGAIN || errno == EINTR) continue; - - uError("write error, %d (%s)", errno, strerror(errno)); - return -1; - } - - nleft -= nwritten; - ptr += nwritten; - } - - taosSetNonblocking(fd, 0); - - return (nbytes - nleft); -} - -int32_t taosReadn(SOCKET fd, char *ptr, int32_t nbytes) { - int32_t nread, nready, nleft = nbytes; - - fd_set fset; - struct timeval tv; - - while (nleft > 0) { - tv.tv_sec = 30; - tv.tv_usec = 0; - FD_ZERO(&fset); - FD_SET(fd, &fset); - if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) { - errno = ETIMEDOUT; - uError("fd %d timeout\n", fd); - break; - } else if (nready < 0) { - if (errno == EINTR) continue; - uError("select error, %d (%s)", errno, strerror(errno)); - return -1; - } - - if ((nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft)) < 0) { - if (errno == EINTR) continue; - uError("read error, %d (%s)", errno, strerror(errno)); - return -1; - - } else if (nread == 0) { - uError("fd %d EOF", fd); - break; // EOF - } - - nleft -= nread; - ptr += nread; - } - - return (nbytes - nleft); -} - -SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) { - struct sockaddr_in localAddr; - SOCKET sockFd; - int32_t bufSize = 1024000; - - uDebug("open udp socket:0x%x:%hu", ip, port); - - memset((char *)&localAddr, 0, sizeof(localAddr)); - localAddr.sin_family = AF_INET; - localAddr.sin_addr.s_addr = ip; - localAddr.sin_port = (uint16_t)htons(port); - - if ((sockFd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) { - uError("failed to open udp socket: %d (%s)", errno, strerror(errno)); - taosCloseSocketNoCheck(sockFd); - return -1; - } - - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { - uError("failed to set the send buffer size for UDP socket\n"); - taosCloseSocket(sockFd); - return -1; - } - - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { - uError("failed to set the receive buffer size for UDP socket\n"); - taosCloseSocket(sockFd); - return -1; - } - - /* bind socket to local address */ - if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) { - uError("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port); - taosCloseSocket(sockFd); - return -1; - } - - return sockFd; -} - -SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) { - SOCKET sockFd = 0; - int32_t ret; - struct sockaddr_in serverAddr, clientAddr; - int32_t bufSize = 1024 * 1024; - - sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP); - - if (sockFd <= 2) { - uError("failed to open the socket: %d (%s)", errno, strerror(errno)); - taosCloseSocketNoCheck(sockFd); - return -1; - } - - /* set REUSEADDR option, so the portnumber can be re-used */ - int32_t reuse = 1; - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { - uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); - taosCloseSocket(sockFd); - return -1; - } - - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { - uError("failed to set the send buffer size for TCP socket\n"); - taosCloseSocket(sockFd); - return -1; - } - - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) { - uError("failed to set the receive buffer size for TCP socket\n"); - taosCloseSocket(sockFd); - return -1; - } - - if (clientIp != 0) { - memset((char *)&clientAddr, 0, sizeof(clientAddr)); - clientAddr.sin_family = AF_INET; - clientAddr.sin_addr.s_addr = clientIp; - clientAddr.sin_port = 0; - - /* bind socket to client address */ - if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) { - uError("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort, - strerror(errno)); - taosCloseSocket(sockFd); - return -1; - } - } - - memset((char *)&serverAddr, 0, sizeof(serverAddr)); - serverAddr.sin_family = AF_INET; - serverAddr.sin_addr.s_addr = destIp; - serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort); - -#ifdef _TD_LINUX - taosSetNonblocking(sockFd, 1); - ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); - if (ret == -1) { - if (errno == EHOSTUNREACH) { - uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno)); - taosCloseSocket(sockFd); - return -1; - } else if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) { - struct pollfd wfd[1]; - - wfd[0].fd = sockFd; - wfd[0].events = POLLOUT; - - int res = poll(wfd, 1, TCP_CONN_TIMEOUT); - if (res == -1 || res == 0) { - uError("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort); - taosCloseSocket(sockFd); // - return -1; - } - int optVal = -1, optLen = sizeof(int); - if ((0 != taosGetSockOpt(sockFd, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) { - uError("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort); - taosCloseSocket(sockFd); // - return -1; - } - ret = 0; - } else { // Other error - uError("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort); - taosCloseSocket(sockFd); // - return -1; - } - } - taosSetNonblocking(sockFd, 0); - -#else - ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr)); -#endif - - if (ret != 0) { - uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno)); - taosCloseSocket(sockFd); - sockFd = -1; - } else { - taosKeepTcpAlive(sockFd); - } - - return sockFd; -} - -int32_t taosKeepTcpAlive(SOCKET sockFd) { - int32_t alive = 1; - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) { - uError("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno)); - taosCloseSocket(sockFd); - return -1; - } - -#ifndef __APPLE__ - // all fails on macosx - int32_t probes = 3; - if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) { - uError("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno)); - taosCloseSocket(sockFd); - return -1; - } - - int32_t alivetime = 10; - if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) { - uError("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno)); - taosCloseSocket(sockFd); - return -1; - } - - int32_t interval = 3; - if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) { - uError("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno)); - taosCloseSocket(sockFd); - return -1; - } -#endif // __APPLE__ - - int32_t nodelay = 1; - if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) { - uError("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno)); - taosCloseSocket(sockFd); - return -1; - } - - struct linger linger = {0}; - linger.l_onoff = 1; - linger.l_linger = 3; - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) { - uError("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno)); - taosCloseSocket(sockFd); - return -1; - } - - return 0; -} - -SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) { - struct sockaddr_in serverAdd; - SOCKET sockFd; - int32_t reuse; - - uDebug("open tcp server socket:0x%x:%hu", ip, port); - - bzero((char *)&serverAdd, sizeof(serverAdd)); - serverAdd.sin_family = AF_INET; - serverAdd.sin_addr.s_addr = ip; - serverAdd.sin_port = (uint16_t)htons(port); - - if ((sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) { - uError("failed to open TCP socket: %d (%s)", errno, strerror(errno)); - taosCloseSocketNoCheck(sockFd); - return -1; - } - - /* set REUSEADDR option, so the portnumber can be re-used */ - reuse = 1; - if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) { - uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno)); - taosCloseSocket(sockFd); - return -1; - } - - /* bind socket to server address */ - if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) { - uError("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); - taosCloseSocket(sockFd); - return -1; - } - - if (taosKeepTcpAlive(sockFd) < 0) { - uError("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno)); - taosCloseSocket(sockFd); - return -1; - } - - if (listen(sockFd, 1024) < 0) { - uError("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno)); - taosCloseSocket(sockFd); - return -1; - } - - return sockFd; -} - -void tinet_ntoa(char *ipstr, uint32_t ip) { - sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24); -} - -#define COPY_SIZE 32768 -// sendfile shall be used - -int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) { - int64_t leftLen; - int64_t readLen, writeLen; - char temp[COPY_SIZE]; - - leftLen = len; - - while (leftLen > 0) { - if (leftLen < COPY_SIZE) - readLen = leftLen; - else - readLen = COPY_SIZE; // 4K - - int64_t retLen = taosReadMsg(sfd, temp, (int32_t)readLen); - if (readLen != retLen) { - uError("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", - readLen, retLen, len, leftLen, strerror(errno)); - return -1; - } - - writeLen = taosWriteMsg(dfd, temp, (int32_t)readLen); - - if (readLen != writeLen) { - uError("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s", - readLen, writeLen, len, leftLen, strerror(errno)); - return -1; - } - - leftLen -= readLen; - } - - return len; -} diff --git a/source/util/src/tthread.c b/source/util/src/tthread.c index 043b2de2f241297d209041294428dde2c55e974e..2ffefa25e6a13b8ffd7c61005d0a6056545da243 100644 --- a/source/util/src/tthread.c +++ b/source/util/src/tthread.c @@ -15,8 +15,7 @@ #include "os.h" #include "tthread.h" -#include "tglobal.h" -#include "taosdef.h" +#include "tdef.h" #include "tutil.h" #include "tulog.h" #include "taoserror.h" diff --git a/source/util/src/ttimer.c b/source/util/src/ttimer.c index 865e1159c1995b2796682d64ee06de02442b7a25..d0a1fffa9e4dd4f28f9a4169241a2e2d44874373 100644 --- a/source/util/src/ttimer.c +++ b/source/util/src/ttimer.c @@ -18,7 +18,6 @@ #include "tsched.h" #include "ttimer.h" #include "tutil.h" -#include "monotonic.h" extern int8_t tscEmbedded; @@ -187,10 +186,6 @@ static void removeTimer(uintptr_t id) { unlockTimerList(list); } -static int64_t getMonotonicMs(void) { - return (int64_t) getMonotonicUs() / 1000; -} - static void addToWheel(tmr_obj_t* timer, uint32_t delay) { timerAddRef(timer); // select a wheel for the timer, we are not an accurate timer, @@ -206,7 +201,7 @@ static void addToWheel(tmr_obj_t* timer, uint32_t delay) { time_wheel_t* wheel = wheels + timer->wheel; timer->prev = NULL; - timer->expireAt = getMonotonicMs() + delay; + timer->expireAt = taosGetMonotonicMs() + delay; pthread_mutex_lock(&wheel->mutex); @@ -339,7 +334,7 @@ tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle } static void taosTimerLoopFunc(int signo) { - int64_t now = getMonotonicMs(); + int64_t now = taosGetMonotonicMs(); for (int i = 0; i < tListLen(wheels); i++) { // `expried` is a temporary expire list. @@ -506,7 +501,7 @@ static void taosTmrModuleInit(void) { pthread_mutex_init(&tmrCtrlMutex, NULL); - int64_t now = getMonotonicMs(); + int64_t now = taosGetMonotonicMs(); for (int i = 0; i < tListLen(wheels); i++) { time_wheel_t* wheel = wheels + i; if (pthread_mutex_init(&wheel->mutex, NULL) != 0) { @@ -537,9 +532,6 @@ static void taosTmrModuleInit(void) { } void* taosTmrInit(int maxNumOfTmrs, int resolution, int longest, const char* label) { - const char* ret = monotonicInit(); - tmrDebug("ttimer monotonic clock source:%s", ret); - pthread_once(&tmrModuleInit, taosTmrModuleInit); pthread_mutex_lock(&tmrCtrlMutex); diff --git a/source/util/src/tutil.c b/source/util/src/tutil.c index 5f8c92898fc5f0abc4c733c0558befd68ac3cac7..9c79a7cca2fc479649582fa682dea62bbe2171a6 100644 --- a/source/util/src/tutil.c +++ b/source/util/src/tutil.c @@ -15,8 +15,7 @@ #include "os.h" #include "tcrc32c.h" -#include "tglobal.h" -#include "taosdef.h" +#include "tdef.h" #include "tutil.h" #include "tulog.h" #include "taoserror.h" diff --git a/src/client/src/tscSub.c b/src/client/src/tscSub.c index 2d2ce3829969d07d58d972b02cbc8717273da60d..8f9b6a7252f063e7684d4e4806f158f1c17e1a36 100644 --- a/src/client/src/tscSub.c +++ b/src/client/src/tscSub.c @@ -371,7 +371,7 @@ void tscSaveSubscriptionProgress(void* sub) { char path[256]; sprintf(path, "%s/subscribe", tsDataDir); - if (!taosMkDir(path, 0777)) { + if (!taosMkDir(path)) { tscError("failed to create subscribe dir: %s", path); } diff --git a/include/util/tnettest.h b/src/kit/shell/inc/tnettest.h similarity index 100% rename from include/util/tnettest.h rename to src/kit/shell/inc/tnettest.h diff --git a/source/util/src/tnettest.c b/src/kit/shell/src/tnettest.c similarity index 100% rename from source/util/src/tnettest.c rename to src/kit/shell/src/tnettest.c diff --git a/src/tfs/src/tfs.c b/src/tfs/src/tfs.c index ac7dbc7fd21740837c2caacce230ae1580e46577..b8e8972d934ae568cc5bd1f09489094b616c901b 100644 --- a/src/tfs/src/tfs.c +++ b/src/tfs/src/tfs.c @@ -252,7 +252,7 @@ int tfsMkdirAt(const char *rname, int level, int id) { char aname[TMPNAME_LEN]; snprintf(aname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), rname); - if (!taosMkDir(aname, 0755)) { + if (!taosMkDir(aname)) { terrno = TAOS_SYSTEM_ERROR(errno); return -1; } diff --git a/src/wal/src/walMgmt.c b/src/wal/src/walMgmt.c index 946d1aea26c7d52ff917e573a83340fb59abd468..dbff08d73039863313bc6b5cf9ec15faa5f5e432 100644 --- a/src/wal/src/walMgmt.c +++ b/src/wal/src/walMgmt.c @@ -139,7 +139,7 @@ void walClose(void *handle) { } static int32_t walInitObj(SWal *pWal) { - if (!taosMkDir(pWal->path, 0755)) { + if (!taosMkDir(pWal->path)) { wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); } diff --git a/src/wal/src/walWrite.c b/src/wal/src/walWrite.c index e991bf02aa68c92d7cf4dfdb09982ebaa6541bdc..cae4291eb8011d72fd1010c9024fbec270571c62 100644 --- a/src/wal/src/walWrite.c +++ b/src/wal/src/walWrite.c @@ -51,7 +51,7 @@ int32_t walRenew(void *handle) { } snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); - pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO); + pWal->tfd = tfOpenCreateWrite(pWal->name); if (!tfValid(pWal->tfd)) { code = TAOS_SYSTEM_ERROR(errno); @@ -220,7 +220,7 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) { // open the existing WAL file in append mode pWal->fileId = 0; snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId); - pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO); + pWal->tfd = tfOpenCreateWriteAppend(pWal->name); if (!tfValid(pWal->tfd)) { wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno)); return TAOS_SYSTEM_ERROR(errno); @@ -425,7 +425,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch return TAOS_SYSTEM_ERROR(errno); } - int64_t tfd = tfOpen(name, O_RDWR); + int64_t tfd = tfOpenReadWrite(name); if (!tfValid(tfd)) { wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno)); tfree(buffer);