Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
42cb8e39
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
42cb8e39
编写于
1月 03, 2021
作者:
S
Shengliang Guan
提交者:
GitHub
1月 03, 2021
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #4795 from taosdata/feature/sim
TD-2605
上级
c1d19998
e288233c
变更
2
隐藏空白更改
内联
并排
Showing
2 changed file
with
49 addition
and
30 deletion
+49
-30
src/balance/src/bnMain.c
src/balance/src/bnMain.c
+8
-8
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+41
-22
未找到文件。
src/balance/src/bnMain.c
浏览文件 @
42cb8e39
...
...
@@ -237,21 +237,21 @@ static bool bnCheckVgroupReady(SVgObj *pVgroup, SVnodeGid *pRmVnode) {
bool
isReady
=
false
;
for
(
int32_t
i
=
0
;
i
<
pVgroup
->
numOfVnodes
;
++
i
)
{
SVnodeGid
*
pVnode
=
pVgroup
->
vnodeGid
+
i
;
SDnodeObj
*
pDnode
=
pVnode
->
pDnode
;
if
(
pVnode
==
pRmVnode
)
continue
;
int32_t
vver
=
mnodeGetVgidVer
(
pVnode
->
vver
);
mTrace
(
"vgId:%d, check vgroup status, vindex:%d dnode:%d status:%s role:%s vver:%d, rmvver:%d"
,
pVgroup
->
vgId
,
i
,
pVnode
->
dnodeId
,
dnodeStatus
[
p
Vnode
->
p
Dnode
->
status
],
syncRole
[
pVnode
->
role
],
vver
,
rmVnodeVer
);
if
(
p
Vnode
->
p
Dnode
->
status
==
TAOS_DN_STATUS_DROPPING
)
continue
;
if
(
p
Vnode
->
p
Dnode
->
status
==
TAOS_DN_STATUS_OFFLINE
)
continue
;
mTrace
(
"vgId:%d, check vgroup status, vindex:%d dnode:%d status:%s role:%s vver:%d, rmvver:%d"
,
pVgroup
->
vgId
,
i
,
pVnode
->
dnodeId
,
dnodeStatus
[
pDnode
->
status
],
syncRole
[
pVnode
->
role
],
vver
,
rmVnodeVer
);
if
(
pDnode
->
status
==
TAOS_DN_STATUS_DROPPING
)
continue
;
if
(
pDnode
->
status
==
TAOS_DN_STATUS_OFFLINE
)
continue
;
if
(
pVnode
->
role
!=
TAOS_SYNC_ROLE_SLAVE
&&
pVnode
->
role
!=
TAOS_SYNC_ROLE_MASTER
)
continue
;
if
(
rmVnodeVer
==
0
||
vver
>=
rmVnodeVer
)
{
mInfo
(
"vgId:%d, is ready for vindex:%d in dnode:%d status:%s role:%s vver:%d larger than rmvver:%d"
,
pVgroup
->
vgId
,
i
,
pVnode
->
dnodeId
,
dnodeStatus
[
pVnode
->
pDnode
->
status
],
syncRole
[
pVnode
->
role
],
vver
,
rmVnodeVer
);
mInfo
(
"vgId:%d, is ready for vindex:%d in dnode:%d status:%s role:%s vver:%d larger than rmvver:%d"
,
pVgroup
->
vgId
,
i
,
pVnode
->
dnodeId
,
dnodeStatus
[
pDnode
->
status
],
syncRole
[
pVnode
->
role
],
vver
,
rmVnodeVer
);
isReady
=
true
;
}
isReady
=
true
;
}
return
isReady
;
...
...
src/vnode/src/vnodeWrite.c
浏览文件 @
42cb8e39
...
...
@@ -101,8 +101,7 @@ int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rpara
return
syncCode
;
}
static
int32_t
vnodeCheckWrite
(
void
*
vparam
)
{
SVnodeObj
*
pVnode
=
vparam
;
static
int32_t
vnodeCheckWrite
(
SVnodeObj
*
pVnode
)
{
if
(
!
(
pVnode
->
accessState
&
TSDB_VN_WRITE_ACCCESS
))
{
vDebug
(
"vgId:%d, no write auth, refCount:%d pVnode:%p"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
);
return
TSDB_CODE_VND_NO_WRITE_AUTH
;
...
...
@@ -216,29 +215,21 @@ static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspR
return
TSDB_CODE_SUCCESS
;
}
int32_t
vnodeWriteToWQueue
(
void
*
vparam
,
void
*
wparam
,
int32_t
qtype
,
void
*
rparam
)
{
SVnodeObj
*
pVnode
=
vparam
;
SWalHead
*
pHead
=
wparam
;
int32_t
code
=
0
;
if
(
qtype
==
TAOS_QTYPE_RPC
)
{
code
=
vnodeCheckWrite
(
pVnode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
return
code
;
}
static
SVWriteMsg
*
vnodeBuildVWriteMsg
(
SVnodeObj
*
pVnode
,
SWalHead
*
pHead
,
int32_t
qtype
,
SRpcMsg
*
pRpcMsg
)
{
if
(
pHead
->
len
>
TSDB_MAX_WAL_SIZE
)
{
vError
(
"vgId:%d, wal len:%d exceeds limit, hver:%"
PRIu64
,
pVnode
->
vgId
,
pHead
->
len
,
pHead
->
version
);
return
TSDB_CODE_WAL_SIZE_LIMIT
;
terrno
=
TSDB_CODE_WAL_SIZE_LIMIT
;
return
NULL
;
}
int32_t
size
=
sizeof
(
SVWriteMsg
)
+
sizeof
(
SWalHead
)
+
pHead
->
len
;
SVWriteMsg
*
pWrite
=
taosAllocateQitem
(
size
);
if
(
pWrite
==
NULL
)
{
return
TSDB_CODE_VND_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_VND_OUT_OF_MEMORY
;
return
NULL
;
}
if
(
rparam
!=
NULL
)
{
SRpcMsg
*
pRpcMsg
=
rparam
;
if
(
pRpcMsg
!=
NULL
)
{
pWrite
->
rpcMsg
=
*
pRpcMsg
;
}
...
...
@@ -248,6 +239,21 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
return
pWrite
;
}
static
int32_t
vnodeWriteToWQueueImp
(
SVWriteMsg
*
pWrite
)
{
SVnodeObj
*
pVnode
=
pWrite
->
pVnode
;
if
(
pWrite
->
qtype
==
TAOS_QTYPE_RPC
)
{
int32_t
code
=
vnodeCheckWrite
(
pVnode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
taosFreeQitem
(
pWrite
);
vnodeRelease
(
pVnode
);
return
code
;
}
}
int32_t
queued
=
atomic_add_fetch_32
(
&
pVnode
->
queuedWMsg
,
1
);
if
(
queued
>
MAX_QUEUED_MSG_NUM
)
{
int32_t
ms
=
(
queued
/
MAX_QUEUED_MSG_NUM
)
*
10
+
3
;
...
...
@@ -256,15 +262,25 @@ int32_t vnodeWriteToWQueue(void *vparam, void *wparam, int32_t qtype, void *rpar
taosMsleep
(
ms
);
}
code
=
vnodePerformFlowCtrl
(
pWrite
);
if
(
code
!=
0
)
return
0
;
vTrace
(
"vgId:%d, write into vwqueue, refCount:%d queued:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
,
pVnode
->
queuedWMsg
);
taosWriteQitem
(
pVnode
->
wqueue
,
qtype
,
pWrite
);
taosWriteQitem
(
pVnode
->
wqueue
,
pWrite
->
qtype
,
pWrite
);
return
TSDB_CODE_SUCCESS
;
}
int32_t
vnodeWriteToWQueue
(
void
*
vparam
,
void
*
wparam
,
int32_t
qtype
,
void
*
rparam
)
{
SVWriteMsg
*
pWrite
=
vnodeBuildVWriteMsg
(
vparam
,
wparam
,
qtype
,
rparam
);
if
(
pWrite
==
NULL
)
{
assert
(
terrno
!=
0
);
return
terrno
;
}
int32_t
code
=
vnodePerformFlowCtrl
(
pWrite
);
if
(
code
!=
0
)
return
0
;
return
vnodeWriteToWQueueImp
(
pWrite
);
}
void
vnodeFreeFromWQueue
(
void
*
vparam
,
SVWriteMsg
*
pWrite
)
{
SVnodeObj
*
pVnode
=
vparam
;
...
...
@@ -294,7 +310,10 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
vDebug
(
"vgId:%d, msg:%p, write into vwqueue after flowctrl, retry:%d"
,
pVnode
->
vgId
,
pWrite
,
pWrite
->
processedCount
);
pWrite
->
processedCount
=
0
;
taosWriteQitem
(
pVnode
->
wqueue
,
pWrite
->
qtype
,
pWrite
);
code
=
vnodeWriteToWQueueImp
(
pWrite
);
if
(
code
!=
0
)
{
dnodeSendRpcVWriteRsp
(
pWrite
->
pVnode
,
pWrite
,
code
);
}
}
}
}
...
...
@@ -318,4 +337,4 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
pWrite
->
processedCount
);
return
TSDB_CODE_VND_ACTION_IN_PROGRESS
;
}
}
}
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录