提交 140ec7c8 编写于 作者: S Shengliang Guan

[TD-10430] adjust util and common module

上级 c0a171b1
......@@ -22,7 +22,7 @@ extern "C" {
void taosRemoveDir(char *dirname);
bool taosDirExist(char *dirname);
bool taosMkDir(char *dirname, mode_t mode);
bool taosMkDir(char *dirname);
void taosRemoveOldFiles(char *dirname, int32_t keepDays);
bool taosExpandDir(char *dirname, char *outname, int32_t maxlen);
bool taosRealPath(char *dirname, int32_t maxlen);
......
......@@ -46,8 +46,10 @@ int32_t taosFStatFile(FileFd fd, int64_t *size, int32_t *mtime);
FileFd taosOpenFileWrite(const char *path);
FileFd taosOpenFileCreateWrite(const char *path);
FileFd taosOpenFileTruncCreateWrite(const char *path);
FileFd taosOpenFileCreateWriteTrunc(const char *path);
FileFd taosOpenFileCreateWriteAppend(const char *path);
FileFd taosOpenFileRead(const char *path);
FileFd taosOpenFileReadWrite(const char *path);
int64_t taosLSeekFile(FileFd fd, int64_t offset, int32_t whence);
int32_t taosFtruncateFile(FileFd fd, int64_t length);
......
......@@ -85,6 +85,23 @@ const char *taosInetNtoa(struct in_addr ipInt);
#define htobe64 htonll
#endif
int32_t taosReadn(SOCKET sock, char *buffer, int32_t len);
int32_t taosWriteMsg(SOCKET fd, void *ptr, int32_t nbytes);
int32_t taosReadMsg(SOCKET fd, void *ptr, int32_t nbytes);
int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes);
int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len);
int32_t taosSetNonblocking(SOCKET sock, int32_t on);
SOCKET taosOpenUdpSocket(uint32_t localIp, uint16_t localPort);
SOCKET taosOpenTcpClientSocket(uint32_t ip, uint16_t port, uint32_t localIp);
SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port);
int32_t taosKeepTcpAlive(SOCKET sockFd);
int32_t taosGetFqdn(char *);
uint32_t taosGetIpv4FromFqdn(const char *);
void tinet_ntoa(char *ipstr, uint32_t ip);
uint32_t ip2uint(const char *const ip_addr);
#ifdef __cplusplus
}
#endif
......
......@@ -24,6 +24,8 @@ extern "C" {
int32_t taosInitTimer(void (*callback)(int32_t), int32_t ms);
void taosUninitTimer();
int64_t taosGetMonotonicMs();
const char *taosMonotonicInit();
#ifdef __cplusplus
}
......
......@@ -19,7 +19,7 @@
extern "C" {
#endif
#include "os.h "
#include "os.h"
#define ENCODE_LIMIT (((uint8_t)1) << 7)
#define ZIGZAGE(T, v) ((u##T)((v) >> (sizeof(T) * 8 - 1))) ^ (((u##T)(v)) << 1) // zigzag encode
......
......@@ -28,8 +28,10 @@ void tfCleanup();
// the same syntax as UNIX standard open/close/read/write
// but FD is int64_t and will never be reused
int64_t tfOpen(const char *pathname, int32_t flags);
int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode);
int64_t tfOpenReadWrite(const char *pathname);
int64_t tfOpenCreateWrite(const char *pathname);
int64_t tfOpenCreateWriteAppend(const char *pathname);
int64_t tfClose(int64_t tfd);
int64_t tfWrite(int64_t tfd, void *buf, int64_t count);
int64_t tfRead(int64_t tfd, void *buf, int64_t count);
......
......@@ -21,7 +21,7 @@ extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
#include "tdef.h"
#include "tarray.h"
#include "tfunctional.h"
......
......@@ -21,7 +21,7 @@ extern "C" {
#endif
#include "os.h"
#include "taosdef.h"
#include "tdef.h"
// create new thread
pthread_t* taosCreateThread( void *(*__start_routine) (void *), void* param);
......
......@@ -23,7 +23,9 @@
#include "tarray.h"
#include "tbuffer.h"
#include "tcompare.h"
#include "tsdb.h"
#include "tname.h"
#include "hash.h"
// #include "tsdb.h"
#include "tskiplist.h"
#include "texpr.h"
#include "tarithoperator.h"
......
......@@ -17,6 +17,7 @@
#include "hash.h"
#include "taos.h"
#include "taosdef.h"
#include "ttime.h"
#include "ttoken.h"
#include "ttokendef.h"
#include "ttype.h"
......
......@@ -60,7 +60,7 @@ void taosRemoveDir(char *dirname) {
bool taosDirExist(char *dirname) { return access(dirname, F_OK) == 0; }
bool taosMkDir(char *dirname, mode_t mode) {
bool taosMkDir(char *dirname) {
int32_t code = mkdir(dirname, 0755);
if (code < 0 && errno == EEXIST) {
return true;
......
......@@ -16,6 +16,22 @@
#include "os.h"
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
#include <io.h>
#if defined(_MSDOS)
#define open _open
#endif
#if defined(_WIN32)
extern int openA(const char *, int, ...); /* MsvcLibX ANSI version of open */
extern int openU(const char *, int, ...); /* MsvcLibX UTF-8 version of open */
#if defined(_UTF8_SOURCE) || defined(_BSD_SOURCE) || defined(_GNU_SOURCE)
#define open openU
#else /* _ANSI_SOURCE */
#define open openA
#endif /* defined(_UTF8_SOURCE) */
#endif /* defined(_WIN32) */
#else
#include <fcntl.h>
#include <sys/file.h>
......@@ -478,26 +494,44 @@ int32_t taosOpenFileWrite(const char *path) {
#endif
}
FileFd taosOpenFileRead(const char *path) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
int32_t taosOpenFileCreateWrite(const char *path) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
return open(path, O_RDONLY);
return open(path, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
}
int32_t taosOpenFileCreateWrite(const char *path) {
int32_t taosOpenFileCreateWriteTrunc(const char *path) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
return open(path, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
return open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
}
int32_t taosOpenFileTruncCreateWrite(const char *path) {
int32_t taosOpenFileCreateWriteAppend(const char *path) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
return open(path, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
return open(path, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
}
\ No newline at end of file
}
FileFd taosOpenFileRead(const char *path) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
return open(path, O_RDONLY, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
}
FileFd taosOpenFileReadWrite(const char *path) {
#if defined(_TD_WINDOWS_64) || defined(_TD_WINDOWS_32)
return 0;
#else
return open(path, O_RDWR, S_IRWXU | S_IRWXG | S_IRWXO);
#endif
}
......@@ -224,4 +224,525 @@ uint64_t htonll(uint64_t val) { return (((uint64_t)htonl(val)) << 32) + htonl(va
#endif
#endif
\ No newline at end of file
#endif
#ifndef SIGPIPE
#define SIGPIPE EPIPE
#endif
#define TCP_CONN_TIMEOUT 3000 // conn timeout
int32_t taosGetFqdn(char *fqdn) {
char hostname[1024];
hostname[1023] = '\0';
if (gethostname(hostname, 1023) == -1) {
printf("failed to get hostname, reason:%s", strerror(errno));
return -1;
}
struct addrinfo hints = {0};
struct addrinfo *result = NULL;
#ifdef __APPLE__
// on macosx, hostname -f has the form of xxx.local
// which will block getaddrinfo for a few seconds if AI_CANONNAME is set
// thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return
// immediately
hints.ai_family = AF_INET;
#else // __APPLE__
hints.ai_flags = AI_CANONNAME;
#endif // __APPLE__
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) {
printf("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
return -1;
}
#ifdef __APPLE__
// refer to comments above
strcpy(fqdn, hostname);
#else // __APPLE__
strcpy(fqdn, result->ai_canonname);
#endif // __APPLE__
freeaddrinfo(result);
return 0;
}
uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
struct addrinfo hints = {0};
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
struct addrinfo *result = NULL;
int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
if (result) {
struct sockaddr * sa = result->ai_addr;
struct sockaddr_in *si = (struct sockaddr_in *)sa;
struct in_addr ia = si->sin_addr;
uint32_t ip = ia.s_addr;
freeaddrinfo(result);
return ip;
} else {
#ifdef EAI_SYSTEM
if (ret == EAI_SYSTEM) {
printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno));
} else {
printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
}
#else
printf("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
#endif
return 0xFFFFFFFF;
}
}
// Function converting an IP address string to an uint32_t.
uint32_t ip2uint(const char *const ip_addr) {
char ip_addr_cpy[20];
char ip[5];
tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
char *s_start, *s_end;
s_start = ip_addr_cpy;
s_end = ip_addr_cpy;
int32_t k;
for (k = 0; *s_start != '\0'; s_start = s_end) {
for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
}
if (*s_end == '.') {
*s_end = '\0';
s_end++;
}
ip[k++] = (char)atoi(s_start);
}
ip[k] = '\0';
return *((uint32_t *)ip);
}
int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
int32_t nleft, nwritten;
char * ptr = (char *)buf;
nleft = nbytes;
while (nleft > 0) {
nwritten = (int32_t)taosWriteSocket(fd, (char *)ptr, (size_t)nleft);
if (nwritten <= 0) {
if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */)
continue;
else
return -1;
} else {
nleft -= nwritten;
ptr += nwritten;
}
if (errno == SIGPIPE || errno == EPIPE) {
return -1;
}
}
return (nbytes - nleft);
}
int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) {
int32_t nleft, nread;
char * ptr = (char *)buf;
nleft = nbytes;
if (fd < 0) return -1;
while (nleft > 0) {
nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft);
if (nread == 0) {
break;
} else if (nread < 0) {
if (errno == EINTR/* || errno == EAGAIN || errno == EWOULDBLOCK*/) {
continue;
} else {
return -1;
}
} else {
nleft -= nread;
ptr += nread;
}
if (errno == SIGPIPE || errno == EPIPE) {
return -1;
}
}
return (nbytes - nleft);
}
int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) {
taosSetNonblocking(fd, 1);
int32_t nleft, nwritten, nready;
fd_set fset;
struct timeval tv;
nleft = nbytes;
while (nleft > 0) {
tv.tv_sec = 30;
tv.tv_usec = 0;
FD_ZERO(&fset);
FD_SET(fd, &fset);
if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT;
printf("fd %d timeout, no enough space to write", fd);
break;
} else if (nready < 0) {
if (errno == EINTR) continue;
printf("select error, %d (%s)", errno, strerror(errno));
return -1;
}
nwritten = (int32_t)taosSend(fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
if (nwritten <= 0) {
if (errno == EAGAIN || errno == EINTR) continue;
printf("write error, %d (%s)", errno, strerror(errno));
return -1;
}
nleft -= nwritten;
ptr += nwritten;
}
taosSetNonblocking(fd, 0);
return (nbytes - nleft);
}
int32_t taosReadn(SOCKET fd, char *ptr, int32_t nbytes) {
int32_t nread, nready, nleft = nbytes;
fd_set fset;
struct timeval tv;
while (nleft > 0) {
tv.tv_sec = 30;
tv.tv_usec = 0;
FD_ZERO(&fset);
FD_SET(fd, &fset);
if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT;
printf("fd %d timeout\n", fd);
break;
} else if (nready < 0) {
if (errno == EINTR) continue;
printf("select error, %d (%s)", errno, strerror(errno));
return -1;
}
if ((nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft)) < 0) {
if (errno == EINTR) continue;
printf("read error, %d (%s)", errno, strerror(errno));
return -1;
} else if (nread == 0) {
printf("fd %d EOF", fd);
break; // EOF
}
nleft -= nread;
ptr += nread;
}
return (nbytes - nleft);
}
SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in localAddr;
SOCKET sockFd;
int32_t bufSize = 1024000;
printf("open udp socket:0x%x:%hu", ip, port);
memset((char *)&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = ip;
localAddr.sin_port = (uint16_t)htons(port);
if ((sockFd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
printf("failed to open udp socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
printf("failed to set the send buffer size for UDP socket\n");
taosCloseSocket(sockFd);
return -1;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
printf("failed to set the receive buffer size for UDP socket\n");
taosCloseSocket(sockFd);
return -1;
}
/* bind socket to local address */
if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
printf("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
taosCloseSocket(sockFd);
return -1;
}
return sockFd;
}
SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
SOCKET sockFd = 0;
int32_t ret;
struct sockaddr_in serverAddr, clientAddr;
int32_t bufSize = 1024 * 1024;
sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockFd <= 2) {
printf("failed to open the socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
}
/* set REUSEADDR option, so the portnumber can be re-used */
int32_t reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
printf("failed to set the send buffer size for TCP socket\n");
taosCloseSocket(sockFd);
return -1;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
printf("failed to set the receive buffer size for TCP socket\n");
taosCloseSocket(sockFd);
return -1;
}
if (clientIp != 0) {
memset((char *)&clientAddr, 0, sizeof(clientAddr));
clientAddr.sin_family = AF_INET;
clientAddr.sin_addr.s_addr = clientIp;
clientAddr.sin_port = 0;
/* bind socket to client address */
if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
printf("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort,
strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
}
memset((char *)&serverAddr, 0, sizeof(serverAddr));
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = destIp;
serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort);
#ifdef _TD_LINUX
taosSetNonblocking(sockFd, 1);
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret == -1) {
if (errno == EHOSTUNREACH) {
printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd);
return -1;
} else if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) {
struct pollfd wfd[1];
wfd[0].fd = sockFd;
wfd[0].events = POLLOUT;
int res = poll(wfd, 1, TCP_CONN_TIMEOUT);
if (res == -1 || res == 0) {
printf("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort);
taosCloseSocket(sockFd); //
return -1;
}
int optVal = -1, optLen = sizeof(int);
if ((0 != taosGetSockOpt(sockFd, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) {
printf("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort);
taosCloseSocket(sockFd); //
return -1;
}
ret = 0;
} else { // Other error
printf("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort);
taosCloseSocket(sockFd); //
return -1;
}
}
taosSetNonblocking(sockFd, 0);
#else
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
#endif
if (ret != 0) {
printf("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd);
sockFd = -1;
} else {
taosKeepTcpAlive(sockFd);
}
return sockFd;
}
int32_t taosKeepTcpAlive(SOCKET sockFd) {
int32_t alive = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
printf("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
#ifndef __APPLE__
// all fails on macosx
int32_t probes = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
printf("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
int32_t alivetime = 10;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
printf("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
int32_t interval = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
printf("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
#endif // __APPLE__
int32_t nodelay = 1;
if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
printf("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
struct linger linger = {0};
linger.l_onoff = 1;
linger.l_linger = 3;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) {
printf("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
return 0;
}
SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in serverAdd;
SOCKET sockFd;
int32_t reuse;
printf("open tcp server socket:0x%x:%hu", ip, port);
bzero((char *)&serverAdd, sizeof(serverAdd));
serverAdd.sin_family = AF_INET;
serverAdd.sin_addr.s_addr = ip;
serverAdd.sin_port = (uint16_t)htons(port);
if ((sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
printf("failed to open TCP socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
}
/* set REUSEADDR option, so the portnumber can be re-used */
reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
printf("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
/* bind socket to server address */
if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
printf("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
if (taosKeepTcpAlive(sockFd) < 0) {
printf("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
if (listen(sockFd, 1024) < 0) {
printf("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
return sockFd;
}
void tinet_ntoa(char *ipstr, uint32_t ip) {
sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24);
}
#define COPY_SIZE 32768
// sendfile shall be used
int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) {
int64_t leftLen;
int64_t readLen, writeLen;
char temp[COPY_SIZE];
leftLen = len;
while (leftLen > 0) {
if (leftLen < COPY_SIZE)
readLen = leftLen;
else
readLen = COPY_SIZE; // 4K
int64_t retLen = taosReadMsg(sfd, temp, (int32_t)readLen);
if (readLen != retLen) {
printf("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
readLen, retLen, len, leftLen, strerror(errno));
return -1;
}
writeLen = taosWriteMsg(dfd, temp, (int32_t)readLen);
if (readLen != writeLen) {
printf("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
readLen, writeLen, len, leftLen, strerror(errno));
return -1;
}
leftLen -= readLen;
}
return len;
}
......@@ -57,6 +57,7 @@ struct tm *localtime_r(const time_t *timep, struct tm *result) {
*/
#include <sys/time.h>
// #include "monotonic.h"
FORCE_INLINE int32_t taosGetTimeOfDay(struct timeval *tv) {
return gettimeofday(tv, NULL);
......
......@@ -222,4 +222,20 @@ void taosUninitTimer() {
pthread_join(timerThread, NULL);
}
int64_t taosGetMonotonicMs() {
#if 0
return getMonotonicUs() / 1000;
#else
return taosGetTimestampMs();
#endif
}
const char *taosMonotonicInit() {
#if 0
return monotonicInit();
#else
return NULL;
#endif
}
#endif
......@@ -27,14 +27,6 @@
#include "dnodeMain.h"
#include "mnode.h"
static int32_t dnodeCreateDir(const char *dir) {
if (!taosMkDir(dir, 0755) && errno != EEXIST) {
return -1;
}
return 0;
}
static void dnodeCheckDataDirOpenned(char *dir) {
#if 0
char filepath[256] = {0};
......@@ -87,7 +79,7 @@ int32_t dnodeInitMain(Dnode *dnode, DnMain **out) {
taosSetCoreDump(tsEnableCoreFile);
#endif
if (dnodeCreateDir(tsLogDir) < 0) {
if (!taosMkDir(tsLogDir)) {
printf("failed to create dir: %s, reason: %s\n", tsLogDir, strerror(errno));
return -1;
}
......@@ -134,7 +126,7 @@ int32_t dnodeInitStorage(Dnode *dnode, void **m) {
#endif
// storage module init
if (tsDiskCfgNum == 1 && dnodeCreateDir(tsDataDir) < 0) {
if (tsDiskCfgNum == 1 && !taosMkDir(tsDataDir)) {
dError("failed to create dir:%s since %s", tsDataDir, strerror(errno));
return -1;
}
......@@ -151,12 +143,12 @@ int32_t dnodeInitStorage(Dnode *dnode, void **m) {
sprintf(tsVnodeDir, "%s/vnode", tsDataDir);
sprintf(tsDnodeDir, "%s/dnode", tsDataDir);
if (dnodeCreateDir(tsMnodeDir) < 0) {
if (!taosMkDir(tsMnodeDir)) {
dError("failed to create dir:%s since %s", tsMnodeDir, strerror(errno));
return -1;
}
if (dnodeCreateDir(tsDnodeDir) < 0) {
if (!taosMkDir(tsDnodeDir)) {
dError("failed to create dir:%s since %s", tsDnodeDir, strerror(errno));
return -1;
}
......
......@@ -155,7 +155,7 @@ static bool taosReadDirectoryConfig(SGlobalCfg *cfg, char *input_value) {
taosExpandDir(input_value, option, cfg->ptrLength);
taosRealPath(option, cfg->ptrLength);
if (!taosMkDir(option, 0755)) {
if (!taosMkDir(option)) {
uError("config option:%s, input value:%s, directory not exist, create fail:%s", cfg->option, input_value,
strerror(errno));
return false;
......
......@@ -53,13 +53,18 @@ static int64_t tfOpenImp(int32_t fd) {
return rid;
}
int64_t tfOpen(const char *pathname, int32_t flags) {
int32_t fd = open(pathname, flags | O_BINARY);
int64_t tfOpenReadWrite(const char *pathname, int32_t flags) {
int32_t fd = taosOpenFileReadWrite(pathname);
return tfOpenImp(fd);
}
int64_t tfOpenM(const char *pathname, int32_t flags, mode_t mode) {
int32_t fd = open(pathname, flags | O_BINARY, mode);
int64_t tfOpenCreateWrite(const char *pathname, int32_t flags, mode_t mode) {
int32_t fd = taosOpenFileCreateWrite(pathname);
return tfOpenImp(fd);
}
int64_t tfOpenCreateWriteAppend(const char *pathname, int32_t flags, mode_t mode) {
int32_t fd = taosOpenFileCreateWriteAppend(pathname);
return tfOpenImp(fd);
}
......@@ -73,7 +78,7 @@ int64_t tfWrite(int64_t tfd, void *buf, int64_t count) {
int32_t fd = (int32_t)(uintptr_t)p;
int64_t ret = taosWrite(fd, buf, count);
int64_t ret = taosWriteFile(fd, buf, count);
if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno);
taosReleaseRef(tsFileRsetId, tfd);
......@@ -86,7 +91,7 @@ int64_t tfRead(int64_t tfd, void *buf, int64_t count) {
int32_t fd = (int32_t)(uintptr_t)p;
int64_t ret = taosRead(fd, buf, count);
int64_t ret = taosReadFile(fd, buf, count);
if (ret < 0) terrno = TAOS_SYSTEM_ERROR(errno);
taosReleaseRef(tsFileRsetId, tfd);
......@@ -98,7 +103,7 @@ int32_t tfFsync(int64_t tfd) {
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
int32_t code = taosFsync(fd);
int32_t code = taosFsyncFile(fd);
taosReleaseRef(tsFileRsetId, tfd);
return code;
......@@ -117,7 +122,7 @@ int64_t tfLseek(int64_t tfd, int64_t offset, int32_t whence) {
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
int64_t ret = taosLSeek(fd, offset, whence);
int64_t ret = taosLSeekFile(fd, offset, whence);
taosReleaseRef(tsFileRsetId, tfd);
return ret;
......@@ -128,7 +133,7 @@ int32_t tfFtruncate(int64_t tfd, int64_t length) {
if (p == NULL) return -1;
int32_t fd = (int32_t)(uintptr_t)p;
int32_t code = taosFtruncate(fd, length);
int32_t code = taosFtruncateFile(fd, length);
taosReleaseRef(tsFileRsetId, tfd);
return code;
......
......@@ -15,8 +15,9 @@
#include "os.h"
#include "hashfunc.h"
#include "tutil.h"
#include "tcompare.h"
#include "tdef.h"
#include "tutil.h"
#define ROTL32(x, r) ((x) << (r) | (x) >> (32u - (r)))
......
......@@ -201,7 +201,7 @@ static void *taosThreadToOpenNewFile(void *param) {
taosUmaskFile(0);
int32_t fd = taosOpenFileTruncCreateWrite(name);
int32_t fd = taosOpenFileCreateWriteTrunc(name);
if (fd < 0) {
tsLogObj.openInProgress = 0;
tsLogObj.lines = tsLogObj.maxLines - 1000;
......@@ -731,7 +731,7 @@ int32_t taosCompressFile(char *srcFileName, char *destFileName) {
goto cmp_end;
}
int32_t fd = taosOpenFileTruncCreateWrite(destFileName);
int32_t fd = taosOpenFileCreateWriteTrunc(destFileName);
if (fd < 0) {
ret = -2;
goto cmp_end;
......
......@@ -15,7 +15,7 @@
#include "os.h"
#include "tlosertree.h"
#include "taosmsg.h"
// #include "taosmsg.h"
#include "tulog.h"
// set initial value for loser tree
......@@ -46,7 +46,7 @@ uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* pa
*pTree = (SLoserTreeInfo*)calloc(1, sizeof(SLoserTreeInfo) + sizeof(SLoserTreeNode) * totalEntries);
if ((*pTree) == NULL) {
uError("allocate memory for loser-tree failed. reason:%s", strerror(errno));
return TSDB_CODE_QRY_OUT_OF_MEMORY;
return -1;
}
(*pTree)->pNode = (SLoserTreeNode*)(((char*)(*pTree)) + sizeof(SLoserTreeInfo));
......@@ -74,7 +74,7 @@ uint32_t tLoserTreeCreate(SLoserTreeInfo** pTree, int32_t numOfEntries, void* pa
printf("initialize local reducer completed!\n");
#endif
return TSDB_CODE_SUCCESS;
return 0;
}
void tLoserTreeAdjust(SLoserTreeInfo* pTree, int32_t idx) {
......
......@@ -35,7 +35,6 @@
#include "os.h"
#include "tmd5.h"
#include "taosdef.h"
/* forward declaration */
static void Transform(uint32_t *buf, uint32_t *in);
......
......@@ -16,7 +16,7 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "tutil.h"
#include "tglobal.h"
#include "tdef.h"
#include "tnote.h"
SNoteObj tsHttpNote;
......@@ -43,6 +43,7 @@ static void taosInitNote(int32_t numOfLines, int32_t maxNotes, SNoteObj *pNote,
void taosInitNotes() {
char name[TSDB_FILENAME_LEN * 2] = {0};
#if 0
if (tsTscEnableRecordSql) {
snprintf(name, TSDB_FILENAME_LEN * 2, "%s/tscsql-%d", tsLogDir, taosGetPId());
taosInitNote(tsNumOfLogLines, 1, &tsTscNote, name);
......@@ -57,13 +58,14 @@ void taosInitNotes() {
snprintf(name, TSDB_FILENAME_LEN * 2, "%s/taosinfo", tsLogDir);
taosInitNote(tsNumOfLogLines, 1, &tsInfoNote, name);
}
#endif
}
static bool taosLockNote(int32_t fd, SNoteObj *pNote) {
if (fd < 0) return false;
if (pNote->fileNum > 1) {
int32_t ret = (int32_t)(flock(fd, LOCK_EX | LOCK_NB));
int32_t ret = (int32_t)taosLockFile(fd);
if (ret == 0) {
return true;
}
......@@ -76,7 +78,7 @@ static void taosUnLockNote(int32_t fd, SNoteObj *pNote) {
if (fd < 0) return;
if (pNote->fileNum > 1) {
flock(fd, LOCK_UN | LOCK_NB);
taosUnLockFile(fd);
}
}
......@@ -90,9 +92,9 @@ static void *taosThreadToOpenNewNote(void *param) {
pNote->lines = 0;
sprintf(name, "%s.%d", pNote->name, pNote->flag);
umask(0);
taosUmaskFile(0);
int32_t fd = open(name, O_WRONLY | O_CREAT | O_TRUNC, S_IRWXU | S_IRWXG | S_IRWXO);
int32_t fd = taosOpenFileCreateWriteTrunc(name);
if (fd < 0) {
return NULL;
}
......@@ -132,7 +134,7 @@ static int32_t taosOpenNewNote(SNoteObj *pNote) {
}
static bool taosCheckNoteIsOpen(char *noteName, SNoteObj *pNote) {
int32_t fd = open(noteName, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
int32_t fd = taosOpenFileCreateWrite(noteName);
if (fd < 0) {
fprintf(stderr, "failed to open note:%s reason:%s\n", noteName, strerror(errno));
return true;
......@@ -174,7 +176,7 @@ static void taosGetNoteName(char *fn, SNoteObj *pNote) {
static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxNoteNum, SNoteObj *pNote) {
char name[NOTE_FILE_NAME_LEN * 2] = {0};
int32_t size;
struct stat logstat0, logstat1;
int32_t logstat0_mtime, logstat1_mtime;
pNote->maxLines = maxLines;
pNote->fileNum = maxNoteNum;
......@@ -184,13 +186,13 @@ static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxN
strcpy(name, fn);
strcat(name, ".0");
}
bool log0Exist = stat(name, &logstat0) >= 0;
bool log0Exist = taosStatFile(name, NULL, &logstat0_mtime) >= 0;
if (strlen(fn) < NOTE_FILE_NAME_LEN + 50 - 2) {
strcpy(name, fn);
strcat(name, ".1");
}
bool log1Exist = stat(name, &logstat1) >= 0;
bool log1Exist = taosStatFile(name, NULL, &logstat1_mtime) >= 0;
if (!log0Exist && !log1Exist) {
pNote->flag = 0;
......@@ -199,15 +201,15 @@ static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxN
} else if (!log0Exist) {
pNote->flag = 1;
} else {
pNote->flag = (logstat0.st_mtime > logstat1.st_mtime) ? 0 : 1;
pNote->flag = (logstat0_mtime > logstat1_mtime) ? 0 : 1;
}
char noteName[NOTE_FILE_NAME_LEN * 2] = {0};
sprintf(noteName, "%s.%d", pNote->name, pNote->flag);
pthread_mutex_init(&pNote->mutex, NULL);
umask(0);
pNote->fd = open(noteName, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
taosUmaskFile(0);
pNote->fd = taosOpenFileCreateWrite(noteName);
if (pNote->fd < 0) {
fprintf(stderr, "failed to open note file:%s reason:%s\n", noteName, strerror(errno));
......@@ -216,12 +218,12 @@ static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxN
taosLockNote(pNote->fd, pNote);
// only an estimate for number of lines
struct stat filestat;
if (fstat(pNote->fd, &filestat) < 0) {
int64_t filestat_size;
if (taosFStatFile(pNote->fd, &filestat_size, NULL) < 0) {
fprintf(stderr, "failed to fstat note file:%s reason:%s\n", noteName, strerror(errno));
return -1;
}
size = (int32_t)filestat.st_size;
size = (int32_t)filestat_size;
pNote->lines = size / 60;
lseek(pNote->fd, 0, SEEK_END);
......@@ -231,7 +233,7 @@ static int32_t taosOpenNoteWithMaxLines(char *fn, int32_t maxLines, int32_t maxN
void taosNotePrintBuffer(SNoteObj *pNote, char *buffer, int32_t len) {
if (pNote->fd <= 0) return;
taosWrite(pNote->fd, buffer, len);
taosWriteFile(pNote->fd, buffer, len);
if (pNote->maxLines > 0) {
pNote->lines++;
......@@ -247,7 +249,7 @@ void taosNotePrint(SNoteObj *pNote, const char *const format, ...) {
struct timeval timeSecs;
time_t curTime;
gettimeofday(&timeSecs, NULL);
taosGetTimeOfDay(&timeSecs);
curTime = timeSecs.tv_sec;
ptm = localtime_r(&curTime, &Tm);
len = sprintf(buffer, "%02d/%02d %02d:%02d:%02d.%06d %08" PRId64 " ", ptm->tm_mon + 1, ptm->tm_mday, ptm->tm_hour,
......
......@@ -14,7 +14,7 @@
*/
#include "os.h"
#include "taosdef.h"
#include "tdef.h"
#include "tutil.h"
#include "tulog.h"
#include "tsched.h"
......
......@@ -16,7 +16,6 @@
#include "tskiplist.h"
#include "os.h"
#include "tcompare.h"
#include "tdataformat.h"
#include "tulog.h"
#include "tutil.h"
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "os.h"
#include "tulog.h"
#include "tsocket.h"
#include "taoserror.h"
#ifndef SIGPIPE
#define SIGPIPE EPIPE
#endif
#define TCP_CONN_TIMEOUT 3000 // conn timeout
int32_t taosGetFqdn(char *fqdn) {
char hostname[1024];
hostname[1023] = '\0';
if (gethostname(hostname, 1023) == -1) {
uError("failed to get hostname, reason:%s", strerror(errno));
return -1;
}
struct addrinfo hints = {0};
struct addrinfo *result = NULL;
#ifdef __APPLE__
// on macosx, hostname -f has the form of xxx.local
// which will block getaddrinfo for a few seconds if AI_CANONNAME is set
// thus, we choose AF_INET (ipv4 for the moment) to make getaddrinfo return
// immediately
hints.ai_family = AF_INET;
#else // __APPLE__
hints.ai_flags = AI_CANONNAME;
#endif // __APPLE__
int32_t ret = getaddrinfo(hostname, NULL, &hints, &result);
if (!result) {
uError("failed to get fqdn, code:%d, reason:%s", ret, gai_strerror(ret));
return -1;
}
#ifdef __APPLE__
// refer to comments above
strcpy(fqdn, hostname);
#else // __APPLE__
strcpy(fqdn, result->ai_canonname);
#endif // __APPLE__
freeaddrinfo(result);
return 0;
}
uint32_t taosGetIpv4FromFqdn(const char *fqdn) {
struct addrinfo hints = {0};
hints.ai_family = AF_INET;
hints.ai_socktype = SOCK_STREAM;
struct addrinfo *result = NULL;
int32_t ret = getaddrinfo(fqdn, NULL, &hints, &result);
if (result) {
struct sockaddr * sa = result->ai_addr;
struct sockaddr_in *si = (struct sockaddr_in *)sa;
struct in_addr ia = si->sin_addr;
uint32_t ip = ia.s_addr;
freeaddrinfo(result);
return ip;
} else {
#ifdef EAI_SYSTEM
if (ret == EAI_SYSTEM) {
uError("failed to get the ip address, fqdn:%s, since:%s", fqdn, strerror(errno));
terrno = TAOS_SYSTEM_ERROR(errno);
} else {
uError("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
}
#else
uError("failed to get the ip address, fqdn:%s, since:%s", fqdn, gai_strerror(ret));
#endif
return 0xFFFFFFFF;
}
}
// Function converting an IP address string to an uint32_t.
uint32_t ip2uint(const char *const ip_addr) {
char ip_addr_cpy[20];
char ip[5];
tstrncpy(ip_addr_cpy, ip_addr, sizeof(ip_addr_cpy));
char *s_start, *s_end;
s_start = ip_addr_cpy;
s_end = ip_addr_cpy;
int32_t k;
for (k = 0; *s_start != '\0'; s_start = s_end) {
for (s_end = s_start; *s_end != '.' && *s_end != '\0'; s_end++) {
}
if (*s_end == '.') {
*s_end = '\0';
s_end++;
}
ip[k++] = (char)atoi(s_start);
}
ip[k] = '\0';
return *((uint32_t *)ip);
}
int32_t taosWriteMsg(SOCKET fd, void *buf, int32_t nbytes) {
int32_t nleft, nwritten;
char * ptr = (char *)buf;
nleft = nbytes;
while (nleft > 0) {
nwritten = (int32_t)taosWriteSocket(fd, (char *)ptr, (size_t)nleft);
if (nwritten <= 0) {
if (errno == EINTR /* || errno == EAGAIN || errno == EWOULDBLOCK */)
continue;
else
return -1;
} else {
nleft -= nwritten;
ptr += nwritten;
}
if (errno == SIGPIPE || errno == EPIPE) {
return -1;
}
}
return (nbytes - nleft);
}
int32_t taosReadMsg(SOCKET fd, void *buf, int32_t nbytes) {
int32_t nleft, nread;
char * ptr = (char *)buf;
nleft = nbytes;
if (fd < 0) return -1;
while (nleft > 0) {
nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft);
if (nread == 0) {
break;
} else if (nread < 0) {
if (errno == EINTR/* || errno == EAGAIN || errno == EWOULDBLOCK*/) {
continue;
} else {
return -1;
}
} else {
nleft -= nread;
ptr += nread;
}
if (errno == SIGPIPE || errno == EPIPE) {
return -1;
}
}
return (nbytes - nleft);
}
int32_t taosNonblockwrite(SOCKET fd, char *ptr, int32_t nbytes) {
taosSetNonblocking(fd, 1);
int32_t nleft, nwritten, nready;
fd_set fset;
struct timeval tv;
nleft = nbytes;
while (nleft > 0) {
tv.tv_sec = 30;
tv.tv_usec = 0;
FD_ZERO(&fset);
FD_SET(fd, &fset);
if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT;
uError("fd %d timeout, no enough space to write", fd);
break;
} else if (nready < 0) {
if (errno == EINTR) continue;
uError("select error, %d (%s)", errno, strerror(errno));
return -1;
}
nwritten = (int32_t)taosSend(fd, ptr, (size_t)nleft, MSG_NOSIGNAL);
if (nwritten <= 0) {
if (errno == EAGAIN || errno == EINTR) continue;
uError("write error, %d (%s)", errno, strerror(errno));
return -1;
}
nleft -= nwritten;
ptr += nwritten;
}
taosSetNonblocking(fd, 0);
return (nbytes - nleft);
}
int32_t taosReadn(SOCKET fd, char *ptr, int32_t nbytes) {
int32_t nread, nready, nleft = nbytes;
fd_set fset;
struct timeval tv;
while (nleft > 0) {
tv.tv_sec = 30;
tv.tv_usec = 0;
FD_ZERO(&fset);
FD_SET(fd, &fset);
if ((nready = select((int32_t)(fd + 1), NULL, &fset, NULL, &tv)) == 0) {
errno = ETIMEDOUT;
uError("fd %d timeout\n", fd);
break;
} else if (nready < 0) {
if (errno == EINTR) continue;
uError("select error, %d (%s)", errno, strerror(errno));
return -1;
}
if ((nread = (int32_t)taosReadSocket(fd, ptr, (size_t)nleft)) < 0) {
if (errno == EINTR) continue;
uError("read error, %d (%s)", errno, strerror(errno));
return -1;
} else if (nread == 0) {
uError("fd %d EOF", fd);
break; // EOF
}
nleft -= nread;
ptr += nread;
}
return (nbytes - nleft);
}
SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in localAddr;
SOCKET sockFd;
int32_t bufSize = 1024000;
uDebug("open udp socket:0x%x:%hu", ip, port);
memset((char *)&localAddr, 0, sizeof(localAddr));
localAddr.sin_family = AF_INET;
localAddr.sin_addr.s_addr = ip;
localAddr.sin_port = (uint16_t)htons(port);
if ((sockFd = socket(AF_INET, SOCK_DGRAM, 0)) <= 2) {
uError("failed to open udp socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the send buffer size for UDP socket\n");
taosCloseSocket(sockFd);
return -1;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the receive buffer size for UDP socket\n");
taosCloseSocket(sockFd);
return -1;
}
/* bind socket to local address */
if (bind(sockFd, (struct sockaddr *)&localAddr, sizeof(localAddr)) < 0) {
uError("failed to bind udp socket: %d (%s), 0x%x:%hu", errno, strerror(errno), ip, port);
taosCloseSocket(sockFd);
return -1;
}
return sockFd;
}
SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientIp) {
SOCKET sockFd = 0;
int32_t ret;
struct sockaddr_in serverAddr, clientAddr;
int32_t bufSize = 1024 * 1024;
sockFd = socket(PF_INET, SOCK_STREAM, IPPROTO_TCP);
if (sockFd <= 2) {
uError("failed to open the socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
}
/* set REUSEADDR option, so the portnumber can be re-used */
int32_t reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_SNDBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the send buffer size for TCP socket\n");
taosCloseSocket(sockFd);
return -1;
}
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_RCVBUF, (void *)&bufSize, sizeof(bufSize)) != 0) {
uError("failed to set the receive buffer size for TCP socket\n");
taosCloseSocket(sockFd);
return -1;
}
if (clientIp != 0) {
memset((char *)&clientAddr, 0, sizeof(clientAddr));
clientAddr.sin_family = AF_INET;
clientAddr.sin_addr.s_addr = clientIp;
clientAddr.sin_port = 0;
/* bind socket to client address */
if (bind(sockFd, (struct sockaddr *)&clientAddr, sizeof(clientAddr)) < 0) {
uError("bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)", clientIp, destIp, destPort,
strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
}
memset((char *)&serverAddr, 0, sizeof(serverAddr));
serverAddr.sin_family = AF_INET;
serverAddr.sin_addr.s_addr = destIp;
serverAddr.sin_port = (uint16_t)htons((uint16_t)destPort);
#ifdef _TD_LINUX
taosSetNonblocking(sockFd, 1);
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
if (ret == -1) {
if (errno == EHOSTUNREACH) {
uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd);
return -1;
} else if (errno == EINPROGRESS || errno == EAGAIN || errno == EWOULDBLOCK) {
struct pollfd wfd[1];
wfd[0].fd = sockFd;
wfd[0].events = POLLOUT;
int res = poll(wfd, 1, TCP_CONN_TIMEOUT);
if (res == -1 || res == 0) {
uError("failed to connect socket, ip:0x%x, port:%hu(poll error/conn timeout)", destIp, destPort);
taosCloseSocket(sockFd); //
return -1;
}
int optVal = -1, optLen = sizeof(int);
if ((0 != taosGetSockOpt(sockFd, SOL_SOCKET, SO_ERROR, &optVal, &optLen)) || (optVal != 0)) {
uError("failed to connect socket, ip:0x%x, port:%hu(connect host error)", destIp, destPort);
taosCloseSocket(sockFd); //
return -1;
}
ret = 0;
} else { // Other error
uError("failed to connect socket, ip:0x%x, port:%hu(target host cannot be reached)", destIp, destPort);
taosCloseSocket(sockFd); //
return -1;
}
}
taosSetNonblocking(sockFd, 0);
#else
ret = connect(sockFd, (struct sockaddr *)&serverAddr, sizeof(serverAddr));
#endif
if (ret != 0) {
uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket(sockFd);
sockFd = -1;
} else {
taosKeepTcpAlive(sockFd);
}
return sockFd;
}
int32_t taosKeepTcpAlive(SOCKET sockFd) {
int32_t alive = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_KEEPALIVE, (void *)&alive, sizeof(alive)) < 0) {
uError("fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
#ifndef __APPLE__
// all fails on macosx
int32_t probes = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPCNT, (void *)&probes, sizeof(probes)) < 0) {
uError("fd:%d setsockopt SO_KEEPCNT failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
int32_t alivetime = 10;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPIDLE, (void *)&alivetime, sizeof(alivetime)) < 0) {
uError("fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
int32_t interval = 3;
if (taosSetSockOpt(sockFd, SOL_TCP, TCP_KEEPINTVL, (void *)&interval, sizeof(interval)) < 0) {
uError("fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
#endif // __APPLE__
int32_t nodelay = 1;
if (taosSetSockOpt(sockFd, IPPROTO_TCP, TCP_NODELAY, (void *)&nodelay, sizeof(nodelay)) < 0) {
uError("fd:%d setsockopt TCP_NODELAY failed %d (%s)", sockFd, errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
struct linger linger = {0};
linger.l_onoff = 1;
linger.l_linger = 3;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_LINGER, (void *)&linger, sizeof(linger)) < 0) {
uError("setsockopt SO_LINGER failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
return 0;
}
SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
struct sockaddr_in serverAdd;
SOCKET sockFd;
int32_t reuse;
uDebug("open tcp server socket:0x%x:%hu", ip, port);
bzero((char *)&serverAdd, sizeof(serverAdd));
serverAdd.sin_family = AF_INET;
serverAdd.sin_addr.s_addr = ip;
serverAdd.sin_port = (uint16_t)htons(port);
if ((sockFd = socket(AF_INET, SOCK_STREAM, IPPROTO_TCP)) <= 2) {
uError("failed to open TCP socket: %d (%s)", errno, strerror(errno));
taosCloseSocketNoCheck(sockFd);
return -1;
}
/* set REUSEADDR option, so the portnumber can be re-used */
reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
/* bind socket to server address */
if (bind(sockFd, (struct sockaddr *)&serverAdd, sizeof(serverAdd)) < 0) {
uError("bind tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
if (taosKeepTcpAlive(sockFd) < 0) {
uError("failed to set tcp server keep-alive option, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
if (listen(sockFd, 1024) < 0) {
uError("listen tcp server socket failed, 0x%x:%hu(%s)", ip, port, strerror(errno));
taosCloseSocket(sockFd);
return -1;
}
return sockFd;
}
void tinet_ntoa(char *ipstr, uint32_t ip) {
sprintf(ipstr, "%d.%d.%d.%d", ip & 0xFF, (ip >> 8) & 0xFF, (ip >> 16) & 0xFF, ip >> 24);
}
#define COPY_SIZE 32768
// sendfile shall be used
int64_t taosCopyFds(SOCKET sfd, int32_t dfd, int64_t len) {
int64_t leftLen;
int64_t readLen, writeLen;
char temp[COPY_SIZE];
leftLen = len;
while (leftLen > 0) {
if (leftLen < COPY_SIZE)
readLen = leftLen;
else
readLen = COPY_SIZE; // 4K
int64_t retLen = taosReadMsg(sfd, temp, (int32_t)readLen);
if (readLen != retLen) {
uError("read error, readLen:%" PRId64 " retLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
readLen, retLen, len, leftLen, strerror(errno));
return -1;
}
writeLen = taosWriteMsg(dfd, temp, (int32_t)readLen);
if (readLen != writeLen) {
uError("copy error, readLen:%" PRId64 " writeLen:%" PRId64 " len:%" PRId64 " leftLen:%" PRId64 ", reason:%s",
readLen, writeLen, len, leftLen, strerror(errno));
return -1;
}
leftLen -= readLen;
}
return len;
}
......@@ -15,8 +15,7 @@
#include "os.h"
#include "tthread.h"
#include "tglobal.h"
#include "taosdef.h"
#include "tdef.h"
#include "tutil.h"
#include "tulog.h"
#include "taoserror.h"
......
......@@ -18,7 +18,6 @@
#include "tsched.h"
#include "ttimer.h"
#include "tutil.h"
#include "monotonic.h"
extern int8_t tscEmbedded;
......@@ -187,10 +186,6 @@ static void removeTimer(uintptr_t id) {
unlockTimerList(list);
}
static int64_t getMonotonicMs(void) {
return (int64_t) getMonotonicUs() / 1000;
}
static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
timerAddRef(timer);
// select a wheel for the timer, we are not an accurate timer,
......@@ -206,7 +201,7 @@ static void addToWheel(tmr_obj_t* timer, uint32_t delay) {
time_wheel_t* wheel = wheels + timer->wheel;
timer->prev = NULL;
timer->expireAt = getMonotonicMs() + delay;
timer->expireAt = taosGetMonotonicMs() + delay;
pthread_mutex_lock(&wheel->mutex);
......@@ -339,7 +334,7 @@ tmr_h taosTmrStart(TAOS_TMR_CALLBACK fp, int mseconds, void* param, void* handle
}
static void taosTimerLoopFunc(int signo) {
int64_t now = getMonotonicMs();
int64_t now = taosGetMonotonicMs();
for (int i = 0; i < tListLen(wheels); i++) {
// `expried` is a temporary expire list.
......@@ -506,7 +501,7 @@ static void taosTmrModuleInit(void) {
pthread_mutex_init(&tmrCtrlMutex, NULL);
int64_t now = getMonotonicMs();
int64_t now = taosGetMonotonicMs();
for (int i = 0; i < tListLen(wheels); i++) {
time_wheel_t* wheel = wheels + i;
if (pthread_mutex_init(&wheel->mutex, NULL) != 0) {
......@@ -537,9 +532,6 @@ static void taosTmrModuleInit(void) {
}
void* taosTmrInit(int maxNumOfTmrs, int resolution, int longest, const char* label) {
const char* ret = monotonicInit();
tmrDebug("ttimer monotonic clock source:%s", ret);
pthread_once(&tmrModuleInit, taosTmrModuleInit);
pthread_mutex_lock(&tmrCtrlMutex);
......
......@@ -15,8 +15,7 @@
#include "os.h"
#include "tcrc32c.h"
#include "tglobal.h"
#include "taosdef.h"
#include "tdef.h"
#include "tutil.h"
#include "tulog.h"
#include "taoserror.h"
......
......@@ -371,7 +371,7 @@ void tscSaveSubscriptionProgress(void* sub) {
char path[256];
sprintf(path, "%s/subscribe", tsDataDir);
if (!taosMkDir(path, 0777)) {
if (!taosMkDir(path)) {
tscError("failed to create subscribe dir: %s", path);
}
......
......@@ -252,7 +252,7 @@ int tfsMkdirAt(const char *rname, int level, int id) {
char aname[TMPNAME_LEN];
snprintf(aname, TMPNAME_LEN, "%s/%s", DISK_DIR(pDisk), rname);
if (!taosMkDir(aname, 0755)) {
if (!taosMkDir(aname)) {
terrno = TAOS_SYSTEM_ERROR(errno);
return -1;
}
......
......@@ -139,7 +139,7 @@ void walClose(void *handle) {
}
static int32_t walInitObj(SWal *pWal) {
if (!taosMkDir(pWal->path, 0755)) {
if (!taosMkDir(pWal->path)) {
wError("vgId:%d, path:%s, failed to create directory since %s", pWal->vgId, pWal->path, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
}
......
......@@ -51,7 +51,7 @@ int32_t walRenew(void *handle) {
}
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT, S_IRWXU | S_IRWXG | S_IRWXO);
pWal->tfd = tfOpenCreateWrite(pWal->name);
if (!tfValid(pWal->tfd)) {
code = TAOS_SYSTEM_ERROR(errno);
......@@ -220,7 +220,7 @@ int32_t walRestore(void *handle, void *pVnode, FWalWrite writeFp) {
// open the existing WAL file in append mode
pWal->fileId = 0;
snprintf(pWal->name, sizeof(pWal->name), "%s/%s%" PRId64, pWal->path, WAL_PREFIX, pWal->fileId);
pWal->tfd = tfOpenM(pWal->name, O_WRONLY | O_CREAT | O_APPEND, S_IRWXU | S_IRWXG | S_IRWXO);
pWal->tfd = tfOpenCreateWriteAppend(pWal->name);
if (!tfValid(pWal->tfd)) {
wError("vgId:%d, file:%s, failed to open since %s", pWal->vgId, pWal->name, strerror(errno));
return TAOS_SYSTEM_ERROR(errno);
......@@ -425,7 +425,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
return TAOS_SYSTEM_ERROR(errno);
}
int64_t tfd = tfOpen(name, O_RDWR);
int64_t tfd = tfOpenReadWrite(name);
if (!tfValid(tfd)) {
wError("vgId:%d, file:%s, failed to open for restore since %s", pWal->vgId, name, strerror(errno));
tfree(buffer);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册