Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
acd35eaf
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
acd35eaf
编写于
10月 30, 2020
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
[TD-225] merge develop
上级
f311440d
acbfa5de
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
388 addition
and
119 deletion
+388
-119
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+153
-102
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+4
-4
src/tsdb/src/tsdbRead.c
src/tsdb/src/tsdbRead.c
+2
-2
src/util/inc/tref.h
src/util/inc/tref.h
+30
-3
src/util/src/tref.c
src/util/src/tref.c
+69
-6
src/util/tests/trefTest.c
src/util/tests/trefTest.c
+17
-1
tests/pytest/fulltest.sh
tests/pytest/fulltest.sh
+1
-0
tests/pytest/query/dataLossTest.py
tests/pytest/query/dataLossTest.py
+76
-0
tests/pytest/util/dnodes.py
tests/pytest/util/dnodes.py
+35
-0
tests/pytest/util/sql.py
tests/pytest/util/sql.py
+1
-1
未找到文件。
src/sync/src/syncMain.c
浏览文件 @
acd35eaf
...
...
@@ -20,6 +20,7 @@
#include "tlog.h"
#include "tutil.h"
#include "ttimer.h"
#include "tref.h"
#include "tsocket.h"
#include "tglobal.h"
#include "taoserror.h"
...
...
@@ -43,6 +44,7 @@ char tsNodeFqdn[TSDB_FQDN_LEN];
static
ttpool_h
tsTcpPool
;
static
void
*
syncTmrCtrl
=
NULL
;
static
void
*
vgIdHash
;
static
int
tsSyncRefId
=
-
1
;
// local functions
static
void
syncProcessSyncRequest
(
char
*
pMsg
,
SSyncPeer
*
pPeer
);
...
...
@@ -54,13 +56,13 @@ static int syncProcessPeerMsg(void *param, void *buffer);
static
void
syncProcessIncommingConnection
(
int
connFd
,
uint32_t
sourceIp
);
static
void
syncRemovePeer
(
SSyncPeer
*
pPeer
);
static
void
syncAddArbitrator
(
SSyncNode
*
pNode
);
static
void
syncAddNodeRef
(
SSyncNode
*
pNode
);
static
void
syncDecNodeRef
(
SSyncNode
*
pNode
);
static
void
syncFreeNode
(
void
*
);
static
void
syncRemoveConfirmedFwdInfo
(
SSyncNode
*
pNode
);
static
void
syncMonitorFwdInfos
(
void
*
param
,
void
*
tmrId
);
static
void
syncProcessFwdAck
(
SSyncNode
*
pNode
,
SFwdInfo
*
pFwdInfo
,
int32_t
code
);
static
void
syncSaveFwdInfo
(
SSyncNode
*
pNode
,
uint64_t
version
,
void
*
mhandle
);
static
void
syncRestartPeer
(
SSyncPeer
*
pPeer
);
static
int32_t
syncForwardToPeerImpl
(
SSyncNode
*
pNode
,
void
*
data
,
void
*
mhandle
,
int
qtyp
);
static
SSyncPeer
*
syncAddPeer
(
SSyncNode
*
pNode
,
const
SNodeInfo
*
pInfo
);
char
*
syncRole
[]
=
{
...
...
@@ -106,6 +108,12 @@ int32_t syncInit() {
return
-
1
;
}
tsSyncRefId
=
taosOpenRef
(
200
,
syncFreeNode
);
if
(
tsSyncRefId
<
0
)
{
syncCleanUp
();
return
-
1
;
}
tstrncpy
(
tsNodeFqdn
,
tsLocalFqdn
,
sizeof
(
tsNodeFqdn
));
sInfo
(
"sync module initialized successfully"
);
...
...
@@ -128,6 +136,9 @@ void syncCleanUp() {
vgIdHash
=
NULL
;
}
taosCloseRef
(
tsSyncRefId
);
tsSyncRefId
=
-
1
;
sInfo
(
"sync module is cleaned up"
);
}
...
...
@@ -159,6 +170,12 @@ void *syncStart(const SSyncInfo *pInfo) {
pNode
->
quorum
=
pCfg
->
quorum
;
if
(
pNode
->
quorum
>
pNode
->
replica
)
pNode
->
quorum
=
pNode
->
replica
;
int
ret
=
taosAddRef
(
tsSyncRefId
,
pNode
);
if
(
ret
<
0
)
{
syncFreeNode
(
pNode
);
return
NULL
;
}
for
(
int
i
=
0
;
i
<
pCfg
->
replica
;
++
i
)
{
const
SNodeInfo
*
pNodeInfo
=
pCfg
->
nodeInfo
+
i
;
pNode
->
peerInfo
[
i
]
=
syncAddPeer
(
pNode
,
pNodeInfo
);
...
...
@@ -167,8 +184,6 @@ void *syncStart(const SSyncInfo *pInfo) {
}
}
syncAddNodeRef
(
pNode
);
if
(
pNode
->
selfIndex
<
0
)
{
sInfo
(
"vgId:%d, this node is not configured"
,
pNode
->
vgId
);
terrno
=
TSDB_CODE_SYN_INVALID_CONFIG
;
...
...
@@ -210,7 +225,9 @@ void syncStop(void *param) {
SSyncNode
*
pNode
=
param
;
SSyncPeer
*
pPeer
;
if
(
pNode
==
NULL
)
return
;
int
ret
=
taosAcquireRef
(
tsSyncRefId
,
pNode
);
if
(
ret
<
0
)
return
;
sInfo
(
"vgId:%d, cleanup sync"
,
pNode
->
vgId
);
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
...
...
@@ -228,14 +245,17 @@ void syncStop(void *param) {
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
syncDecNodeRef
(
pNode
);
taosReleaseRef
(
tsSyncRefId
,
pNode
);
taosRemoveRef
(
tsSyncRefId
,
pNode
);
}
int32_t
syncReconfig
(
void
*
param
,
const
SSyncCfg
*
pNewCfg
)
{
SSyncNode
*
pNode
=
param
;
int
i
,
j
;
if
(
pNode
==
NULL
)
return
TSDB_CODE_SYN_INVALID_CONFIG
;
int
ret
=
taosAcquireRef
(
tsSyncRefId
,
pNode
);
if
(
ret
<
0
)
return
TSDB_CODE_SYN_INVALID_CONFIG
;
sInfo
(
"vgId:%d, reconfig, role:%s replica:%d old:%d"
,
pNode
->
vgId
,
syncRole
[
nodeRole
],
pNewCfg
->
replica
,
pNode
->
replica
);
...
...
@@ -298,105 +318,63 @@ int32_t syncReconfig(void *param, const SSyncCfg *pNewCfg) {
syncRole
[
nodeRole
]);
syncBroadcastStatus
(
pNode
);
taosReleaseRef
(
tsSyncRefId
,
pNode
);
return
0
;
}
int32_t
syncForwardToPeer
(
void
*
param
,
void
*
data
,
void
*
mhandle
,
int
qtype
)
{
SSyncNode
*
pNode
=
param
;
SSyncPeer
*
pPeer
;
SSyncHead
*
pSyncHead
;
SWalHead
*
pWalHead
=
data
;
int
fwdLen
;
int
code
=
0
;
if
(
pNode
==
NULL
)
return
0
;
if
(
nodeRole
==
TAOS_SYNC_ROLE_SLAVE
&&
pWalHead
->
version
!=
nodeVersion
+
1
)
{
sError
(
"vgId:%d, received ver:%"
PRIu64
", inconsistent with last ver:%"
PRIu64
", restart connection"
,
pNode
->
vgId
,
pWalHead
->
version
,
nodeVersion
);
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pPeer
=
pNode
->
peerInfo
[
i
];
syncRestartConnection
(
pPeer
);
}
return
TSDB_CODE_SYN_INVALID_VERSION
;
}
// always update version
nodeVersion
=
pWalHead
->
version
;
sDebug
(
"vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%"
PRIu64
,
pNode
->
vgId
,
pNode
->
replica
,
syncRole
[
nodeRole
],
qtype
,
pWalHead
->
version
);
if
(
pNode
->
replica
==
1
||
nodeRole
!=
TAOS_SYNC_ROLE_MASTER
)
return
0
;
// only pkt from RPC or CQ can be forwarded
if
(
qtype
!=
TAOS_QTYPE_RPC
&&
qtype
!=
TAOS_QTYPE_CQ
)
return
0
;
// a hacker way to improve the performance
pSyncHead
=
(
SSyncHead
*
)(((
char
*
)
pWalHead
)
-
sizeof
(
SSyncHead
));
pSyncHead
->
type
=
TAOS_SMSG_FORWARD
;
pSyncHead
->
pversion
=
0
;
pSyncHead
->
len
=
sizeof
(
SWalHead
)
+
pWalHead
->
len
;
fwdLen
=
pSyncHead
->
len
+
sizeof
(
SSyncHead
);
// include the WAL and SYNC head
int
ret
=
taosAcquireRef
(
tsSyncRefId
,
pNode
);
if
(
ret
<
0
)
return
0
;
pthread_mutex_lock
(
&
(
pNode
->
mutex
)
);
int32_t
code
=
syncForwardToPeerImpl
(
pNode
,
data
,
mhandle
,
qtype
);
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pPeer
=
pNode
->
peerInfo
[
i
];
if
(
pPeer
==
NULL
||
pPeer
->
peerFd
<
0
)
continue
;
if
(
pPeer
->
role
!=
TAOS_SYNC_ROLE_SLAVE
&&
pPeer
->
sstatus
!=
TAOS_SYNC_STATUS_CACHE
)
continue
;
if
(
pNode
->
quorum
>
1
&&
code
==
0
)
{
syncSaveFwdInfo
(
pNode
,
pWalHead
->
version
,
mhandle
);
code
=
1
;
}
int
retLen
=
write
(
pPeer
->
peerFd
,
pSyncHead
,
fwdLen
);
if
(
retLen
==
fwdLen
)
{
sDebug
(
"%s, forward is sent, ver:%"
PRIu64
" contLen:%d"
,
pPeer
->
id
,
pWalHead
->
version
,
pWalHead
->
len
);
}
else
{
sError
(
"%s, failed to forward, ver:%"
PRIu64
" retLen:%d"
,
pPeer
->
id
,
pWalHead
->
version
,
retLen
);
syncRestartConnection
(
pPeer
);
}
}
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
taosReleaseRef
(
tsSyncRefId
,
pNode
);
return
code
;
}
void
syncConfirmForward
(
void
*
param
,
uint64_t
version
,
int32_t
code
)
{
SSyncNode
*
pNode
=
param
;
if
(
pNode
==
NULL
)
return
;
if
(
pNode
->
quorum
<=
1
)
return
;
SSyncPeer
*
pPeer
=
pNode
->
pMaster
;
if
(
pPeer
==
NULL
)
return
;
int
ret
=
taosAcquireRef
(
tsSyncRefId
,
pNode
)
;
if
(
ret
<
0
)
return
;
char
msg
[
sizeof
(
SSyncHead
)
+
sizeof
(
SFwdRsp
)]
=
{
0
};
SSyncPeer
*
pPeer
=
pNode
->
pMaster
;
if
(
pPeer
&&
pNode
->
quorum
>
1
)
{
char
msg
[
sizeof
(
SSyncHead
)
+
sizeof
(
SFwdRsp
)]
=
{
0
};
SSyncHead
*
pHead
=
(
SSyncHead
*
)
msg
;
pHead
->
type
=
TAOS_SMSG_FORWARD_RSP
;
pHead
->
len
=
sizeof
(
SFwdRsp
);
SSyncHead
*
pHead
=
(
SSyncHead
*
)
msg
;
pHead
->
type
=
TAOS_SMSG_FORWARD_RSP
;
pHead
->
len
=
sizeof
(
SFwdRsp
);
SFwdRsp
*
pFwdRsp
=
(
SFwdRsp
*
)(
msg
+
sizeof
(
SSyncHead
));
pFwdRsp
->
version
=
version
;
pFwdRsp
->
code
=
code
;
SFwdRsp
*
pFwdRsp
=
(
SFwdRsp
*
)(
msg
+
sizeof
(
SSyncHead
));
pFwdRsp
->
version
=
version
;
pFwdRsp
->
code
=
code
;
int
msgLen
=
sizeof
(
SSyncHead
)
+
sizeof
(
SFwdRsp
);
int
retLen
=
write
(
pPeer
->
peerFd
,
msg
,
msgLen
);
int
msgLen
=
sizeof
(
SSyncHead
)
+
sizeof
(
SFwdRsp
);
int
retLen
=
write
(
pPeer
->
peerFd
,
msg
,
msgLen
);
if
(
retLen
==
msgLen
)
{
sDebug
(
"%s, forward-rsp is sent, ver:%"
PRIu64
,
pPeer
->
id
,
version
);
}
else
{
sDebug
(
"%s, failed to send forward ack, restart"
,
pPeer
->
id
);
syncRestartConnection
(
pPeer
);
if
(
retLen
==
msgLen
)
{
sDebug
(
"%s, forward-rsp is sent, ver:%"
PRIu64
,
pPeer
->
id
,
version
);
}
else
{
sDebug
(
"%s, failed to send forward ack, restart"
,
pPeer
->
id
);
syncRestartConnection
(
pPeer
);
}
}
taosReleaseRef
(
tsSyncRefId
,
pNode
);
}
void
syncRecover
(
void
*
param
)
{
SSyncNode
*
pNode
=
param
;
SSyncPeer
*
pPeer
;
int
ret
=
taosAcquireRef
(
tsSyncRefId
,
pNode
);
if
(
ret
<
0
)
return
;
// to do: add a few lines to check if recover is OK
// if take this node to unsync state, the whole system may not work
...
...
@@ -414,17 +392,24 @@ void syncRecover(void *param) {
}
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
taosReleaseRef
(
tsSyncRefId
,
pNode
);
}
int
syncGetNodesRole
(
void
*
param
,
SNodesRole
*
pNodesRole
)
{
SSyncNode
*
pNode
=
param
;
int
ret
=
taosAcquireRef
(
tsSyncRefId
,
pNode
);
if
(
ret
<
0
)
return
-
1
;
pNodesRole
->
selfIndex
=
pNode
->
selfIndex
;
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pNodesRole
->
nodeId
[
i
]
=
pNode
->
peerInfo
[
i
]
->
nodeId
;
pNodesRole
->
role
[
i
]
=
pNode
->
peerInfo
[
i
]
->
role
;
}
taosReleaseRef
(
tsSyncRefId
,
pNode
);
return
0
;
}
...
...
@@ -457,22 +442,20 @@ static void syncAddArbitrator(SSyncNode *pNode) {
pNode
->
peerInfo
[
TAOS_SYNC_MAX_REPLICA
]
=
syncAddPeer
(
pNode
,
&
nodeInfo
);
}
static
void
syncAddNodeRef
(
SSyncNode
*
pNode
)
{
atomic_add_fetch_8
(
&
pNode
->
refCount
,
1
);
}
static
void
syncFreeNode
(
void
*
param
)
{
SSyncNode
*
pNode
=
param
;
static
void
syncDecNodeRef
(
SSyncNode
*
pNode
)
{
if
(
atomic_sub_fetch_8
(
&
pNode
->
refCount
,
1
)
==
0
)
{
pthread_mutex_destroy
(
&
pNode
->
mutex
);
taosTFree
(
pNode
->
pRecv
);
taosTFree
(
pNode
->
pSyncFwds
);
taosTFree
(
pNode
);
}
pthread_mutex_destroy
(
&
pNode
->
mutex
);
taosTFree
(
pNode
->
pRecv
);
taosTFree
(
pNode
->
pSyncFwds
);
taosTFree
(
pNode
);
}
void
syncAddPeerRef
(
SSyncPeer
*
pPeer
)
{
atomic_add_fetch_8
(
&
pPeer
->
refCount
,
1
);
}
int
syncDecPeerRef
(
SSyncPeer
*
pPeer
)
{
if
(
atomic_sub_fetch_8
(
&
pPeer
->
refCount
,
1
)
==
0
)
{
syncDecNodeRef
(
pPeer
->
pSyncNode
);
taosReleaseRef
(
tsSyncRefId
,
pPeer
->
pSyncNode
);
sDebug
(
"%s, resource is freed"
,
pPeer
->
id
);
taosTFree
(
pPeer
->
watchFd
);
...
...
@@ -529,7 +512,7 @@ static SSyncPeer *syncAddPeer(SSyncNode *pNode, const SNodeInfo *pInfo) {
taosTmrReset
(
syncCheckPeerConnection
,
checkMs
,
pPeer
,
syncTmrCtrl
,
&
pPeer
->
timer
);
}
syncAddNodeRef
(
pNode
);
taosAcquireRef
(
tsSyncRefId
,
pNode
);
return
pPeer
;
}
...
...
@@ -1122,7 +1105,7 @@ static void syncProcessBrokenLink(void *param) {
SSyncPeer
*
pPeer
=
param
;
SSyncNode
*
pNode
=
pPeer
->
pSyncNode
;
syncAddNodeRef
(
pNode
)
;
if
(
taosAcquireRef
(
tsSyncRefId
,
pNode
)
<
0
)
return
;
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
sDebug
(
"%s, TCP link is broken(%s)"
,
pPeer
->
id
,
strerror
(
errno
));
...
...
@@ -1133,7 +1116,7 @@ static void syncProcessBrokenLink(void *param) {
}
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
syncDecNodeRef
(
pNode
);
taosReleaseRef
(
tsSyncRefId
,
pNode
);
}
static
void
syncSaveFwdInfo
(
SSyncNode
*
pNode
,
uint64_t
version
,
void
*
mhandle
)
{
...
...
@@ -1202,22 +1185,90 @@ static void syncProcessFwdAck(SSyncNode *pNode, SFwdInfo *pFwdInfo, int32_t code
static
void
syncMonitorFwdInfos
(
void
*
param
,
void
*
tmrId
)
{
SSyncNode
*
pNode
=
param
;
int
ret
=
taosAcquireRef
(
tsSyncRefId
,
pNode
);
if
(
ret
<
0
)
return
;
SSyncFwds
*
pSyncFwds
=
pNode
->
pSyncFwds
;
if
(
pSyncFwds
==
NULL
)
return
;
uint64_t
time
=
taosGetTimestampMs
();
if
(
pSyncFwds
)
{;
uint64_t
time
=
taosGetTimestampMs
();
if
(
pSyncFwds
->
fwds
>
0
)
{
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
for
(
int
i
=
0
;
i
<
pSyncFwds
->
fwds
;
++
i
)
{
SFwdInfo
*
pFwdInfo
=
pSyncFwds
->
fwdInfo
+
(
pSyncFwds
->
first
+
i
)
%
tsMaxFwdInfo
;
if
(
time
-
pFwdInfo
->
time
<
2000
)
break
;
syncProcessFwdAck
(
pNode
,
pFwdInfo
,
TSDB_CODE_RPC_NETWORK_UNAVAIL
);
if
(
pSyncFwds
->
fwds
>
0
)
{
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
for
(
int
i
=
0
;
i
<
pSyncFwds
->
fwds
;
++
i
)
{
SFwdInfo
*
pFwdInfo
=
pSyncFwds
->
fwdInfo
+
(
pSyncFwds
->
first
+
i
)
%
tsMaxFwdInfo
;
if
(
time
-
pFwdInfo
->
time
<
2000
)
break
;
syncProcessFwdAck
(
pNode
,
pFwdInfo
,
TSDB_CODE_RPC_NETWORK_UNAVAIL
);
}
syncRemoveConfirmedFwdInfo
(
pNode
);
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
}
syncRemoveConfirmedFwdInfo
(
pNode
);
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
pNode
->
pFwdTimer
=
taosTmrStart
(
syncMonitorFwdInfos
,
300
,
pNode
,
syncTmrCtrl
);
}
pNode
->
pFwdTimer
=
taosTmrStart
(
syncMonitorFwdInfos
,
300
,
pNode
,
syncTmrCtrl
);
taosReleaseRef
(
tsSyncRefId
,
pNode
);
}
static
int32_t
syncForwardToPeerImpl
(
SSyncNode
*
pNode
,
void
*
data
,
void
*
mhandle
,
int
qtype
)
{
SSyncPeer
*
pPeer
;
SSyncHead
*
pSyncHead
;
SWalHead
*
pWalHead
=
data
;
int
fwdLen
;
int32_t
code
=
0
;
if
(
nodeRole
==
TAOS_SYNC_ROLE_SLAVE
&&
pWalHead
->
version
!=
nodeVersion
+
1
)
{
sError
(
"vgId:%d, received ver:%"
PRIu64
", inconsistent with last ver:%"
PRIu64
", restart connection"
,
pNode
->
vgId
,
pWalHead
->
version
,
nodeVersion
);
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pPeer
=
pNode
->
peerInfo
[
i
];
syncRestartConnection
(
pPeer
);
}
return
TSDB_CODE_SYN_INVALID_VERSION
;
}
// always update version
nodeVersion
=
pWalHead
->
version
;
sDebug
(
"vgId:%d, replica:%d nodeRole:%s qtype:%d ver:%"
PRIu64
,
pNode
->
vgId
,
pNode
->
replica
,
syncRole
[
nodeRole
],
qtype
,
pWalHead
->
version
);
if
(
pNode
->
replica
==
1
||
nodeRole
!=
TAOS_SYNC_ROLE_MASTER
)
return
0
;
// only pkt from RPC or CQ can be forwarded
if
(
qtype
!=
TAOS_QTYPE_RPC
&&
qtype
!=
TAOS_QTYPE_CQ
)
return
0
;
// a hacker way to improve the performance
pSyncHead
=
(
SSyncHead
*
)(((
char
*
)
pWalHead
)
-
sizeof
(
SSyncHead
));
pSyncHead
->
type
=
TAOS_SMSG_FORWARD
;
pSyncHead
->
pversion
=
0
;
pSyncHead
->
len
=
sizeof
(
SWalHead
)
+
pWalHead
->
len
;
fwdLen
=
pSyncHead
->
len
+
sizeof
(
SSyncHead
);
// include the WAL and SYNC head
pthread_mutex_lock
(
&
(
pNode
->
mutex
));
for
(
int
i
=
0
;
i
<
pNode
->
replica
;
++
i
)
{
pPeer
=
pNode
->
peerInfo
[
i
];
if
(
pPeer
==
NULL
||
pPeer
->
peerFd
<
0
)
continue
;
if
(
pPeer
->
role
!=
TAOS_SYNC_ROLE_SLAVE
&&
pPeer
->
sstatus
!=
TAOS_SYNC_STATUS_CACHE
)
continue
;
if
(
pNode
->
quorum
>
1
&&
code
==
0
)
{
syncSaveFwdInfo
(
pNode
,
pWalHead
->
version
,
mhandle
);
code
=
1
;
}
int
retLen
=
write
(
pPeer
->
peerFd
,
pSyncHead
,
fwdLen
);
if
(
retLen
==
fwdLen
)
{
sDebug
(
"%s, forward is sent, ver:%"
PRIu64
" contLen:%d"
,
pPeer
->
id
,
pWalHead
->
version
,
pWalHead
->
len
);
}
else
{
sError
(
"%s, failed to forward, ver:%"
PRIu64
" retLen:%d"
,
pPeer
->
id
,
pWalHead
->
version
,
retLen
);
syncRestartConnection
(
pPeer
);
}
}
pthread_mutex_unlock
(
&
(
pNode
->
mutex
));
return
code
;
}
src/tsdb/src/tsdbMeta.c
浏览文件 @
acd35eaf
...
...
@@ -562,12 +562,12 @@ int tsdbUnlockRepoMeta(STsdbRepo *pRepo) {
void
tsdbRefTable
(
STable
*
pTable
)
{
int32_t
ref
=
T_REF_INC
(
pTable
);
UNUSED
(
ref
);
// tsdbDebug("ref table %"PRIu64", tid:%d, refCount:%d"
, TABLE_UID(pTable), TABLE_TID(pTable), ref);
tsdbDebug
(
"ref table %s uid %"
PRIu64
" tid:%d, refCount:%d"
,
TABLE_CHAR_NAME
(
pTable
)
,
TABLE_UID
(
pTable
),
TABLE_TID
(
pTable
),
ref
);
}
void
tsdbUnRefTable
(
STable
*
pTable
)
{
int32_t
ref
=
T_REF_DEC
(
pTable
);
tsdbDebug
(
"unref table
uid:%"
PRIu64
", tid:%d, refCount:%d"
,
TABLE_UID
(
pTable
),
TABLE_TID
(
pTable
),
ref
);
tsdbDebug
(
"unref table
%s uid:%"
PRIu64
" tid:%d, refCount:%d"
,
TABLE_CHAR_NAME
(
pTable
)
,
TABLE_UID
(
pTable
),
TABLE_TID
(
pTable
),
ref
);
if
(
ref
==
0
)
{
// tsdbDebug("destory table name:%s uid:%"PRIu64", tid:%d", TABLE_CHAR_NAME(pTable), TABLE_UID(pTable), TABLE_TID(pTable));
...
...
@@ -745,7 +745,7 @@ static STable *tsdbCreateTableFromCfg(STableCfg *pCfg, bool isSuper) {
T_REF_INC
(
pTable
);
tsdb
Trace
(
"table %s tid %d uid %"
PRIu64
" is created"
,
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
tsdb
Debug
(
"table %s tid %d uid %"
PRIu64
" is created"
,
TABLE_CHAR_NAME
(
pTable
),
TABLE_TID
(
pTable
),
TABLE_UID
(
pTable
));
return
pTable
;
...
...
@@ -889,7 +889,7 @@ static void tsdbRemoveTableFromMeta(STsdbRepo *pRepo, STable *pTable, bool rmFro
}
if
(
lock
)
tsdbUnlockRepoMeta
(
pRepo
);
tsdbDebug
(
"vgId:%d table %s
is removed from meta"
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
));
tsdbDebug
(
"vgId:%d table %s
uid %"
PRIu64
" is removed from meta"
,
REPO_ID
(
pRepo
),
TABLE_CHAR_NAME
(
pTable
),
TABLE_UID
(
pTable
));
tsdbUnRefTable
(
pTable
);
}
...
...
src/tsdb/src/tsdbRead.c
浏览文件 @
acd35eaf
...
...
@@ -2122,8 +2122,8 @@ STimeWindow changeTableGroupByLastrow(STableGroupInfo *groupList) {
}
// clear current group, unref unused table
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
STableKeyInfo
*
pKeyInfo
=
(
STableKeyInfo
*
)
taosArrayGet
(
pGroup
,
i
);
for
(
int32_t
i
=
0
;
i
<
numOfTables
;
++
i
)
{
STableKeyInfo
*
pKeyInfo
=
(
STableKeyInfo
*
)
taosArrayGet
(
pGroup
,
i
);
// keyInfo.pTable may be NULL here.
if
(
pKeyInfo
->
pTable
!=
keyInfo
.
pTable
)
{
...
...
src/util/inc/tref.h
浏览文件 @
acd35eaf
...
...
@@ -21,15 +21,42 @@
extern
"C"
{
#endif
int
taosOpenRef
(
int
max
,
void
(
*
fp
)(
void
*
));
// return refId which will be used by other APIs
// open an instance, return refId which will be used by other APIs
int
taosOpenRef
(
int
max
,
void
(
*
fp
)(
void
*
));
// close the Ref instance
void
taosCloseRef
(
int
refId
);
int
taosListRef
();
// return the number of references in system
// add ref, p is the pointer to resource or pointer ID
int
taosAddRef
(
int
refId
,
void
*
p
);
#define taosRemoveRef taosReleaseRef
// acquire ref, p is the pointer to resource or pointer ID
int
taosAcquireRef
(
int
refId
,
void
*
p
);
// release ref, p is the pointer to resource or pinter ID
void
taosReleaseRef
(
int
refId
,
void
*
p
);
#define taosRemoveRef taosReleaseRef
// return the first if p is null, otherwise return the next after p
void
*
taosIterateRef
(
int
refId
,
void
*
p
);
// return the number of references in system
int
taosListRef
();
/* sample code to iterate the refs
void demoIterateRefs(int refId) {
void *p = taosIterateRef(refId, NULL);
while (p) {
// process P
p = taosIterateRef(refId, p);
}
}
*/
#ifdef __cplusplus
}
...
...
src/util/src/tref.c
浏览文件 @
acd35eaf
...
...
@@ -143,8 +143,6 @@ int taosAddRef(int refId, void *p)
return
TSDB_CODE_REF_INVALID_ID
;
}
uTrace
(
"refId:%d p:%p try to add"
,
refId
,
p
);
pSet
=
tsRefSetList
+
refId
;
taosIncRefCount
(
pSet
);
if
(
pSet
->
state
!=
TSDB_REF_STATE_ACTIVE
)
{
...
...
@@ -203,8 +201,6 @@ int taosAcquireRef(int refId, void *p)
return
TSDB_CODE_REF_INVALID_ID
;
}
uTrace
(
"refId:%d p:%p try to acquire"
,
refId
,
p
);
pSet
=
tsRefSetList
+
refId
;
taosIncRefCount
(
pSet
);
if
(
pSet
->
state
!=
TSDB_REF_STATE_ACTIVE
)
{
...
...
@@ -254,8 +250,6 @@ void taosReleaseRef(int refId, void *p)
return
;
}
uTrace
(
"refId:%d p:%p try to release"
,
refId
,
p
);
pSet
=
tsRefSetList
+
refId
;
if
(
pSet
->
state
==
TSDB_REF_STATE_EMPTY
)
{
uTrace
(
"refId:%d p:%p failed to release, cleaned"
,
refId
,
p
);
...
...
@@ -305,6 +299,75 @@ void taosReleaseRef(int refId, void *p)
if
(
released
)
taosDecRefCount
(
pSet
);
}
// if p is NULL, return the first p in hash list, otherwise, return the next after p
void
*
taosIterateRef
(
int
refId
,
void
*
p
)
{
SRefNode
*
pNode
=
NULL
;
SRefSet
*
pSet
;
if
(
refId
<
0
||
refId
>=
TSDB_REF_OBJECTS
)
{
uTrace
(
"refId:%d p:%p failed to iterate, refId not valid"
,
refId
,
p
);
return
NULL
;
}
pSet
=
tsRefSetList
+
refId
;
taosIncRefCount
(
pSet
);
if
(
pSet
->
state
!=
TSDB_REF_STATE_ACTIVE
)
{
uTrace
(
"refId:%d p:%p failed to iterate, not active"
,
refId
,
p
);
taosDecRefCount
(
pSet
);
return
NULL
;
}
int
hash
=
0
;
if
(
p
)
{
hash
=
taosHashRef
(
pSet
,
p
);
taosLockList
(
pSet
->
lockedBy
+
hash
);
pNode
=
pSet
->
nodeList
[
hash
];
while
(
pNode
)
{
if
(
pNode
->
p
==
p
)
break
;
pNode
=
pNode
->
next
;
}
if
(
pNode
==
NULL
)
{
uError
(
"refId:%d p:%p not there, quit"
,
refId
,
p
);
taosUnlockList
(
pSet
->
lockedBy
+
hash
);
return
NULL
;
}
// p is there
pNode
=
pNode
->
next
;
if
(
pNode
==
NULL
)
{
taosUnlockList
(
pSet
->
lockedBy
+
hash
);
hash
++
;
}
}
if
(
pNode
==
NULL
)
{
for
(;
hash
<
pSet
->
max
;
++
hash
)
{
taosLockList
(
pSet
->
lockedBy
+
hash
);
pNode
=
pSet
->
nodeList
[
hash
];
if
(
pNode
)
break
;
taosUnlockList
(
pSet
->
lockedBy
+
hash
);
}
}
void
*
newP
=
NULL
;
if
(
pNode
)
{
pNode
->
count
++
;
// acquire it
newP
=
pNode
->
p
;
taosUnlockList
(
pSet
->
lockedBy
+
hash
);
uTrace
(
"refId:%d p:%p is returned"
,
refId
,
p
);
}
else
{
uTrace
(
"refId:%d p:%p the list is over"
,
refId
,
p
);
}
if
(
p
)
taosReleaseRef
(
refId
,
p
);
// release the current one
taosDecRefCount
(
pSet
);
return
newP
;
}
int
taosListRef
()
{
SRefSet
*
pSet
;
SRefNode
*
pNode
;
...
...
src/util/tests/trefTest.c
浏览文件 @
acd35eaf
...
...
@@ -17,6 +17,19 @@ typedef struct {
void
**
p
;
}
SRefSpace
;
void
iterateRefs
(
int
refId
)
{
int
count
=
0
;
void
*
p
=
taosIterateRef
(
refId
,
NULL
);
while
(
p
)
{
// process P
count
++
;
p
=
taosIterateRef
(
refId
,
p
);
}
printf
(
" %d "
,
count
);
}
void
*
takeRefActions
(
void
*
param
)
{
SRefSpace
*
pSpace
=
(
SRefSpace
*
)
param
;
int
code
,
id
;
...
...
@@ -44,6 +57,9 @@ void *takeRefActions(void *param) {
usleep
(
id
%
5
+
1
);
taosReleaseRef
(
pSpace
->
refId
,
pSpace
->
p
[
id
]);
}
id
=
random
()
%
pSpace
->
refNum
;
iterateRefs
(
id
);
}
for
(
int
i
=
0
;
i
<
pSpace
->
refNum
;
++
i
)
{
...
...
@@ -63,7 +79,7 @@ void *openRefSpace(void *param) {
SRefSpace
*
pSpace
=
(
SRefSpace
*
)
param
;
printf
(
"c"
);
pSpace
->
refId
=
taosOpenRef
(
1000
0
,
myfree
);
pSpace
->
refId
=
taosOpenRef
(
5
0
,
myfree
);
if
(
pSpace
->
refId
<
0
)
{
printf
(
"failed to open ref, reson:%s
\n
"
,
tstrerror
(
pSpace
->
refId
));
...
...
tests/pytest/fulltest.sh
浏览文件 @
acd35eaf
...
...
@@ -154,6 +154,7 @@ python3 ./test.py -f query/queryConnection.py
python3 ./test.py
-f
query/queryCountCSVData.py
python3 ./test.py
-f
query/natualInterval.py
python3 ./test.py
-f
query/bug1471.py
python3 ./test.py
-f
query/dataLossTest.py
#stream
python3 ./test.py
-f
stream/metric_1.py
...
...
tests/pytest/query/dataLossTest.py
0 → 100644
浏览文件 @
acd35eaf
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import
sys
import
taos
import
os
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
from
util.dnodes
import
*
import
inspect
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
())
self
.
numberOfTables
=
240
self
.
numberOfRecords
=
10000
def
run
(
self
):
tdSql
.
prepare
()
os
.
system
(
"yes | taosdemo -t %d -n %d"
%
(
self
.
numberOfTables
,
self
.
numberOfRecords
))
print
(
"==============step1"
)
tdSql
.
execute
(
"use test"
)
sql
=
"select count(*) from meters"
tdSql
.
query
(
sql
)
rows
=
tdSql
.
getData
(
0
,
0
)
print
(
"number of records: %d"
%
rows
)
newRows
=
rows
for
i
in
range
(
10000
):
print
(
"kill taosd"
)
time
.
sleep
(
10
)
os
.
system
(
"sudo kill -9 $(pgrep taosd)"
)
tdDnodes
.
startWithoutSleep
(
1
)
while
True
:
try
:
tdSql
.
query
(
sql
)
newRows
=
tdSql
.
getData
(
0
,
0
)
print
(
"numer of records after kill taosd %d"
%
newRows
)
time
.
sleep
(
10
)
break
except
Exception
as
e
:
pass
continue
if
newRows
<
rows
:
caller
=
inspect
.
getframeinfo
(
inspect
.
stack
()[
1
][
0
])
args
=
(
caller
.
filename
,
caller
.
lineno
,
sql
,
newRows
,
rows
)
tdLog
.
exit
(
"%s(%d) failed: sql:%s, queryRows:%d != expect:%d"
%
args
)
break
tdSql
.
query
(
sql
)
tdSql
.
checkData
(
0
,
0
,
rows
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tests/pytest/util/dnodes.py
浏览文件 @
acd35eaf
...
...
@@ -15,6 +15,7 @@ import sys
import
os
import
os.path
import
subprocess
from
time
import
sleep
from
util.log
import
*
...
...
@@ -210,6 +211,7 @@ class TDDnode:
(
self
.
index
,
self
.
cfgPath
))
def
getBuildPath
(
self
):
buildPath
=
""
selfPath
=
os
.
path
.
dirname
(
os
.
path
.
realpath
(
__file__
))
if
(
"community"
in
selfPath
):
...
...
@@ -256,6 +258,35 @@ class TDDnode:
tdLog
.
debug
(
"wait 5 seconds for the dnode:%d to start."
%
(
self
.
index
))
time
.
sleep
(
5
)
def
startWithoutSleep
(
self
):
buildPath
=
self
.
getBuildPath
()
if
(
buildPath
==
""
):
tdLog
.
exit
(
"taosd not found!"
)
else
:
tdLog
.
info
(
"taosd found in %s"
%
buildPath
)
binPath
=
buildPath
+
"/build/bin/taosd"
if
self
.
deployed
==
0
:
tdLog
.
exit
(
"dnode:%d is not deployed"
%
(
self
.
index
))
if
self
.
valgrind
==
0
:
cmd
=
"nohup %s -c %s > /dev/null 2>&1 & "
%
(
binPath
,
self
.
cfgDir
)
else
:
valgrindCmdline
=
"valgrind --tool=memcheck --leak-check=full --show-reachable=no --track-origins=yes --show-leak-kinds=all -v --workaround-gcc296-bugs=yes"
cmd
=
"nohup %s %s -c %s 2>&1 & "
%
(
valgrindCmdline
,
binPath
,
self
.
cfgDir
)
print
(
cmd
)
if
os
.
system
(
cmd
)
!=
0
:
tdLog
.
exit
(
cmd
)
self
.
running
=
1
tdLog
.
debug
(
"dnode:%d is running with %s "
%
(
self
.
index
,
cmd
))
def
stop
(
self
):
if
self
.
valgrind
==
0
:
...
...
@@ -425,6 +456,10 @@ class TDDnodes:
def
start
(
self
,
index
):
self
.
check
(
index
)
self
.
dnodes
[
index
-
1
].
start
()
def
startWithoutSleep
(
self
,
index
):
self
.
check
(
index
)
self
.
dnodes
[
index
-
1
].
startWithoutSleep
()
def
stop
(
self
,
index
):
self
.
check
(
index
)
...
...
tests/pytest/util/sql.py
浏览文件 @
acd35eaf
...
...
@@ -25,7 +25,7 @@ class TDSql:
self
.
queryCols
=
0
self
.
affectedRows
=
0
def
init
(
self
,
cursor
,
log
=
Tru
e
):
def
init
(
self
,
cursor
,
log
=
Fals
e
):
self
.
cursor
=
cursor
if
(
log
):
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录