Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
ff25a2c4
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
ff25a2c4
编写于
11月 03, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
TD-1912
上级
977589de
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
248 addition
and
96 deletion
+248
-96
src/os/inc/os.h
src/os/inc/os.h
+2
-1
src/os/inc/osAlloc.h
src/os/inc/osAlloc.h
+3
-5
src/os/inc/osFile.h
src/os/inc/osFile.h
+17
-1
src/os/inc/osSocket.h
src/os/inc/osSocket.h
+9
-11
src/os/src/detail/osAlloc.c
src/os/src/detail/osAlloc.c
+2
-2
src/os/src/detail/osFail.c
src/os/src/detail/osFail.c
+141
-0
src/os/src/detail/osSocket.c
src/os/src/detail/osSocket.c
+4
-4
src/os/src/windows/wSocket.c
src/os/src/windows/wSocket.c
+3
-3
src/util/inc/tsocket.h
src/util/inc/tsocket.h
+12
-12
src/util/src/tsocket.c
src/util/src/tsocket.c
+55
-55
src/wal/src/walMgmt.c
src/wal/src/walMgmt.c
+0
-1
src/wal/src/walWrite.c
src/wal/src/walWrite.c
+0
-1
未找到文件。
src/os/inc/os.h
浏览文件 @
ff25a2c4
...
...
@@ -52,9 +52,10 @@ extern "C" {
#include "osWindows.h"
#endif
#include "osDef.h"
#include "osAlloc.h"
#include "osAtomic.h"
#include "osCommon.h"
#include "osDef.h"
#include "osDir.h"
#include "osFile.h"
#include "osLz4.h"
...
...
src/
util/inc/ta
lloc.h
→
src/
os/inc/osA
lloc.h
浏览文件 @
ff25a2c4
...
...
@@ -13,16 +13,14 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_
UTIL
_ALLOC_H
#define TDENGINE_
UTIL
_ALLOC_H
#ifndef TDENGINE_
OS
_ALLOC_H
#define TDENGINE_
OS
_ALLOC_H
#ifdef __cplusplus
extern
"C"
{
#endif
#define TSDB_USE_SYS_MEM
#ifdef TSDB_USE_SYS_MEM
#ifndef TAOS_OS_FUNC_ALLOC
#define tmalloc(size) malloc(size)
#define tcalloc(size) calloc(1, size)
#define trealloc(p, size) realloc(p, size)
...
...
src/os/inc/osFile.h
浏览文件 @
ff25a2c4
...
...
@@ -35,12 +35,28 @@ int64_t taosRead(int32_t fd, void *buf, int64_t count);
int64_t
taosWrite
(
int32_t
fd
,
void
*
buf
,
int64_t
count
);
int64_t
taosLSeek
(
int32_t
fd
,
int64_t
offset
,
int32_t
whence
);
int32_t
taosRenameFile
(
char
*
fullPath
,
char
*
suffix
,
char
delimiter
,
char
**
dstPath
);
#define taosClose(x) tclose(x)
// TAOS_OS_FUNC_FILE_SENDIFLE
int64_t
taosSendFile
(
int32_t
dfd
,
int32_t
sfd
,
int64_t
*
offset
,
int64_t
size
);
int64_t
taosFSendFile
(
FILE
*
outfile
,
FILE
*
infile
,
int64_t
*
offset
,
int64_t
size
);
#ifdef TAOS_RANDOM_FILE_FAIL
void
taosSetRandomFileFailFactor
(
int
factor
);
void
taosSetRandomFileFailOutput
(
const
char
*
path
);
#ifdef TAOS_RANDOM_FILE_FAIL_TEST
ssize_t
taosReadFileRandomFail
(
int
fd
,
void
*
buf
,
size_t
count
,
const
char
*
file
,
uint32_t
line
);
ssize_t
taosWriteFileRandomFail
(
int
fd
,
void
*
buf
,
size_t
count
,
const
char
*
file
,
uint32_t
line
);
off_t
taosLSeekRandomFail
(
int
fd
,
off_t
offset
,
int
whence
,
const
char
*
file
,
uint32_t
line
);
#undef taosRead
#undef taosWrite
#undef taosLSeek
#define taosRead(fd, buf, count) taosReadFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosWrite(fd, buf, count) taosWriteFileRandomFail(fd, buf, count, __FILE__, __LINE__)
#define taosLSeek(fd, offset, whence) taosLSeekRandomFail(fd, offset, whence, __FILE__, __LINE__)
#endif
#endif
// TAOS_OS_FUNC_FILE_GETTMPFILEPATH
void
taosGetTmpfilePath
(
const
char
*
fileNamePrefix
,
char
*
dstPath
);
...
...
src/os/inc/osSocket.h
浏览文件 @
ff25a2c4
...
...
@@ -33,21 +33,19 @@ extern "C" {
x = FD_INITIALIZER; \
} \
}
typedef
int
SOCKET
;
typedef
int
32_t
SOCKET
;
#endif
#ifndef TAOS_OS_DEF_EPOLL
#define TAOS_EPOLL_WAIT_TIME -1
#endif
#define taosClose(x) taosCloseSocket(x)
#ifdef TAOS_RANDOM_NETWORK_FAIL
#ifdef TAOS_RANDOM_NETWORK_FAIL_TEST
ssize_t
taosSendRandomFail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
in
t
flags
);
ssize_t
taosSendToRandomFail
(
int
sockfd
,
const
void
*
buf
,
size_t
len
,
in
t
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
);
ssize_t
taosReadSocketRandomFail
(
int
fd
,
void
*
buf
,
size_t
count
);
ssize_t
taosWriteSocketRandomFail
(
int
fd
,
const
void
*
buf
,
size_t
count
);
ssize_t
taosSendRandomFail
(
int
32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_
t
flags
);
ssize_t
taosSendToRandomFail
(
int
32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_
t
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
);
ssize_t
taosReadSocketRandomFail
(
int
32_t
fd
,
void
*
buf
,
size_t
count
);
ssize_t
taosWriteSocketRandomFail
(
int
32_t
fd
,
const
void
*
buf
,
size_t
count
);
#undef taosSend
#undef taosSendto
#undef taosReadSocket
...
...
@@ -60,14 +58,14 @@ extern "C" {
#endif
// TAOS_OS_FUNC_SOCKET
int
taosSetNonblocking
(
SOCKET
sock
,
in
t
on
);
void
taosBlockSIGPIPE
();
int
32_t
taosSetNonblocking
(
SOCKET
sock
,
int32_
t
on
);
void
taosBlockSIGPIPE
();
// TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
int
taosSetSockOpt
(
SOCKET
socketfd
,
int
level
,
int
optname
,
void
*
optval
,
in
t
optlen
);
int
32_t
taosSetSockOpt
(
SOCKET
socketfd
,
int32_t
level
,
int32_t
optname
,
void
*
optval
,
int32_
t
optlen
);
// TAOS_OS_FUNC_SOCKET_INET
uint32_t
taosInetAddr
(
char
*
ipAddr
);
uint32_t
taosInetAddr
(
char
*
ipAddr
);
const
char
*
taosInetNtoa
(
struct
in_addr
ipInt
);
#ifdef __cplusplus
...
...
src/
util/src/ta
lloc.c
→
src/
os/src/detail/osA
lloc.c
浏览文件 @
ff25a2c4
...
...
@@ -17,10 +17,10 @@
#include "os.h"
#include "taoserror.h"
#include "tulog.h"
#include "
ta
lloc.h"
#include "
osA
lloc.h"
#define TSDB_HAVE_MEMALIGN
#if
ndef TSDB_USE_SYS_MEM
#if
def TAOS_OS_FUNC_ALLOC
void
*
tmalloc
(
int32_t
size
)
{
void
*
p
=
malloc
(
size
);
...
...
src/os/src/detail/osFail.c
0 → 100644
浏览文件 @
ff25a2c4
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#define RANDOM_NETWORK_FAIL_FACTOR 20
#ifdef TAOS_RANDOM_NETWORK_FAIL
ssize_t
taosSendRandomFail
(
int32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_t
flags
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
}
return
send
(
sockfd
,
buf
,
len
,
flags
);
}
ssize_t
taosSendToRandomFail
(
int32_t
sockfd
,
const
void
*
buf
,
size_t
len
,
int32_t
flags
,
const
struct
sockaddr
*
dest_addr
,
socklen_t
addrlen
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
}
return
sendto
(
sockfd
,
buf
,
len
,
flags
,
dest_addr
,
addrlen
);
}
ssize_t
taosReadSocketRandomFail
(
int32_t
fd
,
void
*
buf
,
size_t
count
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
ECONNRESET
;
return
-
1
;
}
return
read
(
fd
,
buf
,
count
);
}
ssize_t
taosWriteSocketRandomFail
(
int32_t
fd
,
const
void
*
buf
,
size_t
count
)
{
if
(
rand
()
%
RANDOM_NETWORK_FAIL_FACTOR
==
0
)
{
errno
=
EINTR
;
return
-
1
;
}
return
write
(
fd
,
buf
,
count
);
}
#endif //TAOS_RANDOM_NETWORK_FAIL
#ifdef TAOS_RANDOM_FILE_FAIL
static
int32_t
random_file_fail_factor
=
20
;
static
FILE
*
fpRandomFileFailOutput
=
NULL
;
void
taosSetRandomFileFailFactor
(
int32_t
factor
)
{
random_file_fail_factor
=
factor
;
}
static
void
close_random_file_fail_output
()
{
if
(
fpRandomFileFailOutput
!=
NULL
)
{
if
(
fpRandomFileFailOutput
!=
stdout
)
{
fclose
(
fpRandomFileFailOutput
);
}
fpRandomFileFailOutput
=
NULL
;
}
}
static
void
random_file_fail_output_sig
(
int32_t
sig
)
{
fprintf
(
fpRandomFileFailOutput
,
"signal %d received.
\n
"
,
sig
);
struct
sigaction
act
=
{
0
};
act
.
sa_handler
=
SIG_DFL
;
sigaction
(
sig
,
&
act
,
NULL
);
close_random_file_fail_output
();
exit
(
EXIT_FAILURE
);
}
void
taosSetRandomFileFailOutput
(
const
char
*
path
)
{
if
(
path
==
NULL
)
{
fpRandomFileFailOutput
=
stdout
;
}
else
if
((
fpRandomFileFailOutput
=
fopen
(
path
,
"w"
))
!=
NULL
)
{
atexit
(
close_random_file_fail_output
);
}
else
{
printf
(
"failed to open random file fail log file '%s', errno=%d
\n
"
,
path
,
errno
);
return
;
}
struct
sigaction
act
=
{
0
};
act
.
sa_handler
=
random_file_fail_output_sig
;
sigaction
(
SIGFPE
,
&
act
,
NULL
);
sigaction
(
SIGSEGV
,
&
act
,
NULL
);
sigaction
(
SIGILL
,
&
act
,
NULL
);
}
ssize_t
taosReadFileRandomFail
(
int32_t
fd
,
void
*
buf
,
size_t
count
,
const
char
*
file
,
uint32_t
line
)
{
if
(
random_file_fail_factor
>
0
)
{
if
(
rand
()
%
random_file_fail_factor
==
0
)
{
errno
=
EIO
;
return
-
1
;
}
}
return
taosRead
(
fd
,
buf
,
count
);
}
ssize_t
taosWriteFileRandomFail
(
int32_t
fd
,
void
*
buf
,
size_t
count
,
const
char
*
file
,
uint32_t
line
)
{
if
(
random_file_fail_factor
>
0
)
{
if
(
rand
()
%
random_file_fail_factor
==
0
)
{
errno
=
EIO
;
return
-
1
;
}
}
return
taosWrite
(
fd
,
buf
,
count
);
}
off_t
taosLSeekRandomFail
(
int32_t
fd
,
off_t
offset
,
int32_t
whence
,
const
char
*
file
,
uint32_t
line
)
{
if
(
random_file_fail_factor
>
0
)
{
if
(
rand
()
%
random_file_fail_factor
==
0
)
{
errno
=
EIO
;
return
-
1
;
}
}
return
taosLSeek
(
fd
,
offset
,
whence
);
}
#endif //TAOS_RANDOM_FILE_FAIL
src/os/src/detail/osSocket.c
浏览文件 @
ff25a2c4
...
...
@@ -19,8 +19,8 @@
#ifndef TAOS_OS_FUNC_SOCKET
int
taosSetNonblocking
(
SOCKET
sock
,
in
t
on
)
{
int
flags
=
0
;
int
32_t
taosSetNonblocking
(
SOCKET
sock
,
int32_
t
on
)
{
int
32_t
flags
=
0
;
if
((
flags
=
fcntl
(
sock
,
F_GETFL
,
0
))
<
0
)
{
uError
(
"fcntl(F_GETFL) error: %d (%s)
\n
"
,
errno
,
strerror
(
errno
));
return
1
;
...
...
@@ -43,7 +43,7 @@ void taosBlockSIGPIPE() {
sigset_t
signal_mask
;
sigemptyset
(
&
signal_mask
);
sigaddset
(
&
signal_mask
,
SIGPIPE
);
int
rc
=
pthread_sigmask
(
SIG_BLOCK
,
&
signal_mask
,
NULL
);
int
32_t
rc
=
pthread_sigmask
(
SIG_BLOCK
,
&
signal_mask
,
NULL
);
if
(
rc
!=
0
)
{
uError
(
"failed to block SIGPIPE"
);
}
...
...
@@ -53,7 +53,7 @@ void taosBlockSIGPIPE() {
#ifndef TAOS_OS_FUNC_SOCKET_SETSOCKETOPT
int
taosSetSockOpt
(
SOCKET
socketfd
,
int
level
,
int
optname
,
void
*
optval
,
in
t
optlen
)
{
int
32_t
taosSetSockOpt
(
SOCKET
socketfd
,
int32_t
level
,
int32_t
optname
,
void
*
optval
,
int32_
t
optlen
)
{
return
setsockopt
(
socketfd
,
level
,
optname
,
optval
,
(
socklen_t
)
optlen
);
}
...
...
src/os/src/windows/wSocket.c
浏览文件 @
ff25a2c4
...
...
@@ -34,7 +34,7 @@ void taosWinSocketInit() {
}
}
int
taosSetNonblocking
(
SOCKET
sock
,
in
t
on
)
{
int
32_t
taosSetNonblocking
(
SOCKET
sock
,
int32_
t
on
)
{
u_long
mode
;
if
(
on
)
{
mode
=
1
;
...
...
@@ -48,7 +48,7 @@ int taosSetNonblocking(SOCKET sock, int on) {
void
taosBlockSIGPIPE
()
{}
int
taosSetSockOpt
(
SOCKET
socketfd
,
int
level
,
int
optname
,
void
*
optval
,
in
t
optlen
)
{
int
32_t
taosSetSockOpt
(
SOCKET
socketfd
,
int32_t
level
,
int32_t
optname
,
void
*
optval
,
int32_
t
optlen
)
{
if
(
level
==
SOL_SOCKET
&&
optname
==
TCP_KEEPCNT
)
{
return
0
;
}
...
...
@@ -72,7 +72,7 @@ int taosSetSockOpt(SOCKET socketfd, int level, int optname, void *optval, int op
uint32_t
taosInetAddr
(
char
*
ipAddr
)
{
uint32_t
value
;
int
ret
=
inet_pton
(
AF_INET
,
ipAddr
,
&
value
);
int
32_t
ret
=
inet_pton
(
AF_INET
,
ipAddr
,
&
value
);
if
(
ret
<=
0
)
{
return
INADDR_NONE
;
}
else
{
...
...
src/util/inc/tsocket.h
浏览文件 @
ff25a2c4
...
...
@@ -20,21 +20,21 @@
extern
"C"
{
#endif
int
taosReadn
(
SOCKET
sock
,
char
*
buffer
,
in
t
len
);
int
taosWriteMsg
(
SOCKET
fd
,
void
*
ptr
,
in
t
nbytes
);
int
taosReadMsg
(
SOCKET
fd
,
void
*
ptr
,
in
t
nbytes
);
int
taosNonblockwrite
(
SOCKET
fd
,
char
*
ptr
,
in
t
nbytes
);
int
taosCopyFds
(
SOCKET
sfd
,
SOCKET
dfd
,
int64_t
len
);
int
taosSetNonblocking
(
SOCKET
sock
,
in
t
on
);
int
32_t
taosReadn
(
SOCKET
sock
,
char
*
buffer
,
int32_
t
len
);
int
32_t
taosWriteMsg
(
SOCKET
fd
,
void
*
ptr
,
int32_
t
nbytes
);
int
32_t
taosReadMsg
(
SOCKET
fd
,
void
*
ptr
,
int32_
t
nbytes
);
int
32_t
taosNonblockwrite
(
SOCKET
fd
,
char
*
ptr
,
int32_
t
nbytes
);
int
32_t
taosCopyFds
(
SOCKET
sfd
,
SOCKET
dfd
,
int64_t
len
);
int
32_t
taosSetNonblocking
(
SOCKET
sock
,
int32_
t
on
);
SOCKET
taosOpenUdpSocket
(
uint32_t
localIp
,
uint16_t
localPort
);
SOCKET
taosOpenTcpClientSocket
(
uint32_t
ip
,
uint16_t
port
,
uint32_t
localIp
);
SOCKET
taosOpenTcpServerSocket
(
uint32_t
ip
,
uint16_t
port
);
int
taosKeepTcpAlive
(
SOCKET
sockFd
);
SOCKET
taosOpenUdpSocket
(
uint32_t
localIp
,
uint16_t
localPort
);
SOCKET
taosOpenTcpClientSocket
(
uint32_t
ip
,
uint16_t
port
,
uint32_t
localIp
);
SOCKET
taosOpenTcpServerSocket
(
uint32_t
ip
,
uint16_t
port
);
int
32_t
taosKeepTcpAlive
(
SOCKET
sockFd
);
int
taosGetFqdn
(
char
*
);
int
32_t
taosGetFqdn
(
char
*
);
uint32_t
taosGetIpFromFqdn
(
const
char
*
);
void
tinet_ntoa
(
char
*
ipstr
,
u
nsigned
in
t
ip
);
void
tinet_ntoa
(
char
*
ipstr
,
u
int32_
t
ip
);
uint32_t
ip2uint
(
const
char
*
const
ip_addr
);
#ifdef __cplusplus
...
...
src/util/src/tsocket.c
浏览文件 @
ff25a2c4
...
...
@@ -18,7 +18,7 @@
#include "tsocket.h"
#include "taoserror.h"
int
taosGetFqdn
(
char
*
fqdn
)
{
int
32_t
taosGetFqdn
(
char
*
fqdn
)
{
char
hostname
[
1024
];
hostname
[
1023
]
=
'\0'
;
if
(
gethostname
(
hostname
,
1023
)
==
-
1
)
{
...
...
@@ -26,10 +26,10 @@ int taosGetFqdn(char *fqdn) {
return
-
1
;
}
struct
addrinfo
hints
=
{
0
};
struct
addrinfo
hints
=
{
0
};
struct
addrinfo
*
result
=
NULL
;
hints
.
ai_flags
=
AI_CANONNAME
;
int
ret
=
getaddrinfo
(
hostname
,
NULL
,
&
hints
,
&
result
);
int
32_t
ret
=
getaddrinfo
(
hostname
,
NULL
,
&
hints
,
&
result
);
if
(
!
result
)
{
uError
(
"failed to get fqdn, code:%d, reason:%s"
,
ret
,
gai_strerror
(
ret
));
return
-
1
;
...
...
@@ -49,10 +49,10 @@ uint32_t taosGetIpFromFqdn(const char *fqdn) {
int32_t
ret
=
getaddrinfo
(
fqdn
,
NULL
,
&
hints
,
&
result
);
if
(
result
)
{
struct
sockaddr
*
sa
=
result
->
ai_addr
;
struct
sockaddr_in
*
si
=
(
struct
sockaddr_in
*
)
sa
;
struct
in_addr
ia
=
si
->
sin_addr
;
uint32_t
ip
=
ia
.
s_addr
;
struct
sockaddr
*
sa
=
result
->
ai_addr
;
struct
sockaddr_in
*
si
=
(
struct
sockaddr_in
*
)
sa
;
struct
in_addr
ia
=
si
->
sin_addr
;
uint32_t
ip
=
ia
.
s_addr
;
freeaddrinfo
(
result
);
return
ip
;
}
else
{
...
...
@@ -70,7 +70,7 @@ uint32_t taosGetIpFromFqdn(const char *fqdn) {
}
}
// Function converting an IP address string to an u
nsigned in
t.
// Function converting an IP address string to an u
int32_
t.
uint32_t
ip2uint
(
const
char
*
const
ip_addr
)
{
char
ip_addr_cpy
[
20
];
char
ip
[
5
];
...
...
@@ -81,7 +81,7 @@ uint32_t ip2uint(const char *const ip_addr) {
s_start
=
ip_addr_cpy
;
s_end
=
ip_addr_cpy
;
int
k
;
int
32_t
k
;
for
(
k
=
0
;
*
s_start
!=
'\0'
;
s_start
=
s_end
)
{
for
(
s_end
=
s_start
;
*
s_end
!=
'.'
&&
*
s_end
!=
'\0'
;
s_end
++
)
{
...
...
@@ -95,17 +95,17 @@ uint32_t ip2uint(const char *const ip_addr) {
ip
[
k
]
=
'\0'
;
return
*
((
u
nsigned
in
t
*
)
ip
);
return
*
((
u
int32_
t
*
)
ip
);
}
int
taosWriteMsg
(
SOCKET
fd
,
void
*
buf
,
in
t
nbytes
)
{
int
nleft
,
nwritten
;
char
*
ptr
=
(
char
*
)
buf
;
int
32_t
taosWriteMsg
(
SOCKET
fd
,
void
*
buf
,
int32_
t
nbytes
)
{
int
32_t
nleft
,
nwritten
;
char
*
ptr
=
(
char
*
)
buf
;
nleft
=
nbytes
;
while
(
nleft
>
0
)
{
nwritten
=
(
int
)
taosWriteSocket
(
fd
,
(
char
*
)
ptr
,
(
size_t
)
nleft
);
nwritten
=
(
int
32_t
)
taosWriteSocket
(
fd
,
(
char
*
)
ptr
,
(
size_t
)
nleft
);
if
(
nwritten
<=
0
)
{
if
(
errno
==
EINTR
)
continue
;
...
...
@@ -120,16 +120,16 @@ int taosWriteMsg(SOCKET fd, void *buf, int nbytes) {
return
(
nbytes
-
nleft
);
}
int
taosReadMsg
(
SOCKET
fd
,
void
*
buf
,
in
t
nbytes
)
{
int
nleft
,
nread
;
char
*
ptr
=
(
char
*
)
buf
;
int
32_t
taosReadMsg
(
SOCKET
fd
,
void
*
buf
,
int32_
t
nbytes
)
{
int
32_t
nleft
,
nread
;
char
*
ptr
=
(
char
*
)
buf
;
nleft
=
nbytes
;
if
(
fd
<
0
)
return
-
1
;
while
(
nleft
>
0
)
{
nread
=
(
int
)
taosReadSocket
(
fd
,
ptr
,
(
size_t
)
nleft
);
nread
=
(
int
32_t
)
taosReadSocket
(
fd
,
ptr
,
(
size_t
)
nleft
);
if
(
nread
==
0
)
{
break
;
}
else
if
(
nread
<
0
)
{
...
...
@@ -147,11 +147,11 @@ int taosReadMsg(SOCKET fd, void *buf, int nbytes) {
return
(
nbytes
-
nleft
);
}
int
taosNonblockwrite
(
SOCKET
fd
,
char
*
ptr
,
in
t
nbytes
)
{
int
32_t
taosNonblockwrite
(
SOCKET
fd
,
char
*
ptr
,
int32_
t
nbytes
)
{
taosSetNonblocking
(
fd
,
1
);
int
nleft
,
nwritten
,
nready
;
fd_set
fset
;
int
32_t
nleft
,
nwritten
,
nready
;
fd_set
fset
;
struct
timeval
tv
;
nleft
=
nbytes
;
...
...
@@ -160,7 +160,7 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
tv
.
tv_usec
=
0
;
FD_ZERO
(
&
fset
);
FD_SET
(
fd
,
&
fset
);
if
((
nready
=
select
((
int
)(
fd
+
1
),
NULL
,
&
fset
,
NULL
,
&
tv
))
==
0
)
{
if
((
nready
=
select
((
int
32_t
)(
fd
+
1
),
NULL
,
&
fset
,
NULL
,
&
tv
))
==
0
)
{
errno
=
ETIMEDOUT
;
uError
(
"fd %d timeout, no enough space to write"
,
fd
);
break
;
...
...
@@ -172,7 +172,7 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
return
-
1
;
}
nwritten
=
(
int
)
taosSend
(
fd
,
ptr
,
(
size_t
)
nleft
,
MSG_NOSIGNAL
);
nwritten
=
(
int
32_t
)
taosSend
(
fd
,
ptr
,
(
size_t
)
nleft
,
MSG_NOSIGNAL
);
if
(
nwritten
<=
0
)
{
if
(
errno
==
EAGAIN
||
errno
==
EINTR
)
continue
;
...
...
@@ -189,10 +189,10 @@ int taosNonblockwrite(SOCKET fd, char *ptr, int nbytes) {
return
(
nbytes
-
nleft
);
}
int
taosReadn
(
SOCKET
fd
,
char
*
ptr
,
in
t
nbytes
)
{
int
nread
,
nready
,
nleft
=
nbytes
;
int
32_t
taosReadn
(
SOCKET
fd
,
char
*
ptr
,
int32_
t
nbytes
)
{
int
32_t
nread
,
nready
,
nleft
=
nbytes
;
fd_set
fset
;
fd_set
fset
;
struct
timeval
tv
;
while
(
nleft
>
0
)
{
...
...
@@ -200,7 +200,7 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) {
tv
.
tv_usec
=
0
;
FD_ZERO
(
&
fset
);
FD_SET
(
fd
,
&
fset
);
if
((
nready
=
select
((
int
)(
fd
+
1
),
NULL
,
&
fset
,
NULL
,
&
tv
))
==
0
)
{
if
((
nready
=
select
((
int
32_t
)(
fd
+
1
),
NULL
,
&
fset
,
NULL
,
&
tv
))
==
0
)
{
errno
=
ETIMEDOUT
;
uError
(
"fd %d timeout
\n
"
,
fd
);
break
;
...
...
@@ -210,7 +210,7 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) {
return
-
1
;
}
if
((
nread
=
(
int
)
taosReadSocket
(
fd
,
ptr
,
(
size_t
)
nleft
))
<
0
)
{
if
((
nread
=
(
int
32_t
)
taosReadSocket
(
fd
,
ptr
,
(
size_t
)
nleft
))
<
0
)
{
if
(
errno
==
EINTR
)
continue
;
uError
(
"read error, %d (%s)"
,
errno
,
strerror
(
errno
));
return
-
1
;
...
...
@@ -229,8 +229,8 @@ int taosReadn(SOCKET fd, char *ptr, int nbytes) {
SOCKET
taosOpenUdpSocket
(
uint32_t
ip
,
uint16_t
port
)
{
struct
sockaddr_in
localAddr
;
SOCKET
sockFd
;
int
bufSize
=
1024000
;
SOCKET
sockFd
;
int
32_t
bufSize
=
1024000
;
uDebug
(
"open udp socket:0x%x:%hu"
,
ip
,
port
);
...
...
@@ -239,7 +239,7 @@ SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
localAddr
.
sin_addr
.
s_addr
=
ip
;
localAddr
.
sin_port
=
(
uint16_t
)
htons
(
port
);
if
((
sockFd
=
(
int
)
socket
(
AF_INET
,
SOCK_DGRAM
,
0
))
<=
2
)
{
if
((
sockFd
=
(
int
32_t
)
socket
(
AF_INET
,
SOCK_DGRAM
,
0
))
<=
2
)
{
uError
(
"failed to open udp socket: %d (%s)"
,
errno
,
strerror
(
errno
));
taosCloseSocketNoCheck
(
sockFd
);
return
-
1
;
...
...
@@ -268,9 +268,9 @@ SOCKET taosOpenUdpSocket(uint32_t ip, uint16_t port) {
}
SOCKET
taosOpenTcpClientSocket
(
uint32_t
destIp
,
uint16_t
destPort
,
uint32_t
clientIp
)
{
SOCKET
sockFd
=
0
;
SOCKET
sockFd
=
0
;
int32_t
ret
;
struct
sockaddr_in
serverAddr
,
clientAddr
;
int
ret
;
sockFd
=
socket
(
PF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
);
...
...
@@ -281,7 +281,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
}
/* set REUSEADDR option, so the portnumber can be re-used */
int
reuse
=
1
;
int
32_t
reuse
=
1
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_SOCKET
,
SO_REUSEADDR
,
(
void
*
)
&
reuse
,
sizeof
(
reuse
))
<
0
)
{
uError
(
"setsockopt SO_REUSEADDR failed: %d (%s)"
,
errno
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
...
...
@@ -296,8 +296,8 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
/* bind socket to client address */
if
(
bind
(
sockFd
,
(
struct
sockaddr
*
)
&
clientAddr
,
sizeof
(
clientAddr
))
<
0
)
{
uError
(
"bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)"
,
clientIp
,
destIp
,
destPort
,
strerror
(
errno
));
uError
(
"bind tcp client socket failed, client(0x%x:0), dest(0x%x:%d), reason:(%s)"
,
clientIp
,
destIp
,
destPort
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
return
-
1
;
}
...
...
@@ -311,7 +311,7 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
ret
=
connect
(
sockFd
,
(
struct
sockaddr
*
)
&
serverAddr
,
sizeof
(
serverAddr
));
if
(
ret
!=
0
)
{
//uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
//
uError("failed to connect socket, ip:0x%x, port:%hu(%s)", destIp, destPort, strerror(errno));
taosCloseSocket
(
sockFd
);
sockFd
=
-
1
;
}
else
{
...
...
@@ -321,36 +321,36 @@ SOCKET taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clie
return
sockFd
;
}
int
taosKeepTcpAlive
(
SOCKET
sockFd
)
{
int
alive
=
1
;
int
32_t
taosKeepTcpAlive
(
SOCKET
sockFd
)
{
int
32_t
alive
=
1
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_SOCKET
,
SO_KEEPALIVE
,
(
void
*
)
&
alive
,
sizeof
(
alive
))
<
0
)
{
uError
(
"fd:%d setsockopt SO_KEEPALIVE failed: %d (%s)"
,
sockFd
,
errno
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
return
-
1
;
}
int
probes
=
3
;
int
32_t
probes
=
3
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_TCP
,
TCP_KEEPCNT
,
(
void
*
)
&
probes
,
sizeof
(
probes
))
<
0
)
{
uError
(
"fd:%d setsockopt SO_KEEPCNT failed: %d (%s)"
,
sockFd
,
errno
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
return
-
1
;
}
int
alivetime
=
10
;
int
32_t
alivetime
=
10
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_TCP
,
TCP_KEEPIDLE
,
(
void
*
)
&
alivetime
,
sizeof
(
alivetime
))
<
0
)
{
uError
(
"fd:%d setsockopt SO_KEEPIDLE failed: %d (%s)"
,
sockFd
,
errno
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
return
-
1
;
}
int
interval
=
3
;
int
32_t
interval
=
3
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_TCP
,
TCP_KEEPINTVL
,
(
void
*
)
&
interval
,
sizeof
(
interval
))
<
0
)
{
uError
(
"fd:%d setsockopt SO_KEEPINTVL failed: %d (%s)"
,
sockFd
,
errno
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
return
-
1
;
}
int
nodelay
=
1
;
int
32_t
nodelay
=
1
;
if
(
taosSetSockOpt
(
sockFd
,
IPPROTO_TCP
,
TCP_NODELAY
,
(
void
*
)
&
nodelay
,
sizeof
(
nodelay
))
<
0
)
{
uError
(
"fd:%d setsockopt TCP_NODELAY failed %d (%s)"
,
sockFd
,
errno
,
strerror
(
errno
));
taosCloseSocket
(
sockFd
);
...
...
@@ -371,8 +371,8 @@ int taosKeepTcpAlive(SOCKET sockFd) {
SOCKET
taosOpenTcpServerSocket
(
uint32_t
ip
,
uint16_t
port
)
{
struct
sockaddr_in
serverAdd
;
SOCKET
sockFd
;
int
reuse
;
SOCKET
sockFd
;
int
32_t
reuse
;
uDebug
(
"open tcp server socket:0x%x:%hu"
,
ip
,
port
);
...
...
@@ -381,7 +381,7 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
serverAdd
.
sin_addr
.
s_addr
=
ip
;
serverAdd
.
sin_port
=
(
uint16_t
)
htons
(
port
);
if
((
sockFd
=
(
int
)
socket
(
AF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
))
<=
2
)
{
if
((
sockFd
=
(
int
32_t
)
socket
(
AF_INET
,
SOCK_STREAM
,
IPPROTO_TCP
))
<=
2
)
{
uError
(
"failed to open TCP socket: %d (%s)"
,
errno
,
strerror
(
errno
));
taosCloseSocketNoCheck
(
sockFd
);
return
-
1
;
...
...
@@ -417,38 +417,38 @@ SOCKET taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
return
sockFd
;
}
void
tinet_ntoa
(
char
*
ipstr
,
u
nsigned
in
t
ip
)
{
void
tinet_ntoa
(
char
*
ipstr
,
u
int32_
t
ip
)
{
sprintf
(
ipstr
,
"%d.%d.%d.%d"
,
ip
&
0xFF
,
(
ip
>>
8
)
&
0xFF
,
(
ip
>>
16
)
&
0xFF
,
ip
>>
24
);
}
#define COPY_SIZE 32768
// sendfile shall be used
int
taosCopyFds
(
SOCKET
sfd
,
SOCKET
dfd
,
int64_t
len
)
{
int
32_t
taosCopyFds
(
SOCKET
sfd
,
SOCKET
dfd
,
int64_t
len
)
{
int64_t
leftLen
;
int
readLen
,
writeLen
;
int
32_t
readLen
,
writeLen
;
char
temp
[
COPY_SIZE
];
leftLen
=
len
;
while
(
leftLen
>
0
)
{
if
(
leftLen
<
COPY_SIZE
)
readLen
=
(
int
)
leftLen
;
readLen
=
(
int
32_t
)
leftLen
;
else
readLen
=
COPY_SIZE
;
// 4K
int
retLen
=
taosReadMsg
(
sfd
,
temp
,
(
in
t
)
readLen
);
int
32_t
retLen
=
taosReadMsg
(
sfd
,
temp
,
(
int32_
t
)
readLen
);
if
(
readLen
!=
retLen
)
{
uError
(
"read error, readLen:%d retLen:%d len:%"
PRId64
" leftLen:%"
PRId64
", reason:%s"
,
readLen
,
retLen
,
len
,
leftLen
,
strerror
(
errno
));
uError
(
"read error, readLen:%d retLen:%d len:%"
PRId64
" leftLen:%"
PRId64
", reason:%s"
,
readLen
,
retLen
,
len
,
leftLen
,
strerror
(
errno
));
return
-
1
;
}
writeLen
=
taosWriteMsg
(
dfd
,
temp
,
readLen
);
if
(
readLen
!=
writeLen
)
{
uError
(
"copy error, readLen:%d writeLen:%d len:%"
PRId64
" leftLen:%"
PRId64
", reason:%s"
,
readLen
,
writeLen
,
len
,
leftLen
,
strerror
(
errno
));
uError
(
"copy error, readLen:%d writeLen:%d len:%"
PRId64
" leftLen:%"
PRId64
", reason:%s"
,
readLen
,
writeLen
,
len
,
leftLen
,
strerror
(
errno
));
return
-
1
;
}
...
...
src/wal/src/walMgmt.c
浏览文件 @
ff25a2c4
...
...
@@ -16,7 +16,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "taoserror.h"
#include "talloc.h"
#include "tref.h"
#include "twal.h"
#include "walInt.h"
...
...
src/wal/src/walWrite.c
浏览文件 @
ff25a2c4
...
...
@@ -15,7 +15,6 @@
#define _DEFAULT_SOURCE
#include "os.h"
#include "talloc.h"
#include "taoserror.h"
#include "tchecksum.h"
#include "twal.h"
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录