Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
bc2a109b
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
bc2a109b
编写于
6月 05, 2020
作者:
陶建辉(Jeff)
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix the coveraity scan bugs
上级
55ee7868
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
89 addition
and
61 deletion
+89
-61
src/rpc/src/rpcCache.c
src/rpc/src/rpcCache.c
+1
-1
src/rpc/src/rpcMain.c
src/rpc/src/rpcMain.c
+7
-7
src/rpc/src/rpcTcp.c
src/rpc/src/rpcTcp.c
+3
-3
src/rpc/src/rpcUdp.c
src/rpc/src/rpcUdp.c
+2
-2
src/rpc/test/rclient.c
src/rpc/test/rclient.c
+3
-2
src/rpc/test/rsclient.c
src/rpc/test/rsclient.c
+2
-1
src/util/src/tsocket.c
src/util/src/tsocket.c
+11
-8
src/wal/src/walMain.c
src/wal/src/walMain.c
+60
-37
未找到文件。
src/rpc/src/rpcCache.c
浏览文件 @
bc2a109b
...
...
@@ -127,7 +127,7 @@ void rpcAddConnIntoCache(void *handle, void *data, char *fqdn, uint16_t port, in
hash
=
rpcHashConn
(
pCache
,
fqdn
,
port
,
connType
);
pNode
=
(
SConnHash
*
)
taosMemPoolMalloc
(
pCache
->
connHashMemPool
);
strcpy
(
pNode
->
fqdn
,
fqdn
);
tstrncpy
(
pNode
->
fqdn
,
fqdn
,
sizeof
(
pNode
->
fqdn
)
);
pNode
->
port
=
port
;
pNode
->
connType
=
connType
;
pNode
->
data
=
data
;
...
...
src/rpc/src/rpcMain.c
浏览文件 @
bc2a109b
...
...
@@ -60,7 +60,7 @@ typedef struct {
void
*
idPool
;
// handle to ID pool
void
*
tmrCtrl
;
// handle to timer
void
*
hash
;
// handle returned by hash utility
SHashObj
*
hash
;
// handle returned by hash utility
void
*
tcphandle
;
// returned handle from TCP initialization
void
*
udphandle
;
// returned handle from UDP initialization
void
*
pCache
;
// connection cache
...
...
@@ -211,7 +211,7 @@ void *rpcOpen(const SRpcInit *pInit) {
pRpc
=
(
SRpcInfo
*
)
calloc
(
1
,
sizeof
(
SRpcInfo
));
if
(
pRpc
==
NULL
)
return
NULL
;
if
(
pInit
->
label
)
strcpy
(
pRpc
->
label
,
pInit
->
label
);
if
(
pInit
->
label
)
tstrncpy
(
pRpc
->
label
,
pInit
->
label
,
sizeof
(
pRpc
->
label
)
);
pRpc
->
connType
=
pInit
->
connType
;
pRpc
->
idleTime
=
pInit
->
idleTime
;
pRpc
->
numOfThreads
=
pInit
->
numOfThreads
>
TSDB_MAX_RPC_THREADS
?
TSDB_MAX_RPC_THREADS
:
pInit
->
numOfThreads
;
...
...
@@ -228,7 +228,7 @@ void *rpcOpen(const SRpcInit *pInit) {
size_t
size
=
sizeof
(
SRpcConn
)
*
pRpc
->
sessions
;
pRpc
->
connList
=
(
SRpcConn
*
)
calloc
(
1
,
size
);
if
(
pRpc
->
connList
==
NULL
)
{
tError
(
"%s failed to allocate memory for taos connections, size:%d"
,
pRpc
->
label
,
size
);
tError
(
"%s failed to allocate memory for taos connections, size:%
l
d"
,
pRpc
->
label
,
size
);
rpcClose
(
pRpc
);
return
NULL
;
}
...
...
@@ -459,7 +459,7 @@ int rpcGetConnInfo(void *thandle, SRpcConnInfo *pInfo) {
pInfo
->
clientPort
=
pConn
->
peerPort
;
// pInfo->serverIp = pConn->destIp;
str
cpy
(
pInfo
->
user
,
pConn
->
user
);
str
ncpy
(
pInfo
->
user
,
pConn
->
user
,
sizeof
(
pInfo
->
user
)
);
return
0
;
}
...
...
@@ -503,10 +503,10 @@ static SRpcConn *rpcOpenConn(SRpcInfo *pRpc, char *peerFqdn, uint16_t peerPort,
pConn
=
rpcAllocateClientConn
(
pRpc
);
if
(
pConn
)
{
strcpy
(
pConn
->
peerFqdn
,
peerFqdn
);
tstrncpy
(
pConn
->
peerFqdn
,
peerFqdn
,
sizeof
(
pConn
->
peerFqdn
)
);
pConn
->
peerIp
=
peerIp
;
pConn
->
peerPort
=
peerPort
;
strcpy
(
pConn
->
user
,
pRpc
->
user
);
tstrncpy
(
pConn
->
user
,
pRpc
->
user
,
sizeof
(
pConn
->
user
)
);
pConn
->
connType
=
connType
;
if
(
taosOpenConn
[
connType
])
{
...
...
@@ -804,7 +804,7 @@ static SRpcConn *rpcProcessMsgHead(SRpcInfo *pRpc, SRecvInfo *pRecv) {
pConn
=
rpcGetConnObj
(
pRpc
,
sid
,
pRecv
);
if
(
pConn
==
NULL
)
{
tTrace
(
"%s %p, failed to get connection obj(%s)"
,
pRpc
->
label
,
pHead
->
ahandle
,
tstrerror
(
terrno
));
tTrace
(
"%s %p, failed to get connection obj(%s)"
,
pRpc
->
label
,
(
void
*
)
pHead
->
ahandle
,
tstrerror
(
terrno
));
return
NULL
;
}
else
{
if
(
rpcIsReq
(
pHead
->
msgType
))
{
...
...
src/rpc/src/rpcTcp.c
浏览文件 @
bc2a109b
...
...
@@ -73,7 +73,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pServerObj
=
(
SServerObj
*
)
calloc
(
sizeof
(
SServerObj
),
1
);
pServerObj
->
ip
=
ip
;
pServerObj
->
port
=
port
;
strcpy
(
pServerObj
->
label
,
label
);
tstrncpy
(
pServerObj
->
label
,
label
,
sizeof
(
pServerObj
->
label
)
);
pServerObj
->
numOfThreads
=
numOfThreads
;
pServerObj
->
pThreadObj
=
(
SThreadObj
*
)
calloc
(
sizeof
(
SThreadObj
),
numOfThreads
);
...
...
@@ -87,7 +87,7 @@ void *taosInitTcpServer(uint32_t ip, uint16_t port, char *label, int numOfThread
pThreadObj
=
pServerObj
->
pThreadObj
;
for
(
int
i
=
0
;
i
<
numOfThreads
;
++
i
)
{
pThreadObj
->
processData
=
fp
;
strcpy
(
pThreadObj
->
label
,
label
);
tstrncpy
(
pThreadObj
->
label
,
label
,
sizeof
(
pThreadObj
->
label
)
);
pThreadObj
->
shandle
=
shandle
;
code
=
pthread_mutex_init
(
&
(
pThreadObj
->
mutex
),
NULL
);
...
...
@@ -247,7 +247,7 @@ void *taosInitTcpClient(uint32_t ip, uint16_t port, char *label, int num, void *
pThreadObj
=
(
SThreadObj
*
)
malloc
(
sizeof
(
SThreadObj
));
memset
(
pThreadObj
,
0
,
sizeof
(
SThreadObj
));
strcpy
(
pThreadObj
->
label
,
label
);
tstrncpy
(
pThreadObj
->
label
,
label
,
sizeof
(
pThreadObj
->
label
)
);
pThreadObj
->
ip
=
ip
;
pThreadObj
->
shandle
=
shandle
;
...
...
src/rpc/src/rpcUdp.c
浏览文件 @
bc2a109b
...
...
@@ -72,7 +72,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pSet
->
port
=
port
;
pSet
->
shandle
=
shandle
;
pSet
->
fp
=
fp
;
strcpy
(
pSet
->
label
,
label
);
tstrncpy
(
pSet
->
label
,
label
,
sizeof
(
pSet
->
label
)
);
uint16_t
ownPort
;
for
(
int
i
=
0
;
i
<
threads
;
++
i
)
{
...
...
@@ -99,7 +99,7 @@ void *taosInitUdpConnection(uint32_t ip, uint16_t port, char *label, int threads
pConn
->
localPort
=
(
uint16_t
)
ntohs
(
sin
.
sin_port
);
}
strcpy
(
pConn
->
label
,
label
);
tstrncpy
(
pConn
->
label
,
label
,
sizeof
(
pConn
->
label
)
);
pConn
->
shandle
=
shandle
;
pConn
->
processData
=
fp
;
pConn
->
index
=
i
;
...
...
src/rpc/test/rclient.c
浏览文件 @
bc2a109b
...
...
@@ -76,6 +76,7 @@ int main(int argc, char *argv[]) {
int
numOfReqs
=
0
;
int
appThreads
=
1
;
char
serverIp
[
40
]
=
"127.0.0.1"
;
char
secret
[
TSDB_KEY_LEN
]
=
"mypassword"
;
struct
timeval
systemTime
;
int64_t
startTime
,
endTime
;
pthread_attr_t
thattr
;
...
...
@@ -97,7 +98,7 @@ int main(int argc, char *argv[]) {
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
user
=
"michael"
;
rpcInit
.
secret
=
"mypassword"
;
rpcInit
.
secret
=
secret
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
...
...
@@ -106,7 +107,7 @@ int main(int argc, char *argv[]) {
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
ipSet
.
port
[
0
]
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-i"
)
==
0
&&
i
<
argc
-
1
)
{
tstrncpy
(
ipSet
.
fqdn
[
0
],
argv
[
++
i
],
sizeof
(
ipSet
.
fqdn
));
tstrncpy
(
ipSet
.
fqdn
[
0
],
argv
[
++
i
],
sizeof
(
ipSet
.
fqdn
[
0
]
));
}
else
if
(
strcmp
(
argv
[
i
],
"-t"
)
==
0
&&
i
<
argc
-
1
)
{
rpcInit
.
numOfThreads
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-m"
)
==
0
&&
i
<
argc
-
1
)
{
...
...
src/rpc/test/rsclient.c
浏览文件 @
bc2a109b
...
...
@@ -77,6 +77,7 @@ int main(int argc, char *argv[]) {
int
numOfReqs
=
0
;
int
appThreads
=
1
;
char
serverIp
[
40
]
=
"127.0.0.1"
;
char
secret
[
TSDB_KEY_LEN
]
=
"mypassword"
;
struct
timeval
systemTime
;
int64_t
startTime
,
endTime
;
pthread_attr_t
thattr
;
...
...
@@ -98,7 +99,7 @@ int main(int argc, char *argv[]) {
rpcInit
.
sessions
=
100
;
rpcInit
.
idleTime
=
tsShellActivityTimer
*
1000
;
rpcInit
.
user
=
"michael"
;
rpcInit
.
secret
=
"mypassword"
;
rpcInit
.
secret
=
secret
;
rpcInit
.
ckey
=
"key"
;
rpcInit
.
spi
=
1
;
rpcInit
.
connType
=
TAOS_CONN_CLIENT
;
...
...
src/util/src/tsocket.c
浏览文件 @
bc2a109b
...
...
@@ -19,6 +19,7 @@
#include "tutil.h"
int
taosGetFqdn
(
char
*
fqdn
)
{
int
code
=
0
;
char
hostname
[
1024
];
hostname
[
1023
]
=
'\0'
;
gethostname
(
hostname
,
1023
);
...
...
@@ -27,13 +28,15 @@ int taosGetFqdn(char *fqdn) {
h
=
gethostbyname
(
hostname
);
if
(
h
!=
NULL
)
{
strcpy
(
fqdn
,
h
->
h_name
);
return
0
;
}
else
{
uError
(
"failed to get host name
"
);
return
-
1
;
uError
(
"failed to get host name
(%s)"
,
strerror
(
errno
)
);
code
=
-
1
;
}
free
(
h
);
// to do: free the resources
// free(h);
return
code
;
}
uint32_t
taosGetIpFromFqdn
(
const
char
*
fqdn
)
{
...
...
@@ -47,7 +50,7 @@ uint32_t ip2uint(const char *const ip_addr) {
char
ip_addr_cpy
[
20
];
char
ip
[
5
];
strcpy
(
ip_addr_cpy
,
ip_addr
);
tstrncpy
(
ip_addr_cpy
,
ip_addr
,
sizeof
(
ip_addr_cpy
)
);
char
*
s_start
,
*
s_end
;
s_start
=
ip_addr_cpy
;
...
...
@@ -206,7 +209,7 @@ int taosOpenUdpSocket(uint32_t ip, uint16_t port) {
int
reuse
,
nocheck
;
int
bufSize
=
8192000
;
uTrace
(
"open udp socket:
%s
:%hu"
,
ip
,
port
);
uTrace
(
"open udp socket:
0x%x
:%hu"
,
ip
,
port
);
memset
((
char
*
)
&
localAddr
,
0
,
sizeof
(
localAddr
));
localAddr
.
sin_family
=
AF_INET
;
...
...
@@ -257,7 +260,7 @@ int taosOpenUdpSocket(uint32_t ip, uint16_t port) {
/* bind socket to local address */
if
(
bind
(
sockFd
,
(
struct
sockaddr
*
)
&
localAddr
,
sizeof
(
localAddr
))
<
0
)
{
uError
(
"failed to bind udp socket: %d (%s),
%s
:%hu"
,
errno
,
strerror
(
errno
),
ip
,
port
);
uError
(
"failed to bind udp socket: %d (%s),
0x%x
:%hu"
,
errno
,
strerror
(
errno
),
ip
,
port
);
taosCloseSocket
(
sockFd
);
return
-
1
;
}
...
...
@@ -363,7 +366,7 @@ int taosOpenTcpServerSocket(uint32_t ip, uint16_t port) {
int
sockFd
;
int
reuse
;
uTrace
(
"open tcp server socket:
%s
:%hu"
,
ip
,
port
);
uTrace
(
"open tcp server socket:
0x%x
:%hu"
,
ip
,
port
);
bzero
((
char
*
)
&
serverAdd
,
sizeof
(
serverAdd
));
serverAdd
.
sin_family
=
AF_INET
;
...
...
src/wal/src/walMain.c
浏览文件 @
bc2a109b
...
...
@@ -68,11 +68,19 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
pWal
->
num
=
0
;
pWal
->
level
=
pCfg
->
walLevel
;
pWal
->
keep
=
pCfg
->
keep
;
strcpy
(
pWal
->
path
,
path
);
tstrncpy
(
pWal
->
path
,
path
,
sizeof
(
pWal
->
path
)
);
pthread_mutex_init
(
&
pWal
->
mutex
,
NULL
);
if
(
access
(
path
,
F_OK
)
!=
0
)
mkdir
(
path
,
0755
);
if
(
access
(
path
,
F_OK
)
!=
0
)
{
if
(
mkdir
(
path
,
0755
)
!=
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"wal:%s, failed to create directory(%s)"
,
path
,
strerror
(
errno
));
pthread_mutex_destroy
(
&
pWal
->
mutex
);
free
(
pWal
);
pWal
=
NULL
;
}
}
if
(
pCfg
->
keep
==
1
)
return
pWal
;
if
(
walHandleExistingFiles
(
path
)
==
0
)
...
...
@@ -80,7 +88,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
if
(
pWal
->
fd
<
0
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
wError
(
"wal:%s, failed to open
"
,
path
);
wError
(
"wal:%s, failed to open
(%s)"
,
path
,
strerror
(
errno
)
);
pthread_mutex_destroy
(
&
pWal
->
mutex
);
free
(
pWal
);
pWal
=
NULL
;
...
...
@@ -119,7 +127,8 @@ void walClose(void *handle) {
int
walRenew
(
void
*
handle
)
{
if
(
handle
==
NULL
)
return
0
;
SWal
*
pWal
=
handle
;
int
code
=
0
;
terrno
=
0
;
pthread_mutex_lock
(
&
pWal
->
mutex
);
...
...
@@ -135,8 +144,8 @@ int walRenew(void *handle) {
pWal
->
fd
=
open
(
pWal
->
name
,
O_WRONLY
|
O_CREAT
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
if
(
pWal
->
fd
<
0
)
{
wError
(
"wal:%
d
, failed to open(%s)"
,
pWal
->
name
,
strerror
(
errno
));
code
=
-
1
;
wError
(
"wal:%
s
, failed to open(%s)"
,
pWal
->
name
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
)
;
}
else
{
wTrace
(
"wal:%s, it is created"
,
pWal
->
name
);
...
...
@@ -156,14 +165,15 @@ int walRenew(void *handle) {
pthread_mutex_unlock
(
&
pWal
->
mutex
);
return
code
;
return
terrno
;
}
int
walWrite
(
void
*
handle
,
SWalHead
*
pHead
)
{
SWal
*
pWal
=
handle
;
int
code
=
0
;
if
(
pWal
==
NULL
)
return
-
1
;
terrno
=
0
;
// no wal
if
(
pWal
->
level
==
TAOS_WAL_NOLOG
)
return
0
;
if
(
pHead
->
version
<=
pWal
->
version
)
return
0
;
...
...
@@ -174,12 +184,12 @@ int walWrite(void *handle, SWalHead *pHead) {
if
(
write
(
pWal
->
fd
,
pHead
,
contLen
)
!=
contLen
)
{
wError
(
"wal:%s, failed to write(%s)"
,
pWal
->
name
,
strerror
(
errno
));
code
=
-
1
;
terrno
=
TAOS_SYSTEM_ERROR
(
errno
)
;
}
else
{
pWal
->
version
=
pHead
->
version
;
}
return
code
;
return
terrno
;
}
void
walFsync
(
void
*
handle
)
{
...
...
@@ -196,11 +206,11 @@ void walFsync(void *handle) {
int
walRestore
(
void
*
handle
,
void
*
pVnode
,
int
(
*
writeFp
)(
void
*
,
void
*
,
int
))
{
SWal
*
pWal
=
handle
;
int
code
=
0
;
struct
dirent
*
ent
;
int
count
=
0
;
uint32_t
maxId
=
0
,
minId
=
-
1
,
index
=
0
;
terrno
=
0
;
int
plen
=
strlen
(
walPrefix
);
char
opath
[
TSDB_FILENAME_LEN
+
5
];
...
...
@@ -224,30 +234,30 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
closedir
(
dir
);
if
(
count
==
0
)
{
if
(
pWal
->
keep
)
code
=
walRenew
(
pWal
);
return
code
;
if
(
pWal
->
keep
)
terrno
=
walRenew
(
pWal
);
return
terrno
;
}
if
(
count
!=
(
maxId
-
minId
+
1
)
)
{
wError
(
"wal:%s, messed up, count:%d max:%d min:%d"
,
opath
,
count
,
maxId
,
minId
);
code
=
-
1
;
terrno
=
TAOS_SYSTEM_ERROR
(
TSDB_CODE_APP_ERROR
)
;
}
else
{
wTrace
(
"wal:%s, %d files will be restored"
,
opath
,
count
);
for
(
index
=
minId
;
index
<=
maxId
;
++
index
)
{
sprintf
(
pWal
->
name
,
"%s/%s%d"
,
opath
,
walPrefix
,
index
);
code
=
walRestoreWalFile
(
pWal
,
pVnode
,
writeFp
);
if
(
code
<
0
)
break
;
terrno
=
walRestoreWalFile
(
pWal
,
pVnode
,
writeFp
);
if
(
terrno
<
0
)
break
;
}
}
if
(
code
==
0
)
{
if
(
terrno
==
0
)
{
if
(
pWal
->
keep
==
0
)
{
code
=
walRemoveWalFiles
(
opath
);
if
(
code
==
0
)
{
terrno
=
walRemoveWalFiles
(
opath
);
if
(
terrno
==
0
)
{
if
(
remove
(
opath
)
<
0
)
{
wError
(
"wal:%s, failed to remove directory(%s)"
,
opath
,
strerror
(
errno
));
code
=
-
1
;
terrno
=
TAOS_SYSTEM_ERROR
(
errno
)
;
}
}
}
else
{
...
...
@@ -258,12 +268,12 @@ int walRestore(void *handle, void *pVnode, int (*writeFp)(void *, void *, int))
pWal
->
fd
=
open
(
pWal
->
name
,
O_WRONLY
|
O_CREAT
|
O_APPEND
,
S_IRWXU
|
S_IRWXG
|
S_IRWXO
);
if
(
pWal
->
fd
<
0
)
{
wError
(
"wal:%s, failed to open file(%s)"
,
pWal
->
name
,
strerror
(
errno
));
code
=
-
1
;
terrno
=
TAOS_SYSTEM_ERROR
(
errno
)
;
}
}
}
return
code
;
return
terrno
;
}
int
walGetWalFile
(
void
*
handle
,
char
*
name
,
uint32_t
*
index
)
{
...
...
@@ -292,40 +302,47 @@ int walGetWalFile(void *handle, char *name, uint32_t *index) {
}
static
int
walRestoreWalFile
(
SWal
*
pWal
,
void
*
pVnode
,
FWalWrite
writeFp
)
{
int
code
=
0
;
char
*
name
=
pWal
->
name
;
terrno
=
0
;
char
*
buffer
=
malloc
(
1024000
);
// size for one record
if
(
buffer
==
NULL
)
return
-
1
;
if
(
buffer
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
terrno
;
}
SWalHead
*
pHead
=
(
SWalHead
*
)
buffer
;
int
fd
=
open
(
name
,
O_RDONLY
);
if
(
fd
<
0
)
{
wError
(
"wal:%s, failed to open for restore(%s)"
,
name
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
free
(
buffer
);
return
-
1
;
return
terrno
;
}
wTrace
(
"wal:%s, start to restore"
,
name
);
while
(
1
)
{
int
ret
=
read
(
fd
,
pHead
,
sizeof
(
SWalHead
));
if
(
ret
==
0
)
{
code
=
0
;
break
;}
if
(
ret
==
0
)
break
;
if
(
ret
!=
sizeof
(
SWalHead
))
{
wWarn
(
"wal:%s, failed to read head, skip, ret:%d(%s)"
,
name
,
ret
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
break
;
}
if
(
!
taosCheckChecksumWhole
((
uint8_t
*
)
pHead
,
sizeof
(
SWalHead
)))
{
wWarn
(
"wal:%s, cksum is messed up, skip the rest of file"
,
name
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
break
;
}
ret
=
read
(
fd
,
pHead
->
cont
,
pHead
->
len
);
if
(
ret
!=
pHead
->
len
)
{
wWarn
(
"wal:%s, failed to read body, skip, len:%d ret:%d"
,
name
,
pHead
->
len
,
ret
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
break
;
}
...
...
@@ -336,11 +353,10 @@ static int walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp) {
close
(
fd
);
free
(
buffer
);
return
code
;
return
terrno
;
}
int
walHandleExistingFiles
(
const
char
*
path
)
{
int
code
=
0
;
char
oname
[
TSDB_FILENAME_LEN
*
3
];
char
nname
[
TSDB_FILENAME_LEN
*
3
];
char
opath
[
TSDB_FILENAME_LEN
];
...
...
@@ -350,6 +366,7 @@ int walHandleExistingFiles(const char *path) {
struct
dirent
*
ent
;
DIR
*
dir
=
opendir
(
path
);
int
plen
=
strlen
(
walPrefix
);
terrno
=
0
;
if
(
access
(
opath
,
F_OK
)
==
0
)
{
// old directory is there, it means restore process is not finished
...
...
@@ -360,13 +377,19 @@ int walHandleExistingFiles(const char *path) {
int
count
=
0
;
while
((
ent
=
readdir
(
dir
))
!=
NULL
)
{
if
(
strncmp
(
ent
->
d_name
,
walPrefix
,
plen
)
==
0
)
{
if
(
access
(
opath
,
F_OK
)
!=
0
)
mkdir
(
opath
,
0755
);
sprintf
(
oname
,
"%s/%s"
,
path
,
ent
->
d_name
);
sprintf
(
nname
,
"%s/old/%s"
,
path
,
ent
->
d_name
);
if
(
access
(
opath
,
F_OK
)
!=
0
)
{
if
(
mkdir
(
opath
,
0755
)
!=
0
)
{
wError
(
"wal:%s, failed to create directory:%s(%s)"
,
oname
,
opath
,
strerror
(
errno
));
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
break
;
}
}
if
(
rename
(
oname
,
nname
)
<
0
)
{
wError
(
"wal:%s, failed to move to new:%s"
,
oname
,
nname
);
code
=
-
1
;
terrno
=
TAOS_SYSTEM_ERROR
(
errno
)
;
break
;
}
...
...
@@ -378,14 +401,14 @@ int walHandleExistingFiles(const char *path) {
}
closedir
(
dir
);
return
code
;
return
terrno
;
}
static
int
walRemoveWalFiles
(
const
char
*
path
)
{
int
plen
=
strlen
(
walPrefix
);
char
name
[
TSDB_FILENAME_LEN
*
3
];
int
code
=
0
;
terrno
=
0
;
if
(
access
(
path
,
F_OK
)
!=
0
)
return
0
;
struct
dirent
*
ent
;
...
...
@@ -396,13 +419,13 @@ static int walRemoveWalFiles(const char *path) {
sprintf
(
name
,
"%s/%s"
,
path
,
ent
->
d_name
);
if
(
remove
(
name
)
<
0
)
{
wError
(
"wal:%s, failed to remove(%s)"
,
name
,
strerror
(
errno
));
code
=
-
1
;
break
;
terrno
=
TAOS_SYSTEM_ERROR
(
errno
)
;
}
}
}
closedir
(
dir
);
return
code
;
return
terrno
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录