Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
516cfe5c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
516cfe5c
编写于
12月 26, 2022
作者:
M
Minglei Jin
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'main' into fix/TD-21498
上级
3ea9fadf
43561bfb
变更
13
隐藏空白更改
内联
并排
Showing
13 changed file
with
150 addition
and
94 deletion
+150
-94
include/util/taoserror.h
include/util/taoserror.h
+1
-0
include/util/tdef.h
include/util/tdef.h
+1
-1
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+4
-3
source/dnode/vnode/src/vnd/vnodeSnapshot.c
source/dnode/vnode/src/vnd/vnodeSnapshot.c
+2
-2
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+2
-2
source/libs/sync/src/syncPipeline.c
source/libs/sync/src/syncPipeline.c
+1
-1
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+12
-12
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+91
-62
source/libs/tdb/src/db/tdbPager.c
source/libs/tdb/src/db/tdbPager.c
+23
-3
source/libs/tdb/src/db/tdbTable.c
source/libs/tdb/src/db/tdbTable.c
+2
-2
source/util/src/terror.c
source/util/src/terror.c
+1
-0
tests/script/sh/deploy.sh
tests/script/sh/deploy.sh
+1
-0
tests/script/test.sh
tests/script/test.sh
+9
-6
未找到文件。
include/util/taoserror.h
浏览文件 @
516cfe5c
...
...
@@ -517,6 +517,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912)
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
#define TSDB_CODE_SYN_RESTORING TAOS_DEF_ERROR_CODE(0, 0x0914)
#define TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG TAOS_DEF_ERROR_CODE(0, 0x0915) // internal
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq
...
...
include/util/tdef.h
浏览文件 @
516cfe5c
...
...
@@ -254,7 +254,7 @@ typedef enum ELogicConditionType {
#define TSDB_EP_LEN (TSDB_FQDN_LEN + 6)
#define TSDB_IPv4ADDR_LEN 16
#define TSDB_FILENAME_LEN 128
#define TSDB_SHOW_SQL_LEN
1024
#define TSDB_SHOW_SQL_LEN
2048
#define TSDB_SLOW_QUERY_SQL_LEN 512
#define TSDB_SHOW_SUBQUERY_LEN 1000
...
...
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
516cfe5c
...
...
@@ -268,10 +268,11 @@ _exit:
if
(
NULL
!=
pInfo
)
{
taosMemoryFree
(
pInfo
);
}
vError
(
"vgId:%d,
%s failed since %s, commit id:%"
PRId64
,
TD_VID
(
pVnode
),
__func__
,
tstrerror
(
code
),
vError
(
"vgId:%d,
vnode async commit failed since %s, commitId:%"
PRId64
,
TD_VID
(
pVnode
)
,
tstrerror
(
code
),
pVnode
->
state
.
commitID
);
}
else
{
vDebug
(
"vgId:%d, %s done"
,
TD_VID
(
pVnode
),
__func__
);
vInfo
(
"vgId:%d, vnode async commit done, commitId:%"
PRId64
" term:%"
PRId64
" applied:%"
PRId64
,
TD_VID
(
pVnode
),
pVnode
->
state
.
commitID
,
pVnode
->
state
.
applyTerm
,
pVnode
->
state
.
applied
);
}
return
code
;
}
...
...
@@ -290,7 +291,7 @@ static int vnodeCommitImpl(SCommitInfo *pInfo) {
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
SVnode
*
pVnode
=
pInfo
->
pVnode
;
vInfo
(
"vgId:%d, start to commit, commit
ID
:%"
PRId64
" version:%"
PRId64
" term: %"
PRId64
,
TD_VID
(
pVnode
),
vInfo
(
"vgId:%d, start to commit, commit
Id
:%"
PRId64
" version:%"
PRId64
" term: %"
PRId64
,
TD_VID
(
pVnode
),
pVnode
->
state
.
commitID
,
pVnode
->
state
.
applied
,
pVnode
->
state
.
applyTerm
);
// persist wal before starting
...
...
source/dnode/vnode/src/vnd/vnodeSnapshot.c
浏览文件 @
516cfe5c
...
...
@@ -423,8 +423,8 @@ int32_t vnodeSnapWrite(SVSnapWriter *pWriter, uint8_t *pData, uint32_t nData) {
ASSERT
(
pHdr
->
index
==
pWriter
->
index
+
1
);
pWriter
->
index
=
pHdr
->
index
;
v
Info
(
"vgId:%d, vnode snapshot write data, index:%"
PRId64
" type:%d nData
:%d"
,
TD_VID
(
pVnode
),
pHdr
->
index
,
pHdr
->
type
,
nData
);
v
Debug
(
"vgId:%d, vnode snapshot write data, index:%"
PRId64
" type:%d blockLen
:%d"
,
TD_VID
(
pVnode
),
pHdr
->
index
,
pHdr
->
type
,
nData
);
switch
(
pHdr
->
type
)
{
case
SNAP_DATA_CFG
:
{
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
516cfe5c
...
...
@@ -465,9 +465,9 @@ static int32_t vnodeSnapshotStopWrite(const SSyncFSM *pFsm, void *pWriter, bool
static
int32_t
vnodeSnapshotDoWrite
(
const
SSyncFSM
*
pFsm
,
void
*
pWriter
,
void
*
pBuf
,
int32_t
len
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
vDebug
(
"vgId:%d, continue write vnode snapshot,
l
en:%d"
,
pVnode
->
config
.
vgId
,
len
);
vDebug
(
"vgId:%d, continue write vnode snapshot,
blockL
en:%d"
,
pVnode
->
config
.
vgId
,
len
);
int32_t
code
=
vnodeSnapWrite
(
pWriter
,
pBuf
,
len
);
vDebug
(
"vgId:%d, continue write vnode snapshot finished,
l
en:%d"
,
pVnode
->
config
.
vgId
,
len
);
vDebug
(
"vgId:%d, continue write vnode snapshot finished,
blockL
en:%d"
,
pVnode
->
config
.
vgId
,
len
);
return
code
;
}
...
...
source/libs/sync/src/syncPipeline.c
浏览文件 @
516cfe5c
...
...
@@ -112,7 +112,7 @@ SyncTerm syncLogReplMgrGetPrevLogTerm(SSyncLogReplMgr* pMgr, SSyncNode* pNode, S
return
prevLogTerm
;
}
s
Error
(
"vgId:%d, failed to get log term since %s. index: %"
PRId64
""
,
pNode
->
vgId
,
terrstr
(),
prevIndex
);
s
Info
(
"vgId:%d, failed to get log term since %s. index:%"
PRId64
,
pNode
->
vgId
,
terrstr
(),
prevIndex
);
terrno
=
TSDB_CODE_WAL_LOG_NOT_EXIST
;
return
-
1
;
}
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
516cfe5c
...
...
@@ -115,8 +115,8 @@ static int32_t raftLogRestoreFromSnapshot(struct SSyncLogStore* pLogStore, SyncI
const
char
*
sysErrStr
=
strerror
(
errno
);
sNError
(
pData
->
pSyncNode
,
"wal restore from snapshot error, index:%"
PRId64
", err:
%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
snapshotIndex
,
err
,
err
,
errStr
,
sysErr
,
sysErrStr
);
"wal restore from snapshot error, index:%"
PRId64
", err:
0x%x, msg:%s, syserr:%d, sysmsg:%s"
,
snapshotIndex
,
err
,
errStr
,
sysErr
,
sysErrStr
);
return
-
1
;
}
...
...
@@ -212,8 +212,8 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
int32_t
sysErr
=
errno
;
const
char
*
sysErrStr
=
strerror
(
errno
);
sNError
(
pData
->
pSyncNode
,
"wal write error, index:%"
PRId64
", err:
%d %X
, msg:%s, syserr:%d, sysmsg:%s"
,
pEntry
->
index
,
err
,
err
,
err
Str
,
sysErr
,
sysErrStr
);
sNError
(
pData
->
pSyncNode
,
"wal write error, index:%"
PRId64
", err:
0x%x
, msg:%s, syserr:%d, sysmsg:%s"
,
pEntry
->
index
,
err
,
errStr
,
sysErr
,
sysErrStr
);
return
-
1
;
}
...
...
@@ -257,11 +257,11 @@ int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index, SSyncR
const
char
*
sysErrStr
=
strerror
(
errno
);
if
(
terrno
==
TSDB_CODE_WAL_LOG_NOT_EXIST
)
{
sNTrace
(
pData
->
pSyncNode
,
"wal read not exist, index:%"
PRId64
", err:
%d %X
, msg:%s, syserr:%d, sysmsg:%s"
,
index
,
err
,
err
,
err
Str
,
sysErr
,
sysErrStr
);
sNTrace
(
pData
->
pSyncNode
,
"wal read not exist, index:%"
PRId64
", err:
0x%x
, msg:%s, syserr:%d, sysmsg:%s"
,
index
,
err
,
errStr
,
sysErr
,
sysErrStr
);
}
else
{
sNTrace
(
pData
->
pSyncNode
,
"wal read error, index:%"
PRId64
", err:
%d %X, msg:%s, syserr:%d, sysmsg:%s"
,
index
,
err
,
err
,
err
Str
,
sysErr
,
sysErrStr
);
sNTrace
(
pData
->
pSyncNode
,
"wal read error, index:%"
PRId64
", err:
0x%x, msg:%s, syserr:%d, sysmsg:%s"
,
index
,
err
,
errStr
,
sysErr
,
sysErrStr
);
}
/*
...
...
@@ -341,8 +341,8 @@ static int32_t raftLogTruncate(struct SSyncLogStore* pLogStore, SyncIndex fromIn
const
char
*
errStr
=
tstrerror
(
err
);
int32_t
sysErr
=
errno
;
const
char
*
sysErrStr
=
strerror
(
errno
);
sError
(
"vgId:%d, wal truncate error, from-index:%"
PRId64
", err:
%d %X
, msg:%s, syserr:%d, sysmsg:%s"
,
pData
->
pSyncNode
->
vgId
,
fromIndex
,
err
,
err
,
err
Str
,
sysErr
,
sysErrStr
);
sError
(
"vgId:%d, wal truncate error, from-index:%"
PRId64
", err:
0x%x
, msg:%s, syserr:%d, sysmsg:%s"
,
pData
->
pSyncNode
->
vgId
,
fromIndex
,
err
,
errStr
,
sysErr
,
sysErrStr
);
}
// event log
...
...
@@ -392,8 +392,8 @@ int32_t raftLogUpdateCommitIndex(SSyncLogStore* pLogStore, SyncIndex index) {
const
char
*
errStr
=
tstrerror
(
err
);
int32_t
sysErr
=
errno
;
const
char
*
sysErrStr
=
strerror
(
errno
);
sError
(
"vgId:%d, wal update commit index error, index:%"
PRId64
", err:
%d %X
, msg:%s, syserr:%d, sysmsg:%s"
,
pData
->
pSyncNode
->
vgId
,
index
,
err
,
err
,
err
Str
,
sysErr
,
sysErrStr
);
sError
(
"vgId:%d, wal update commit index error, index:%"
PRId64
", err:
0x%x
, msg:%s, syserr:%d, sysmsg:%s"
,
pData
->
pSyncNode
->
vgId
,
index
,
err
,
errStr
,
sysErr
,
sysErrStr
);
return
-
1
;
}
return
0
;
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
516cfe5c
...
...
@@ -49,7 +49,6 @@ SSyncSnapshotSender *snapshotSenderCreate(SSyncNode *pSyncNode, int32_t replicaI
pSender
->
pSyncNode
->
pFsm
->
FpGetSnapshotInfo
(
pSender
->
pSyncNode
->
pFsm
,
&
pSender
->
snapshot
);
pSender
->
finish
=
false
;
sDebug
(
"vgId:%d, snapshot sender create"
,
pSender
->
pSyncNode
->
vgId
);
return
pSender
;
}
...
...
@@ -294,7 +293,7 @@ int32_t syncNodeStartSnapshot(SSyncNode *pSyncNode, SRaftId *pDestId) {
}
if
(
snapshotSenderIsStart
(
pSender
))
{
sS
Error
(
pSender
,
"snapshot sender already start, ignore"
);
sS
Info
(
pSender
,
"snapshot sender already start, ignore"
);
return
0
;
}
...
...
@@ -523,7 +522,7 @@ static int32_t snapshotReceiverFinish(SSyncSnapshotReceiver *pReceiver, SyncSnap
static
int32_t
snapshotReceiverGotData
(
SSyncSnapshotReceiver
*
pReceiver
,
SyncSnapshotSend
*
pMsg
)
{
if
(
pMsg
->
seq
!=
pReceiver
->
ack
+
1
)
{
sRError
(
pReceiver
,
"snapshot receiver invalid seq, ack:%d seq:%d"
,
pReceiver
->
ack
,
pMsg
->
seq
);
terrno
=
TSDB_CODE_SYN_IN
TERNAL_ERROR
;
terrno
=
TSDB_CODE_SYN_IN
VALID_SNAPSHOT_MSG
;
return
-
1
;
}
...
...
@@ -721,8 +720,12 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
timeNow
=
taosGetTimestampMs
();
}
int32_t
code
=
0
;
if
(
snapshotReceiverGotData
(
pReceiver
,
pMsg
)
!=
0
)
{
return
-
1
;
code
=
terrno
;
if
(
code
>=
SYNC_SNAPSHOT_SEQ_INVALID
)
{
code
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
}
}
// build msg
...
...
@@ -740,11 +743,11 @@ static int32_t syncNodeOnSnapshotTransfering(SSyncNode *pSyncNode, SyncSnapshotS
pRspMsg
->
lastTerm
=
pMsg
->
lastTerm
;
pRspMsg
->
startTime
=
pReceiver
->
startTime
;
pRspMsg
->
ack
=
pReceiver
->
ack
;
// receiver maybe already closed
pRspMsg
->
code
=
0
;
pRspMsg
->
code
=
code
;
pRspMsg
->
snapBeginIndex
=
pReceiver
->
snapshotParam
.
start
;
// send msg
syncLogSendSyncSnapshotRsp
(
pSyncNode
,
pRspMsg
,
"snapshot receiver receiv
ing
"
);
syncLogSendSyncSnapshotRsp
(
pSyncNode
,
pRspMsg
,
"snapshot receiver receiv
ed
"
);
if
(
syncNodeSendMsgById
(
&
pRspMsg
->
destId
,
pSyncNode
,
&
rpcMsg
)
!=
0
)
{
sRError
(
pReceiver
,
"snapshot receiver send resp failed since %s"
,
terrstr
());
return
-
1
;
...
...
@@ -861,7 +864,7 @@ int32_t syncNodeOnSnapshot(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
syncLogRecvSyncSnapshotSend
(
pSyncNode
,
pMsg
,
"process force stop"
);
snapshotReceiverForceStop
(
pReceiver
);
}
else
if
(
pMsg
->
seq
>
SYNC_SNAPSHOT_SEQ_BEGIN
&&
pMsg
->
seq
<
SYNC_SNAPSHOT_SEQ_END
)
{
syncLogRecvSyncSnapshotSend
(
pSyncNode
,
pMsg
,
"process seq"
);
syncLogRecvSyncSnapshotSend
(
pSyncNode
,
pMsg
,
"process seq
data
"
);
syncNodeOnSnapshotTransfering
(
pSyncNode
,
pMsg
);
}
else
{
// error log
...
...
@@ -976,76 +979,102 @@ int32_t syncNodeOnSnapshotRsp(SSyncNode *pSyncNode, const SRpcMsg *pRpcMsg) {
return
-
1
;
}
// state, term, seq/ack
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"snapshot sender not leader"
);
sSError
(
pSender
,
"snapshot sender not leader"
);
goto
_ERROR
;
}
if
(
pMsg
->
startTime
!=
pSender
->
startTime
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"sender:% "
PRId64
" receiver:%"
PRId64
" time not match"
);
return
-
1
;
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"snapshot sender and receiver time not match"
);
sSError
(
pSender
,
"sender:%"
PRId64
" receiver:%"
PRId64
" time not match, code:0x%x"
,
pMsg
->
startTime
,
pSender
->
startTime
,
pMsg
->
code
);
goto
_ERROR
;
}
// state, term, seq/ack
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
if
(
pMsg
->
term
==
pSyncNode
->
pRaftStore
->
currentTerm
)
{
// prepare <begin, end>, send begin msg
if
(
pMsg
->
ack
==
SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"process seq pre-snapshot"
);
syncNodeOnSnapshotReplyPre
(
pSyncNode
,
pMsg
);
return
0
;
}
if
(
pMsg
->
term
!=
pSyncNode
->
pRaftStore
->
currentTerm
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"snapshot sender and receiver term not match"
);
sSError
(
pSender
,
"snapshot sender term not equal, msg term:%"
PRId64
" currentTerm:%"
PRId64
,
pMsg
->
term
,
pSyncNode
->
pRaftStore
->
currentTerm
);
goto
_ERROR
;
}
if
(
pMsg
->
ack
==
SYNC_SNAPSHOT_SEQ_BEGIN
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"process seq begin
"
);
if
(
snapshotSenderUpdateProgress
(
pSender
,
pMsg
)
!=
0
)
{
return
-
1
;
}
if
(
pMsg
->
code
!=
0
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"receive error code
"
);
sSError
(
pSender
,
"snapshot sender receive error code:0x%x and stop sender"
,
pMsg
->
code
);
goto
_ERROR
;
}
if
(
snapshotSend
(
pSender
)
!=
0
)
{
return
-
1
;
}
return
0
;
}
// prepare <begin, end>, send begin msg
if
(
pMsg
->
ack
==
SYNC_SNAPSHOT_SEQ_PRE_SNAPSHOT
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"process seq pre-snapshot"
);
syncNodeOnSnapshotReplyPre
(
pSyncNode
,
pMsg
);
return
0
;
}
// receive ack is finish, close sender
if
(
pMsg
->
ack
==
SYNC_SNAPSHOT_SEQ_END
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"process seq end"
);
snapshotSenderStop
(
pSender
,
true
);
SSyncLogReplMgr
*
pMgr
=
syncNodeGetLogReplMgr
(
pSyncNode
,
&
pMsg
->
srcId
);
if
(
pMgr
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"reset repl mgr"
);
syncLogReplMgrReset
(
pMgr
);
}
return
0
;
}
if
(
pMsg
->
ack
==
SYNC_SNAPSHOT_SEQ_BEGIN
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"process seq begin"
);
if
(
snapshotSenderUpdateProgress
(
pSender
,
pMsg
)
!=
0
)
{
return
-
1
;
}
// send next msg
if
(
pMsg
->
ack
==
pSender
->
seq
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"process seq"
);
// update sender ack
if
(
snapshotSenderUpdateProgress
(
pSender
,
pMsg
)
!=
0
)
{
return
-
1
;
}
if
(
snapshotSend
(
pSender
)
!=
0
)
{
return
-
1
;
}
if
(
snapshotSend
(
pSender
)
!=
0
)
{
return
-
1
;
}
return
0
;
}
}
else
if
(
pMsg
->
ack
==
pSender
->
seq
-
1
)
{
// maybe resend
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"process seq and resend"
);
snapshotReSend
(
pSender
);
// receive ack is finish, close sender
if
(
pMsg
->
ack
==
SYNC_SNAPSHOT_SEQ_END
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"process seq end"
);
snapshotSenderStop
(
pSender
,
true
);
SSyncLogReplMgr
*
pMgr
=
syncNodeGetLogReplMgr
(
pSyncNode
,
&
pMsg
->
srcId
);
if
(
pMgr
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"reset repl mgr"
);
syncLogReplMgrReset
(
pMgr
);
}
return
0
;
}
}
else
{
// error log
sSError
(
pSender
,
"snapshot sender recv error ack:%d, my seq:%d"
,
pMsg
->
ack
,
pSender
->
seq
);
return
-
1
;
}
}
else
{
// error log
sSError
(
pSender
,
"snapshot sender term not equal"
);
// send next msg
if
(
pMsg
->
ack
==
pSender
->
seq
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"process seq data"
);
// update sender ack
if
(
snapshotSenderUpdateProgress
(
pSender
,
pMsg
)
!=
0
)
{
return
-
1
;
}
if
(
snapshotSend
(
pSender
)
!=
0
)
{
return
-
1
;
}
}
else
if
(
pMsg
->
ack
==
pSender
->
seq
-
1
)
{
// maybe resend
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"process seq and resend"
);
snapshotReSend
(
pSender
);
}
else
{
// error log
sSError
(
pSender
,
"snapshot sender not leader"
);
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"receive error ack"
);
sSError
(
pSender
,
"snapshot sender receive error ack:%d, my seq:%d"
,
pMsg
->
ack
,
pSender
->
seq
);
snapshotSenderStop
(
pSender
,
true
);
SSyncLogReplMgr
*
pMgr
=
syncNodeGetLogReplMgr
(
pSyncNode
,
&
pMsg
->
srcId
);
if
(
pMgr
)
{
syncLogReplMgrReset
(
pMgr
);
}
return
-
1
;
}
return
0
;
_ERROR:
snapshotSenderStop
(
pSender
,
true
);
SSyncLogReplMgr
*
pMgr
=
syncNodeGetLogReplMgr
(
pSyncNode
,
&
pMsg
->
srcId
);
if
(
pMgr
)
{
syncLogRecvSyncSnapshotRsp
(
pSyncNode
,
pMsg
,
"reset repl mgr"
);
syncLogReplMgrReset
(
pMgr
);
}
return
-
1
;
}
source/libs/tdb/src/db/tdbPager.c
浏览文件 @
516cfe5c
...
...
@@ -869,11 +869,19 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
return
-
1
;
}
if
(
tdbOsLSeek
(
jfd
,
0L
,
SEEK_SET
)
<
0
)
{
tdbError
(
"failed to lseek jfd due to %s. file:%s, offset:0"
,
strerror
(
errno
),
pPager
->
dbFileName
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
}
pageBuf
=
tdbOsCalloc
(
1
,
pPager
->
pageSize
);
if
(
pageBuf
==
NULL
)
{
return
-
1
;
}
tdbDebug
(
"pager/restore: %p, %d/%d, txnId:%s"
,
pPager
,
pPager
->
dbOrigSize
,
pPager
->
dbFileSize
,
jFileName
);
for
(
int
pgIndex
=
0
;
pgIndex
<
journalSize
;
++
pgIndex
)
{
// read pgno & the page from journal
SPgno
pgno
;
...
...
@@ -884,6 +892,8 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
return
-
1
;
}
tdbTrace
(
"pager/restore: restore pgno:%d,"
,
pgno
);
ret
=
tdbOsRead
(
jfd
,
pageBuf
,
pPager
->
pageSize
);
if
(
ret
<
0
)
{
tdbOsFree
(
pageBuf
);
...
...
@@ -923,7 +933,7 @@ static int tdbPagerRestore(SPager *pPager, const char *jFileName) {
return
-
1
;
}
if
(
tdbOsRemove
(
pPager
->
jFileName
)
<
0
&&
errno
!=
ENOENT
)
{
if
(
tdbOsRemove
(
jFileName
)
<
0
&&
errno
!=
ENOENT
)
{
tdbError
(
"failed to remove file due to %s. jFileName:%s"
,
strerror
(
errno
),
pPager
->
jFileName
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
-
1
;
...
...
@@ -943,7 +953,12 @@ int tdbPagerRestoreJournals(SPager *pPager) {
while
((
pDirEntry
=
tdbReadDir
(
pDir
))
!=
NULL
)
{
char
*
name
=
tdbDirEntryBaseName
(
tdbGetDirEntryName
(
pDirEntry
));
if
(
strncmp
(
TDB_MAINDB_NAME
"-journal"
,
name
,
16
)
==
0
)
{
if
(
tdbPagerRestore
(
pPager
,
name
)
<
0
)
{
char
jname
[
TD_PATH_MAX
]
=
{
0
};
int
dirLen
=
strlen
(
pPager
->
pEnv
->
dbName
);
memcpy
(
jname
,
pPager
->
pEnv
->
dbName
,
dirLen
);
jname
[
dirLen
]
=
'/'
;
memcpy
(
jname
+
dirLen
+
1
,
name
,
strlen
(
name
));
if
(
tdbPagerRestore
(
pPager
,
jname
)
<
0
)
{
tdbCloseDir
(
&
pDir
);
tdbError
(
"failed to restore file due to %s. jFileName:%s"
,
strerror
(
errno
),
name
);
...
...
@@ -969,7 +984,12 @@ int tdbPagerRollback(SPager *pPager) {
char
*
name
=
tdbDirEntryBaseName
(
tdbGetDirEntryName
(
pDirEntry
));
if
(
strncmp
(
TDB_MAINDB_NAME
"-journal"
,
name
,
16
)
==
0
)
{
if
(
tdbOsRemove
(
name
)
<
0
&&
errno
!=
ENOENT
)
{
char
jname
[
TD_PATH_MAX
]
=
{
0
};
int
dirLen
=
strlen
(
pPager
->
pEnv
->
dbName
);
memcpy
(
jname
,
pPager
->
pEnv
->
dbName
,
dirLen
);
jname
[
dirLen
]
=
'/'
;
memcpy
(
jname
+
dirLen
+
1
,
name
,
strlen
(
name
));
if
(
tdbOsRemove
(
jname
)
<
0
&&
errno
!=
ENOENT
)
{
tdbCloseDir
(
&
pDir
);
tdbError
(
"failed to remove file due to %s. jFileName:%s"
,
strerror
(
errno
),
name
);
...
...
source/libs/tdb/src/db/tdbTable.c
浏览文件 @
516cfe5c
...
...
@@ -108,13 +108,13 @@ int tdbTbOpen(const char *tbname, int keyLen, int valLen, tdb_cmpr_fn_t keyCmprF
ASSERT
(
pPager
!=
NULL
);
if
(
rollback
)
{
tdbPagerRollback
(
pPager
);
}
else
{
ret
=
tdbPagerRestoreJournals
(
pPager
);
if
(
ret
<
0
)
{
tdbOsFree
(
pTb
);
return
-
1
;
}
}
else
{
tdbPagerRollback
(
pPager
);
}
// pTb->pBt
...
...
source/util/src/terror.c
浏览文件 @
516cfe5c
...
...
@@ -405,6 +405,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_PROPOSE_NOT_READY, "Sync not ready for pr
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_STANDBY_NOT_READY
,
"Sync not ready for standby"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_BATCH_ERROR
,
"Sync batch error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_RESTORING
,
"Sync is restoring"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INVALID_SNAPSHOT_MSG
,
"Sync invalid snapshot msg"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INTERNAL_ERROR
,
"Sync internal error"
)
//tq
...
...
tests/script/sh/deploy.sh
浏览文件 @
516cfe5c
...
...
@@ -134,6 +134,7 @@ echo "mDebugFlag 143" >> $TAOS_CFG
echo
"wDebugFlag 143"
>>
$TAOS_CFG
echo
"sDebugFlag 143"
>>
$TAOS_CFG
echo
"tsdbDebugFlag 143"
>>
$TAOS_CFG
echo
"tdbDebugFlag 143"
>>
$TAOS_CFG
echo
"tqDebugFlag 143"
>>
$TAOS_CFG
echo
"fsDebugFlag 143"
>>
$TAOS_CFG
echo
"idxDebugFlag 143"
>>
$TAOS_CFG
...
...
tests/script/test.sh
浏览文件 @
516cfe5c
...
...
@@ -10,13 +10,11 @@ set +e
#set -x
FILE_NAME
=
RELEASE
=
0
ASYNC
=
0
VALGRIND
=
0
UNIQUE
=
0
TEST
=
0
UNAME_BIN
=
`
which
uname
`
OS_TYPE
=
`
$UNAME_BIN
`
while
getopts
"f:
agvum
"
arg
while
getopts
"f:
tgv
"
arg
do
case
$arg
in
f
)
...
...
@@ -25,8 +23,8 @@ do
v
)
VALGRIND
=
1
;;
u
)
UNIQUE
=
1
t
)
TEST
=
1
;;
g
)
VALGRIND
=
2
...
...
@@ -140,6 +138,11 @@ if [ -n "$FILE_NAME" ]; then
result
=
$?
echo
"Execute result:"
$result
if
[
$TEST
-eq
1
]
;
then
echo
"Exit without check asan errors"
exit
1
fi
if
[
$result
-eq
0
]
;
then
$CODE_DIR
/sh/sigint_stop_dnodes.sh
$CODE_DIR
/sh/checkAsan.sh
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录