Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
feb2270a
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
feb2270a
编写于
11月 27, 2020
作者:
S
Shengliang Guan
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
change log format
上级
09e58c78
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
28 addition
and
18 deletion
+28
-18
src/common/src/tglobal.c
src/common/src/tglobal.c
+1
-1
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+2
-3
src/sync/src/syncRestore.c
src/sync/src/syncRestore.c
+2
-2
src/sync/src/syncRetrieve.c
src/sync/src/syncRetrieve.c
+6
-5
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+5
-2
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+5
-3
src/wal/src/walWrite.c
src/wal/src/walWrite.c
+2
-2
tests/script/general/wal/sync.sim
tests/script/general/wal/sync.sim
+1
-0
tests/script/tmp/mnodes.sim
tests/script/tmp/mnodes.sim
+4
-0
未找到文件。
src/common/src/tglobal.c
浏览文件 @
feb2270a
...
@@ -200,7 +200,7 @@ int32_t tsNumOfLogLines = 10000000;
...
@@ -200,7 +200,7 @@ int32_t tsNumOfLogLines = 10000000;
int32_t
mDebugFlag
=
135
;
int32_t
mDebugFlag
=
135
;
int32_t
sdbDebugFlag
=
135
;
int32_t
sdbDebugFlag
=
135
;
int32_t
dDebugFlag
=
135
;
int32_t
dDebugFlag
=
135
;
int32_t
vDebugFlag
=
13
1
;
int32_t
vDebugFlag
=
13
5
;
int32_t
cDebugFlag
=
131
;
int32_t
cDebugFlag
=
131
;
int32_t
jniDebugFlag
=
131
;
int32_t
jniDebugFlag
=
131
;
int32_t
odbcDebugFlag
=
131
;
int32_t
odbcDebugFlag
=
131
;
...
...
src/sync/src/syncMain.c
浏览文件 @
feb2270a
...
@@ -541,7 +541,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
...
@@ -541,7 +541,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
pPeer
->
ip
=
ip
;
pPeer
->
ip
=
ip
;
pPeer
->
port
=
pInfo
->
nodePort
;
pPeer
->
port
=
pInfo
->
nodePort
;
pPeer
->
fqdn
[
sizeof
(
pPeer
->
fqdn
)
-
1
]
=
0
;
pPeer
->
fqdn
[
sizeof
(
pPeer
->
fqdn
)
-
1
]
=
0
;
snprintf
(
pPeer
->
id
,
sizeof
(
pPeer
->
id
),
"vgId:%d,
peer:%s:%u"
,
pNode
->
vgId
,
pPeer
->
fqdn
,
pPeer
->
port
);
snprintf
(
pPeer
->
id
,
sizeof
(
pPeer
->
id
),
"vgId:%d,
nodeId:%d"
,
pNode
->
vgId
,
pPeer
->
nodeId
);
pPeer
->
peerFd
=
-
1
;
pPeer
->
peerFd
=
-
1
;
pPeer
->
syncFd
=
-
1
;
pPeer
->
syncFd
=
-
1
;
...
@@ -1144,8 +1144,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
...
@@ -1144,8 +1144,7 @@ static void syncProcessIncommingConnection(int32_t connFd, uint32_t sourceIp) {
pPeer
->
syncFd
=
connFd
;
pPeer
->
syncFd
=
connFd
;
syncCreateRestoreDataThread
(
pPeer
);
syncCreateRestoreDataThread
(
pPeer
);
}
else
{
}
else
{
sDebug
(
"%s, TCP connection is already up(pfd:%d), close one, new pfd:%d sfd:%d"
,
pPeer
->
id
,
pPeer
->
peerFd
,
connFd
,
sDebug
(
"%s, TCP connection is up, pfd:%d sfd:%d, old pfd:%d"
,
pPeer
->
id
,
connFd
,
pPeer
->
syncFd
,
pPeer
->
peerFd
);
pPeer
->
syncFd
);
syncClosePeerConn
(
pPeer
);
syncClosePeerConn
(
pPeer
);
pPeer
->
peerFd
=
connFd
;
pPeer
->
peerFd
=
connFd
;
pPeer
->
pConn
=
taosAllocateTcpConn
(
tsTcpPool
,
pPeer
,
connFd
);
pPeer
->
pConn
=
taosAllocateTcpConn
(
tsTcpPool
,
pPeer
,
connFd
);
...
...
src/sync/src/syncRestore.c
浏览文件 @
feb2270a
...
@@ -134,7 +134,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
...
@@ -134,7 +134,7 @@ static int32_t syncRestoreFile(SSyncPeer *pPeer, uint64_t *fversion) {
// data file is changed, code shall be set to 1
// data file is changed, code shall be set to 1
*
fversion
=
minfo
.
fversion
;
*
fversion
=
minfo
.
fversion
;
code
=
1
;
code
=
1
;
sDebug
(
"%s, file changed
while restore file"
,
pPeer
->
id
);
sDebug
(
"%s, file changed
after restore file, fver:%"
PRIu64
,
pPeer
->
id
,
*
fversion
);
}
}
if
(
code
<
0
)
{
if
(
code
<
0
)
{
...
@@ -160,7 +160,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
...
@@ -160,7 +160,7 @@ static int32_t syncRestoreWal(SSyncPeer *pPeer) {
}
}
if
(
pHead
->
len
==
0
)
{
if
(
pHead
->
len
==
0
)
{
sDebug
(
"%s, wal is synced over
"
,
pPeer
->
id
);
sDebug
(
"%s, wal is synced over
, last wver:%"
PRIu64
,
pPeer
->
id
,
lastVer
);
code
=
0
;
code
=
0
;
break
;
break
;
}
// wal sync over
}
// wal sync over
...
...
src/sync/src/syncRetrieve.c
浏览文件 @
feb2270a
...
@@ -58,6 +58,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
...
@@ -58,6 +58,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
fileInfo
.
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
vgId
,
fileInfo
.
name
,
&
fileInfo
.
index
,
TAOS_SYNC_MAX_INDEX
,
fileInfo
.
magic
=
(
*
pNode
->
getFileInfo
)(
pNode
->
vgId
,
fileInfo
.
name
,
&
fileInfo
.
index
,
TAOS_SYNC_MAX_INDEX
,
&
fileInfo
.
size
,
&
fileInfo
.
fversion
);
&
fileInfo
.
size
,
&
fileInfo
.
fversion
);
// fileInfo.size = htonl(size);
// fileInfo.size = htonl(size);
sDebug
(
"%s, file:%s info will be sent, size:%"
PRId64
,
pPeer
->
id
,
fileInfo
.
name
,
fileInfo
.
size
);
// send the file info
// send the file info
int32_t
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
&
(
fileInfo
),
sizeof
(
fileInfo
));
int32_t
ret
=
taosWriteMsg
(
pPeer
->
syncFd
,
&
(
fileInfo
),
sizeof
(
fileInfo
));
...
@@ -107,7 +108,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
...
@@ -107,7 +108,7 @@ static int32_t syncRetrieveFile(SSyncPeer *pPeer) {
break
;
break
;
}
}
sDebug
(
"%s,
%s is sent, size:%"
PRId64
,
pPeer
->
id
,
name
,
fileInfo
.
size
);
sDebug
(
"%s,
file:%s is sent, size:%"
PRId64
,
pPeer
->
id
,
fileInfo
.
name
,
fileInfo
.
size
);
fileInfo
.
index
++
;
fileInfo
.
index
++
;
// check if processed files are modified
// check if processed files are modified
...
@@ -419,18 +420,18 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
...
@@ -419,18 +420,18 @@ static int32_t syncRetrieveDataStepByStep(SSyncPeer *pPeer) {
pPeer
->
sversion
=
0
;
pPeer
->
sversion
=
0
;
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_FILE
;
pPeer
->
sstatus
=
TAOS_SYNC_STATUS_FILE
;
sInfo
(
"%s, start to retrieve file, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]);
sInfo
(
"%s, start to retrieve file
s
, set sstatus:%s"
,
pPeer
->
id
,
syncStatus
[
pPeer
->
sstatus
]);
if
(
syncRetrieveFile
(
pPeer
)
<
0
)
{
if
(
syncRetrieveFile
(
pPeer
)
<
0
)
{
sError
(
"%s, failed to retrieve file"
,
pPeer
->
id
);
sError
(
"%s, failed to retrieve file
s
"
,
pPeer
->
id
);
return
-
1
;
return
-
1
;
}
}
// if no files are synced, there must be wal to sync, sversion must be larger than one
// if no files are synced, there must be wal to sync, sversion must be larger than one
if
(
pPeer
->
sversion
==
0
)
pPeer
->
sversion
=
1
;
if
(
pPeer
->
sversion
==
0
)
pPeer
->
sversion
=
1
;
sInfo
(
"%s, start to retrieve wal"
,
pPeer
->
id
);
sInfo
(
"%s, start to retrieve wal
s
"
,
pPeer
->
id
);
if
(
syncRetrieveWal
(
pPeer
)
<
0
)
{
if
(
syncRetrieveWal
(
pPeer
)
<
0
)
{
sError
(
"%s, failed to retrieve wal"
,
pPeer
->
id
);
sError
(
"%s, failed to retrieve wal
s
"
,
pPeer
->
id
);
return
-
1
;
return
-
1
;
}
}
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
feb2270a
...
@@ -688,8 +688,10 @@ static void vnodeCtrlFlow(int32_t vgId, int32_t level) {
...
@@ -688,8 +688,10 @@ static void vnodeCtrlFlow(int32_t vgId, int32_t level) {
return
;
return
;
}
}
pVnode
->
flowctrlLevel
=
level
;
if
(
pVnode
->
flowctrlLevel
!=
level
)
{
vDebug
(
"vgId:%d, set flowctrl level:%d"
,
pVnode
->
vgId
,
level
);
vDebug
(
"vgId:%d, set flowctrl level from %d to %d"
,
pVnode
->
vgId
,
pVnode
->
flowctrlLevel
,
level
);
pVnode
->
flowctrlLevel
=
level
;
}
vnodeRelease
(
pVnode
);
vnodeRelease
(
pVnode
);
}
}
...
@@ -779,6 +781,7 @@ static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) {
...
@@ -779,6 +781,7 @@ static int32_t vnodeGetFileVersion(int32_t vgId, uint64_t *fver) {
int32_t
code
=
0
;
int32_t
code
=
0
;
if
(
pVnode
->
isCommiting
)
{
if
(
pVnode
->
isCommiting
)
{
vDebug
(
"vgId:%d, vnode is commiting while get file version"
,
vgId
);
code
=
-
1
;
code
=
-
1
;
}
else
{
}
else
{
*
fver
=
pVnode
->
fversion
;
*
fver
=
pVnode
->
fversion
;
...
...
src/vnode/src/vnodeWrite.c
浏览文件 @
feb2270a
...
@@ -282,13 +282,15 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
...
@@ -282,13 +282,15 @@ static void vnodeFlowCtrlMsgToWQueue(void *param, void *tmrId) {
pWrite
->
processedCount
++
;
pWrite
->
processedCount
++
;
if
(
pWrite
->
processedCount
>
100
)
{
if
(
pWrite
->
processedCount
>
100
)
{
vError
(
"vgId:%d, msg:%p, failed to process since %s"
,
pVnode
->
vgId
,
pWrite
,
tstrerror
(
code
));
vError
(
"vgId:%d, msg:%p, failed to process since %s, retry:%d"
,
pVnode
->
vgId
,
pWrite
,
tstrerror
(
code
),
pWrite
->
processedCount
);
pWrite
->
processedCount
=
1
;
pWrite
->
processedCount
=
1
;
dnodeSendRpcVWriteRsp
(
pWrite
->
pVnode
,
pWrite
,
code
);
dnodeSendRpcVWriteRsp
(
pWrite
->
pVnode
,
pWrite
,
code
);
}
else
{
}
else
{
code
=
vnodePerformFlowCtrl
(
pWrite
);
code
=
vnodePerformFlowCtrl
(
pWrite
);
if
(
code
==
0
)
{
if
(
code
==
0
)
{
vDebug
(
"vgId:%d, write into vwqueue after flowctrl"
,
pVnode
->
vgId
);
vDebug
(
"vgId:%d, msg:%p, write into vwqueue after flowctrl, retry:%d"
,
pVnode
->
vgId
,
pWrite
,
pWrite
->
processedCount
);
pWrite
->
processedCount
=
0
;
pWrite
->
processedCount
=
0
;
taosWriteQitem
(
pVnode
->
wqueue
,
pWrite
->
qtype
,
pWrite
);
taosWriteQitem
(
pVnode
->
wqueue
,
pWrite
->
qtype
,
pWrite
);
}
}
...
@@ -310,7 +312,7 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
...
@@ -310,7 +312,7 @@ static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite) {
void
*
unUsed
=
NULL
;
void
*
unUsed
=
NULL
;
taosTmrReset
(
vnodeFlowCtrlMsgToWQueue
,
100
,
pWrite
,
tsDnodeTmr
,
&
unUsed
);
taosTmrReset
(
vnodeFlowCtrlMsgToWQueue
,
100
,
pWrite
,
tsDnodeTmr
,
&
unUsed
);
vTrace
(
"vgId:%d, msg:%p, app:%p, perform flowctrl,
count
:%d"
,
pVnode
->
vgId
,
pWrite
,
pWrite
->
rpcMsg
.
ahandle
,
vTrace
(
"vgId:%d, msg:%p, app:%p, perform flowctrl,
retry
:%d"
,
pVnode
->
vgId
,
pWrite
,
pWrite
->
rpcMsg
.
ahandle
,
pWrite
->
processedCount
);
pWrite
->
processedCount
);
return
TSDB_CODE_VND_ACTION_IN_PROGRESS
;
return
TSDB_CODE_VND_ACTION_IN_PROGRESS
;
}
}
...
...
src/wal/src/walWrite.c
浏览文件 @
feb2270a
...
@@ -211,7 +211,7 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
...
@@ -211,7 +211,7 @@ int32_t walGetWalFile(void *handle, char *fileName, int64_t *fileId) {
code
=
(
*
fileId
==
pWal
->
fileId
)
?
0
:
1
;
code
=
(
*
fileId
==
pWal
->
fileId
)
?
0
:
1
;
}
}
w
Trace
(
"vgId:%d, get wal file, code:%d curId:%"
PRId64
" outId:%"
PRId64
,
pWal
->
vgId
,
code
,
pWal
->
fileId
,
*
fileId
);
w
Debug
(
"vgId:%d, get wal file, code:%d curId:%"
PRId64
" outId:%"
PRId64
,
pWal
->
vgId
,
code
,
pWal
->
fileId
,
*
fileId
);
pthread_mutex_unlock
(
&
(
pWal
->
mutex
));
pthread_mutex_unlock
(
&
(
pWal
->
mutex
));
return
code
;
return
code
;
...
@@ -325,7 +325,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
...
@@ -325,7 +325,7 @@ static int32_t walRestoreWalFile(SWal *pWal, void *pVnode, FWalWrite writeFp, ch
offset
=
offset
+
sizeof
(
SWalHead
)
+
pHead
->
len
;
offset
=
offset
+
sizeof
(
SWalHead
)
+
pHead
->
len
;
w
Trace
(
"vgId:%d, restore wal, fileId:%"
PRId64
" hver:%"
PRIu64
" wver:%"
PRIu64
" len:%d"
,
pWal
->
vgId
,
w
Debug
(
"vgId:%d, restore wal, fileId:%"
PRId64
" hver:%"
PRIu64
" wver:%"
PRIu64
" len:%d"
,
pWal
->
vgId
,
fileId
,
pHead
->
version
,
pWal
->
version
,
pHead
->
len
);
fileId
,
pHead
->
version
,
pWal
->
version
,
pHead
->
len
);
pWal
->
version
=
pHead
->
version
;
pWal
->
version
=
pHead
->
version
;
...
...
tests/script/general/wal/sync.sim
浏览文件 @
feb2270a
...
@@ -82,6 +82,7 @@ restful d1 table_rest 1591772800 30000
...
@@ -82,6 +82,7 @@ restful d1 table_rest 1591772800 30000
restful d1 table_rest 1591872800 30000
restful d1 table_rest 1591872800 30000
restful d1 table_rest 1591972800 30000
restful d1 table_rest 1591972800 30000
sleep 1000
sql select * from table_rest;
sql select * from table_rest;
print rows: $rows
print rows: $rows
if $rows != 300000 then
if $rows != 300000 then
...
...
tests/script/tmp/mnodes.sim
浏览文件 @
feb2270a
...
@@ -20,6 +20,10 @@ system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000
...
@@ -20,6 +20,10 @@ system sh/cfg.sh -n dnode1 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode2 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode3 -c maxTablesPerVnode -v 20000
system sh/cfg.sh -n dnode1 -c minTablesPerVnode -v 1000
system sh/cfg.sh -n dnode2 -c minTablesPerVnode -v 1000
system sh/cfg.sh -n dnode3 -c minTablesPerVnode -v 1000
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode1 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode2 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 20
system sh/cfg.sh -n dnode3 -c maxVgroupsPerDb -v 20
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录