Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4db6b043
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4db6b043
编写于
12月 14, 2022
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix: reduce the frequency of retry in sync not ready case while alter db
上级
6a7c1d6b
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
60 addition
and
35 deletion
+60
-35
source/dnode/mnode/impl/inc/mndTrans.h
source/dnode/mnode/impl/inc/mndTrans.h
+1
-0
source/dnode/mnode/impl/src/mndDb.c
source/dnode/mnode/impl/src/mndDb.c
+12
-6
source/dnode/mnode/impl/src/mndTrans.c
source/dnode/mnode/impl/src/mndTrans.c
+15
-3
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+27
-25
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+5
-1
未找到文件。
source/dnode/mnode/impl/inc/mndTrans.h
浏览文件 @
4db6b043
...
...
@@ -75,6 +75,7 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, voi
void
mndTransSetDbName
(
STrans
*
pTrans
,
const
char
*
dbname
,
const
char
*
stbname
);
void
mndTransSetSerial
(
STrans
*
pTrans
);
void
mndTransSetOper
(
STrans
*
pTrans
,
EOperType
oper
);
int32_t
mndTrancCheckConflict
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
);
int32_t
mndTransProcessRsp
(
SRpcMsg
*
pRsp
);
...
...
source/dnode/mnode/impl/src/mndDb.c
浏览文件 @
4db6b043
...
...
@@ -776,6 +776,8 @@ static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *p
int32_t
code
=
-
1
;
mndTransSetDbName
(
pTrans
,
pOld
->
name
,
NULL
);
if
(
mndTrancCheckConflict
(
pMnode
,
pTrans
)
!=
0
)
return
-
1
;
if
(
mndSetAlterDbRedoLogs
(
pMnode
,
pTrans
,
pOld
,
pNew
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterDbCommitLogs
(
pMnode
,
pTrans
,
pOld
,
pNew
)
!=
0
)
goto
_OVER
;
if
(
mndSetAlterDbRedoActions
(
pMnode
,
pTrans
,
pOld
,
pNew
)
!=
0
)
goto
_OVER
;
...
...
@@ -835,12 +837,14 @@ static int32_t mndProcessAlterDbReq(SRpcMsg *pReq) {
_OVER:
if
(
code
!=
0
&&
code
!=
TSDB_CODE_ACTION_IN_PROGRESS
)
{
if
(
terrno
!=
0
)
code
=
terrno
;
mError
(
"db:%s, failed to alter since %s"
,
alterReq
.
db
,
terrstr
());
}
mndReleaseDb
(
pMnode
,
pDb
);
taosArrayDestroy
(
dbObj
.
cfg
.
pRetensions
);
terrno
=
code
;
return
code
;
}
...
...
@@ -1183,7 +1187,8 @@ int32_t mndExtractDbInfo(SMnode *pMnode, SDbObj *pDb, SUseDbRsp *pRsp, const SUs
int32_t
numOfTable
=
mndGetDBTableNum
(
pDb
,
pMnode
);
if
(
pReq
==
NULL
||
pReq
->
vgVersion
<
pDb
->
vgVersion
||
pReq
->
dbId
!=
pDb
->
uid
||
numOfTable
!=
pReq
->
numOfTable
||
pReq
->
stateTs
<
pDb
->
stateTs
)
{
if
(
pReq
==
NULL
||
pReq
->
vgVersion
<
pDb
->
vgVersion
||
pReq
->
dbId
!=
pDb
->
uid
||
numOfTable
!=
pReq
->
numOfTable
||
pReq
->
stateTs
<
pDb
->
stateTs
)
{
mndBuildDBVgroupInfo
(
pDb
,
pMnode
,
pRsp
->
pVgroupInfos
);
}
...
...
@@ -1298,21 +1303,22 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
SUseDbRsp
usedbRsp
=
{
0
};
if
((
0
==
strcasecmp
(
pDbVgVersion
->
dbFName
,
TSDB_INFORMATION_SCHEMA_DB
)
||
(
0
==
strcasecmp
(
pDbVgVersion
->
dbFName
,
TSDB_PERFORMANCE_SCHEMA_DB
))))
{
if
((
0
==
strcasecmp
(
pDbVgVersion
->
dbFName
,
TSDB_INFORMATION_SCHEMA_DB
)
||
(
0
==
strcasecmp
(
pDbVgVersion
->
dbFName
,
TSDB_PERFORMANCE_SCHEMA_DB
))))
{
memcpy
(
usedbRsp
.
db
,
pDbVgVersion
->
dbFName
,
TSDB_DB_FNAME_LEN
);
int32_t
vgVersion
=
mndGetGlobalVgroupVersion
(
pMnode
);
if
(
pDbVgVersion
->
vgVersion
<
vgVersion
)
{
usedbRsp
.
pVgroupInfos
=
taosArrayInit
(
10
,
sizeof
(
SVgroupInfo
));
mndBuildDBVgroupInfo
(
NULL
,
pMnode
,
usedbRsp
.
pVgroupInfos
);
usedbRsp
.
vgVersion
=
vgVersion
++
;
}
else
{
usedbRsp
.
vgVersion
=
pDbVgVersion
->
vgVersion
;
}
usedbRsp
.
vgNum
=
taosArrayGetSize
(
usedbRsp
.
pVgroupInfos
);
taosArrayPush
(
batchUseRsp
.
pArray
,
&
usedbRsp
);
continue
;
}
...
...
@@ -1328,7 +1334,7 @@ int32_t mndValidateDbInfo(SMnode *pMnode, SDbVgVersion *pDbs, int32_t numOfDbs,
int32_t
numOfTable
=
mndGetDBTableNum
(
pDb
,
pMnode
);
if
(
pDbVgVersion
->
vgVersion
>=
pDb
->
vgVersion
&&
numOfTable
==
pDbVgVersion
->
numOfTable
&&
if
(
pDbVgVersion
->
vgVersion
>=
pDb
->
vgVersion
&&
numOfTable
==
pDbVgVersion
->
numOfTable
&&
pDbVgVersion
->
stateTs
==
pDb
->
stateTs
)
{
mTrace
(
"db:%s, valid dbinfo, vgVersion:%d stateTs:%"
PRId64
" numOfTables:%d, not changed vgVersion:%d stateTs:%"
PRId64
" numOfTables:%d"
,
...
...
source/dnode/mnode/impl/src/mndTrans.c
浏览文件 @
4db6b043
...
...
@@ -838,7 +838,7 @@ static bool mndCheckTransConflict(SMnode *pMnode, STrans *pNew) {
return
conflict
;
}
int32_t
mndTran
sPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
int32_t
mndTran
cCheckConflict
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
pTrans
->
conflict
==
TRN_CONFLICT_DB
||
pTrans
->
conflict
==
TRN_CONFLICT_DB_INSIDE
)
{
if
(
strlen
(
pTrans
->
dbname
)
==
0
&&
strlen
(
pTrans
->
stbname
)
==
0
)
{
terrno
=
TSDB_CODE_MND_TRANS_CONFLICT
;
...
...
@@ -853,6 +853,14 @@ int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans) {
return
-
1
;
}
return
0
;
}
int32_t
mndTransPrepare
(
SMnode
*
pMnode
,
STrans
*
pTrans
)
{
if
(
mndTrancCheckConflict
(
pMnode
,
pTrans
)
!=
0
)
{
return
-
1
;
}
if
(
taosArrayGetSize
(
pTrans
->
commitActions
)
<=
0
)
{
terrno
=
TSDB_CODE_MND_TRANS_CLOG_IS_NULL
;
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
...
...
@@ -1027,6 +1035,7 @@ int32_t mndTransProcessRsp(SRpcMsg *pRsp) {
if
(
pAction
!=
NULL
)
{
pAction
->
msgReceived
=
1
;
pAction
->
errCode
=
pRsp
->
code
;
pTrans
->
lastErrorNo
=
pRsp
->
code
;
}
mInfo
(
"trans:%d, %s:%d response is received, code:0x%x, accept:0x%x retry:0x%x"
,
transId
,
mndTransStr
(
pAction
->
stage
),
...
...
@@ -1238,7 +1247,7 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
if
(
numOfActions
==
0
)
return
code
;
if
(
pTrans
->
redoActionPos
>=
numOfActions
)
return
code
;
mInfo
(
"trans:%d, execute %d actions serial
"
,
pTrans
->
id
,
numOfAction
s
);
mInfo
(
"trans:%d, execute %d actions serial
, current redoAction:%d"
,
pTrans
->
id
,
numOfActions
,
pTrans
->
redoActionPo
s
);
for
(
int32_t
action
=
pTrans
->
redoActionPos
;
action
<
numOfActions
;
++
action
)
{
STransAction
*
pAction
=
taosArrayGet
(
pTrans
->
redoActions
,
pTrans
->
redoActionPos
);
...
...
@@ -1289,13 +1298,16 @@ static int32_t mndTransExecuteRedoActionsSerial(SMnode *pMnode, STrans *pTrans)
}
else
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
mInfo
(
"trans:%d, %s:%d is in progress and wait it finish"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
);
break
;
}
else
if
(
code
==
pAction
->
retryCode
)
{
}
else
if
(
code
==
pAction
->
retryCode
||
code
==
TSDB_CODE_SYN_PROPOSE_NOT_READY
||
code
==
TSDB_CODE_SYN_RESTORING
||
code
==
TSDB_CODE_SYN_NOT_LEADER
)
{
mInfo
(
"trans:%d, %s:%d receive code:0x%x and retry"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
code
);
pTrans
->
lastErrorNo
=
code
;
taosMsleep
(
300
);
action
--
;
continue
;
}
else
{
terrno
=
code
;
pTrans
->
lastErrorNo
=
code
;
pTrans
->
code
=
code
;
mInfo
(
"trans:%d, %s:%d receive code:0x%x and wait another schedule, failedTimes:%d"
,
pTrans
->
id
,
mndTransStr
(
pAction
->
stage
),
pAction
->
id
,
code
,
pTrans
->
failedTimes
);
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
4db6b043
...
...
@@ -322,32 +322,34 @@ struct STsdbKeepCfg {
};
struct
SVnode
{
char
*
path
;
SVnodeCfg
config
;
SVState
state
;
SVStatis
statis
;
STfs
*
pTfs
;
SMsgCb
msgCb
;
TdThreadMutex
mutex
;
TdThreadCond
poolNotEmpty
;
SVBufPool
*
pPool
;
SVBufPool
*
inUse
;
SMeta
*
pMeta
;
SSma
*
pSma
;
STsdb
*
pTsdb
;
SWal
*
pWal
;
STQ
*
pTq
;
SSink
*
pSink
;
tsem_t
canCommit
;
int64_t
sync
;
TdThreadMutex
lock
;
bool
blocked
;
bool
restored
;
tsem_t
syncSem
;
int32_t
blockSec
;
int64_t
blockSeq
;
char
*
path
;
SVnodeCfg
config
;
SVState
state
;
SVStatis
statis
;
STfs
*
pTfs
;
SMsgCb
msgCb
;
TdThreadMutex
mutex
;
TdThreadCond
poolNotEmpty
;
SVBufPool
*
pPool
;
SVBufPool
*
inUse
;
SMeta
*
pMeta
;
SSma
*
pSma
;
STsdb
*
pTsdb
;
SWal
*
pWal
;
STQ
*
pTq
;
SSink
*
pSink
;
tsem_t
canCommit
;
int64_t
sync
;
TdThreadMutex
lock
;
bool
blocked
;
bool
restored
;
tsem_t
syncSem
;
int32_t
blockSec
;
int64_t
blockSeq
;
SQHandle
*
pQuery
;
#if 0
SRpcHandleInfo blockInfo;
SQHandle
*
pQuery
;
#endif
};
#define TD_VID(PVNODE) ((PVNODE)->config.vgId)
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
4db6b043
...
...
@@ -216,7 +216,9 @@ static int32_t inline vnodeProposeMsg(SVnode *pVnode, SRpcMsg *pMsg, bool isWeak
pVnode
->
blocked
=
true
;
pVnode
->
blockSec
=
taosGetTimestampSec
();
pVnode
->
blockSeq
=
seq
;
#if 0
pVnode->blockInfo = pMsg->info;
#endif
}
taosThreadMutexUnlock
(
&
pVnode
->
lock
);
...
...
@@ -628,10 +630,12 @@ void vnodeSyncCheckTimeout(SVnode *pVnode) {
vError
(
"vgId:%d, failed to propose since timeout and post block, start:%d cur:%d delta:%d seq:%"
PRId64
,
pVnode
->
config
.
vgId
,
pVnode
->
blockSec
,
curSec
,
delta
,
pVnode
->
blockSeq
);
if
(
syncSendTimeoutRsp
(
pVnode
->
sync
,
pVnode
->
blockSeq
)
!=
0
)
{
#if 0
SRpcMsg rpcMsg = {.code = TSDB_CODE_SYN_TIMEOUT, .info = pVnode->blockInfo};
v
Info
(
"send timeout response since its applyed, seq:%"
PRId64
" handle:%p ahandle:%p"
,
pVnode
->
blockSeq
,
v
Error
("send timeout response since its applyed, seq:%" PRId64 " handle:%p ahandle:%p", pVnode->blockSeq,
rpcMsg.info.handle, rpcMsg.info.ahandle);
rpcSendResponse(&rpcMsg);
#endif
}
pVnode
->
blocked
=
false
;
pVnode
->
blockSec
=
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录