Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
a0c043c4
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
a0c043c4
编写于
6月 27, 2022
作者:
C
cpwu
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into cpwu/3.0
上级
1627a536
fb50977b
变更
29
隐藏空白更改
内联
并排
Showing
29 changed file
with
410 addition
and
120 deletion
+410
-120
Jenkinsfile2
Jenkinsfile2
+8
-0
include/client/taos.h
include/client/taos.h
+1
-0
include/libs/sync/sync.h
include/libs/sync/sync.h
+2
-2
include/libs/sync/syncTools.h
include/libs/sync/syncTools.h
+2
-2
source/client/src/clientMain.c
source/client/src/clientMain.c
+13
-0
source/dnode/mnode/impl/src/mndMain.c
source/dnode/mnode/impl/src/mndMain.c
+2
-2
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+37
-32
source/libs/executor/inc/executorimpl.h
source/libs/executor/inc/executorimpl.h
+2
-2
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+2
-2
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+3
-3
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+26
-6
source/libs/sync/inc/syncEnv.h
source/libs/sync/inc/syncEnv.h
+6
-6
source/libs/sync/inc/syncIO.h
source/libs/sync/inc/syncIO.h
+1
-1
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+2
-1
source/libs/sync/inc/syncRaftCfg.h
source/libs/sync/inc/syncRaftCfg.h
+5
-5
source/libs/sync/inc/syncRaftLog.h
source/libs/sync/inc/syncRaftLog.h
+5
-2
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+1
-0
source/libs/sync/src/syncIO.c
source/libs/sync/src/syncIO.c
+4
-4
source/libs/sync/src/syncIndexMgr.c
source/libs/sync/src/syncIndexMgr.c
+1
-1
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+68
-21
source/libs/sync/src/syncRaftCfg.c
source/libs/sync/src/syncRaftCfg.c
+4
-4
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+13
-0
source/libs/sync/src/syncRaftStore.c
source/libs/sync/src/syncRaftStore.c
+1
-1
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+5
-5
source/libs/sync/src/syncVoteMgr.c
source/libs/sync/src/syncVoteMgr.c
+2
-2
source/libs/sync/test/syncRaftLogTest2.cpp
source/libs/sync/test/syncRaftLogTest2.cpp
+12
-15
source/libs/sync/test/syncRaftLogTest3.cpp
source/libs/sync/test/syncRaftLogTest3.cpp
+1
-1
tests/system-test/7-tmq/tmqCheckData1.py
tests/system-test/7-tmq/tmqCheckData1.py
+180
-0
tests/system-test/fulltest.sh
tests/system-test/fulltest.sh
+1
-0
未找到文件。
Jenkinsfile2
浏览文件 @
a0c043c4
...
...
@@ -338,6 +338,14 @@ pipeline {
changeRequest()
}
steps {
script {
def linux_node_ip = sh (
script: 'ip addr|grep 192|grep -v virbr|awk "{print \\\$2}"|sed "s/\\/.*//"',
returnStdout: true
).trim()
echo "${linux_node_ip}"
echo "${WKDIR}/restore.sh -p ${BRANCH_NAME} -n ${BUILD_ID} -c {container name}"
}
catchError(buildResult: 'FAILURE', stageResult: 'FAILURE') {
timeout(time: 120, unit: 'MINUTES'){
pre_test()
...
...
include/client/taos.h
浏览文件 @
a0c043c4
...
...
@@ -128,6 +128,7 @@ typedef struct setConfRet {
DLL_EXPORT
void
taos_cleanup
(
void
);
DLL_EXPORT
int
taos_options
(
TSDB_OPTION
option
,
const
void
*
arg
,
...);
DLL_EXPORT
setConfRet
taos_set_config
(
const
char
*
config
);
DLL_EXPORT
int
taos_init
(
void
);
DLL_EXPORT
TAOS
*
taos_connect
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
TAOS
*
taos_connect_auth
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
...
...
include/libs/sync/sync.h
浏览文件 @
a0c043c4
...
...
@@ -26,9 +26,9 @@ extern "C" {
extern
bool
gRaftDetailLog
;
#define SYNC_INDEX_BEGIN
0
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID
0xFFFFFFFFFFFFFFFF
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
typedef
uint64_t
SyncNodeId
;
typedef
int32_t
SyncGroupId
;
...
...
include/libs/sync/syncTools.h
浏览文件 @
a0c043c4
...
...
@@ -524,7 +524,7 @@ void syncReconfigFinishLog2(char* s, const SyncReconfigFinish* pMsg);
int32_t
syncNodeOnPingCb
(
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
int32_t
syncNodeOnPingReplyCb
(
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
int32_t
syncNodeOnTimeoutCb
(
SSyncNode
*
ths
,
SyncTimeout
*
pMsg
);
int32_t
syncNodeOnClientRequestCb
(
SSyncNode
*
ths
,
SyncClientRequest
*
pMsg
);
int32_t
syncNodeOnClientRequestCb
(
SSyncNode
*
ths
,
SyncClientRequest
*
pMsg
,
SyncIndex
*
pRetIndex
);
int32_t
syncNodeOnRequestVoteCb
(
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
);
int32_t
syncNodeOnRequestVoteReplyCb
(
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
);
int32_t
syncNodeOnAppendEntriesCb
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
...
...
@@ -541,7 +541,7 @@ int32_t syncNodeOnSnapshotRspCb(SSyncNode* ths, SyncSnapshotRsp* pMsg);
// -----------------------------------------
typedef
int32_t
(
*
FpOnPingCb
)(
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
typedef
int32_t
(
*
FpOnPingReplyCb
)(
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
typedef
int32_t
(
*
FpOnClientRequestCb
)(
SSyncNode
*
ths
,
SyncClientRequest
*
pMsg
);
typedef
int32_t
(
*
FpOnClientRequestCb
)(
SSyncNode
*
ths
,
SyncClientRequest
*
pMsg
,
SyncIndex
*
pRetIndex
);
typedef
int32_t
(
*
FpOnRequestVoteCb
)(
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
);
typedef
int32_t
(
*
FpOnRequestVoteReplyCb
)(
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
);
typedef
int32_t
(
*
FpOnAppendEntriesCb
)(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
...
...
source/client/src/clientMain.c
浏览文件 @
a0c043c4
...
...
@@ -81,6 +81,19 @@ void taos_cleanup(void) {
taosCloseLog
();
}
static
setConfRet
taos_set_config_imp
(
const
char
*
config
){
setConfRet
ret
=
{
SET_CONF_RET_SUCC
,
{
0
}};
// TODO: need re-implementation
return
ret
;
}
setConfRet
taos_set_config
(
const
char
*
config
){
// TODO pthread_mutex_lock(&setConfMutex);
setConfRet
ret
=
taos_set_config_imp
(
config
);
// pthread_mutex_unlock(&setConfMutex);
return
ret
;
}
TAOS
*
taos_connect
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
db
,
uint16_t
port
)
{
tscDebug
(
"try to connect to %s:%u, user:%s db:%s"
,
ip
,
port
,
user
,
db
);
if
(
user
==
NULL
)
{
...
...
source/dnode/mnode/impl/src/mndMain.c
浏览文件 @
a0c043c4
...
...
@@ -442,7 +442,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
syncPingReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_CLIENT_REQUEST
)
{
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnClientRequestCb
(
pSyncNode
,
pSyncMsg
);
code
=
syncNodeOnClientRequestCb
(
pSyncNode
,
pSyncMsg
,
NULL
);
syncClientRequestDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE
)
{
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pMsg
);
...
...
@@ -491,7 +491,7 @@ int32_t mndProcessSyncMsg(SRpcMsg *pMsg) {
syncPingReplyDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_CLIENT_REQUEST
)
{
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pMsg
);
code
=
syncNodeOnClientRequestCb
(
pSyncNode
,
pSyncMsg
);
code
=
syncNodeOnClientRequestCb
(
pSyncNode
,
pSyncMsg
,
NULL
);
syncClientRequestDestroy
(
pSyncMsg
);
}
else
if
(
pMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE
)
{
SyncRequestVote
*
pSyncMsg
=
syncRequestVoteFromRpcMsg2
(
pMsg
);
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
a0c043c4
...
...
@@ -137,6 +137,25 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
vError
(
"vgId:%d, failed to pre-process msg:%p since %s"
,
vgId
,
pMsg
,
terrstr
());
}
else
{
code
=
syncPropose
(
pVnode
->
sync
,
pMsg
,
vnodeIsMsgWeak
(
pMsg
->
msgType
));
if
(
code
==
1
)
{
do
{
static
int32_t
cnt
=
0
;
if
(
cnt
++
%
1000
==
1
)
{
vInfo
(
"vgId:%d, msg:%p apply right now, apply index:%ld, msgtype:%s,%d"
,
vgId
,
pMsg
,
pMsg
->
info
.
conn
.
applyIndex
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
}
}
while
(
0
);
SRpcMsg
rsp
=
{.
code
=
pMsg
->
code
,
.
info
=
pMsg
->
info
};
if
(
vnodeProcessWriteReq
(
pVnode
,
pMsg
,
pMsg
->
info
.
conn
.
applyIndex
,
&
rsp
)
<
0
)
{
rsp
.
code
=
terrno
;
vInfo
(
"vgId:%d, msg:%p failed to apply right now since %s"
,
vgId
,
pMsg
,
terrstr
());
}
if
(
rsp
.
info
.
handle
!=
NULL
)
{
tmsgSendRsp
(
&
rsp
);
}
}
}
}
...
...
@@ -163,10 +182,12 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
SRpcMsg
rsp
=
{.
code
=
TSDB_CODE_RPC_REDIRECT
,
.
info
=
pMsg
->
info
};
tmsgSendRedirectRsp
(
&
rsp
,
&
newEpSet
);
}
else
{
if
(
terrno
!=
0
)
code
=
terrno
;
vError
(
"vgId:%d, msg:%p failed to propose since %s, code:0x%x"
,
vgId
,
pMsg
,
tstrerror
(
code
),
code
);
SRpcMsg
rsp
=
{.
code
=
code
,
.
info
=
pMsg
->
info
};
tmsgSendRsp
(
&
rsp
);
if
(
code
!=
1
)
{
if
(
terrno
!=
0
)
code
=
terrno
;
vError
(
"vgId:%d, msg:%p failed to propose since %s, code:0x%x"
,
vgId
,
pMsg
,
tstrerror
(
code
),
code
);
SRpcMsg
rsp
=
{.
code
=
code
,
.
info
=
pMsg
->
info
};
tmsgSendRsp
(
&
rsp
);
}
}
vGTrace
(
"vgId:%d, msg:%p is freed, code:0x%x"
,
vgId
,
pMsg
,
code
);
...
...
@@ -260,7 +281,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pRpcMsg
);
assert
(
pSyncMsg
!=
NULL
);
ret
=
syncNodeOnClientRequestCb
(
pSyncNode
,
pSyncMsg
);
ret
=
syncNodeOnClientRequestCb
(
pSyncNode
,
pSyncMsg
,
NULL
);
syncClientRequestDestroy
(
pSyncMsg
);
}
else
if
(
pRpcMsg
->
msgType
==
TDMT_SYNC_REQUEST_VOTE
)
{
...
...
@@ -359,34 +380,18 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
SyncIndex
beginIndex
=
SYNC_INDEX_INVALID
;
char
logBuf
[
256
]
=
{
0
};
if
(
pFsm
->
FpGetSnapshotInfo
!=
NULL
)
{
(
*
pFsm
->
FpGetSnapshotInfo
)(
pFsm
,
&
snapshot
);
beginIndex
=
snapshot
.
lastApplyIndex
;
}
if
(
cbMeta
.
index
>
beginIndex
)
{
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
beginIndex
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
SRpcMsg
rpcMsg
=
{.
msgType
=
pMsg
->
msgType
,
.
contLen
=
pMsg
->
contLen
};
rpcMsg
.
pCont
=
rpcMallocCont
(
rpcMsg
.
contLen
);
memcpy
(
rpcMsg
.
pCont
,
pMsg
->
pCont
,
pMsg
->
contLen
);
syncGetAndDelRespRpc
(
pVnode
->
sync
,
cbMeta
.
seqNum
,
&
rpcMsg
.
info
);
rpcMsg
.
info
.
conn
.
applyIndex
=
cbMeta
.
index
;
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
&
rpcMsg
);
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==CommitCb== execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, beginIndex :%ld
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
beginIndex
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
else
{
char
logBuf
[
256
]
=
{
0
};
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==CommitCb== do not execute, pFsm:%p, index:%ld, isWeak:%d, code:%d, state:%d %s, "
"beginIndex :%ld
\n
"
,
pFsm
,
cbMeta
.
index
,
cbMeta
.
isWeak
,
cbMeta
.
code
,
cbMeta
.
state
,
syncUtilState2String
(
cbMeta
.
state
),
beginIndex
);
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
SRpcMsg
rpcMsg
=
{.
msgType
=
pMsg
->
msgType
,
.
contLen
=
pMsg
->
contLen
};
rpcMsg
.
pCont
=
rpcMallocCont
(
rpcMsg
.
contLen
);
memcpy
(
rpcMsg
.
pCont
,
pMsg
->
pCont
,
pMsg
->
contLen
);
syncGetAndDelRespRpc
(
pVnode
->
sync
,
cbMeta
.
seqNum
,
&
rpcMsg
.
info
);
rpcMsg
.
info
.
conn
.
applyIndex
=
cbMeta
.
index
;
tmsgPutToQueue
(
&
pVnode
->
msgCb
,
APPLY_QUEUE
,
&
rpcMsg
);
}
static
void
vnodeSyncPreCommitMsg
(
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
...
...
source/libs/executor/inc/executorimpl.h
浏览文件 @
a0c043c4
...
...
@@ -403,7 +403,7 @@ typedef struct SIntervalAggOperatorInfo {
// SOptrBasicInfo should be first, SAggSupporter should be second for stream encode
SOptrBasicInfo
binfo
;
// basic info
SAggSupporter
aggSup
;
// aggregate supporter
SExprSupp
scalarSupp
;
// supporter for perform scalar function
SGroupResInfo
groupResInfo
;
// multiple results build supporter
SInterval
interval
;
// interval info
int32_t
primaryTsIndex
;
// primary time stamp slot id from result of downstream operator.
...
...
@@ -738,7 +738,7 @@ SOperatorInfo* createSortedMergeOperatorInfo(SOperatorInfo** downstream, int32_t
SOperatorInfo
*
createIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
,
bool
isStream
);
STimeWindowAggSupp
*
pTwAggSupp
,
SIntervalPhysiNode
*
pPhyNode
,
SExecTaskInfo
*
pTaskInfo
,
bool
isStream
);
SOperatorInfo
*
createMergeIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
a0c043c4
...
...
@@ -4224,8 +4224,8 @@ SOperatorInfo* createOperatorTree(SPhysiNode* pPhyNode, SExecTaskInfo* pTaskInfo
int32_t
tsSlotId
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
bool
isStream
=
(
QUERY_NODE_PHYSICAL_PLAN_STREAM_INTERVAL
==
type
);
pOptr
=
createIntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
&
as
,
pTaskInfo
,
isStream
);
pOptr
=
createIntervalOperatorInfo
(
ops
[
0
],
pExprInfo
,
num
,
pResBlock
,
&
interval
,
tsSlotId
,
&
as
,
pIntervalPhyNode
,
pTaskInfo
,
isStream
);
}
else
if
(
QUERY_NODE_PHYSICAL_PLAN_MERGE_ALIGNED_INTERVAL
==
type
)
{
SMergeAlignedIntervalPhysiNode
*
pIntervalPhyNode
=
(
SMergeAlignedIntervalPhysiNode
*
)
pPhyNode
;
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
a0c043c4
...
...
@@ -448,7 +448,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
qDebug
(
"%s start to repeat ascending order scan data blocks due to query func required"
,
GET_TASKID
(
pTaskInfo
));
for
(
int32_t
i
=
0
;
i
<
pTableScanInfo
->
cond
.
numOfTWindows
;
++
i
)
{
STimeWindow
*
pWin
=
&
pTableScanInfo
->
cond
.
twindows
[
i
];
qDebug
(
"%s
\t
qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pWin
->
skey
,
pWin
->
ekey
);
qDebug
(
"%s qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pWin
->
skey
,
pWin
->
ekey
);
}
// do prepare for the next round table scan operation
tsdbResetReadHandle
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
,
0
);
...
...
@@ -467,7 +467,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
qDebug
(
"%s start to descending order scan data blocks due to query func required"
,
GET_TASKID
(
pTaskInfo
));
for
(
int32_t
i
=
0
;
i
<
pTableScanInfo
->
cond
.
numOfTWindows
;
++
i
)
{
STimeWindow
*
pWin
=
&
pTableScanInfo
->
cond
.
twindows
[
i
];
qDebug
(
"%s
\t
qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pWin
->
skey
,
pWin
->
ekey
);
qDebug
(
"%s qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pWin
->
skey
,
pWin
->
ekey
);
}
while
(
pTableScanInfo
->
scanTimes
<
total
)
{
...
...
@@ -492,7 +492,7 @@ static SSDataBlock* doTableScanGroup(SOperatorInfo* pOperator) {
GET_TASKID
(
pTaskInfo
));
for
(
int32_t
i
=
0
;
i
<
pTableScanInfo
->
cond
.
numOfTWindows
;
++
i
)
{
STimeWindow
*
pWin
=
&
pTableScanInfo
->
cond
.
twindows
[
i
];
qDebug
(
"%s
\t
qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pWin
->
skey
,
pWin
->
ekey
);
qDebug
(
"%s qrange:%"
PRId64
"-%"
PRId64
,
GET_TASKID
(
pTaskInfo
),
pWin
->
skey
,
pWin
->
ekey
);
}
tsdbResetReadHandle
(
pTableScanInfo
->
dataReader
,
&
pTableScanInfo
->
cond
,
0
);
pTableScanInfo
->
curTWinIdx
=
0
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
a0c043c4
...
...
@@ -969,6 +969,12 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
getTableScanInfo
(
pOperator
,
&
pInfo
->
order
,
&
scanFlag
);
if
(
pInfo
->
scalarSupp
.
pExprInfo
!=
NULL
)
{
SExprSupp
*
pExprSup
=&
pInfo
->
scalarSupp
;
projectApplyFunctions
(
pExprSup
->
pExprInfo
,
pBlock
,
pBlock
,
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
NULL
);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
pInfo
->
order
,
scanFlag
,
true
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
NULL
);
...
...
@@ -1381,6 +1387,11 @@ static SSDataBlock* doStreamIntervalAgg(SOperatorInfo* pOperator) {
continue
;
}
if
(
pInfo
->
scalarSupp
.
pExprInfo
!=
NULL
)
{
SExprSupp
*
pExprSup
=
&
pInfo
->
scalarSupp
;
projectApplyFunctions
(
pExprSup
->
pExprInfo
,
pBlock
,
pBlock
,
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
NULL
);
}
// The timewindow that overlaps the timestamps of the input pBlock need to be recalculated and return to the
// caller. Note that all the time window are not close till now.
// the pDataBlock are always the same one, no need to call this again
...
...
@@ -1498,18 +1509,27 @@ void increaseTs(SqlFunctionCtx* pCtx) {
SOperatorInfo
*
createIntervalOperatorInfo
(
SOperatorInfo
*
downstream
,
SExprInfo
*
pExprInfo
,
int32_t
numOfCols
,
SSDataBlock
*
pResBlock
,
SInterval
*
pInterval
,
int32_t
primaryTsSlotId
,
STimeWindowAggSupp
*
pTwAggSupp
,
SExecTaskInfo
*
pTaskInfo
,
bool
isStream
)
{
STimeWindowAggSupp
*
pTwAggSupp
,
S
IntervalPhysiNode
*
pPhyNode
,
S
ExecTaskInfo
*
pTaskInfo
,
bool
isStream
)
{
SIntervalAggOperatorInfo
*
pInfo
=
taosMemoryCalloc
(
1
,
sizeof
(
SIntervalAggOperatorInfo
));
SOperatorInfo
*
pOperator
=
taosMemoryCalloc
(
1
,
sizeof
(
SOperatorInfo
));
if
(
pInfo
==
NULL
||
pOperator
==
NULL
)
{
goto
_error
;
}
pInfo
->
win
=
pTaskInfo
->
window
;
pInfo
->
order
=
TSDB_ORDER_ASC
;
pInfo
->
interval
=
*
pInterval
;
pInfo
->
win
=
pTaskInfo
->
window
;
pInfo
->
order
=
TSDB_ORDER_ASC
;
pInfo
->
interval
=
*
pInterval
;
pInfo
->
execModel
=
pTaskInfo
->
execModel
;
pInfo
->
twAggSup
=
*
pTwAggSupp
;
pInfo
->
twAggSup
=
*
pTwAggSupp
;
if
(
pPhyNode
->
window
.
pExprs
!=
NULL
)
{
int32_t
numOfScalar
=
0
;
SExprInfo
*
pScalarExprInfo
=
createExprInfo
(
pPhyNode
->
window
.
pExprs
,
NULL
,
&
numOfScalar
);
int32_t
code
=
initExprSupp
(
&
pInfo
->
scalarSupp
,
pScalarExprInfo
,
numOfScalar
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
}
}
pInfo
->
primaryTsIndex
=
primaryTsSlotId
;
...
...
@@ -2473,7 +2493,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
if
(
IS_FINAL_OP
(
pInfo
))
{
int32_t
childIndex
=
getChildIndex
(
pBlock
);
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
childIndex
);
S
IntervalAgg
OperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
S
StreamFinalInterval
OperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
SExprSupp
*
pChildSup
=
&
pChildOp
->
exprSupp
;
doClearWindows
(
&
pChildInfo
->
aggSup
,
pChildSup
,
&
pChildInfo
->
interval
,
pChildInfo
->
primaryTsIndex
,
...
...
source/libs/sync/inc/syncEnv.h
浏览文件 @
a0c043c4
...
...
@@ -28,13 +28,13 @@ extern "C" {
#include "trpc.h"
#include "ttimer.h"
#define TIMER_MAX_MS
0x7FFFFFFF
#define ENV_TICK_TIMER_MS
1000
#define PING_TIMER_MS
1000
#define ELECT_TIMER_MS_MIN
1300
#define ELECT_TIMER_MS_MAX
(ELECT_TIMER_MS_MIN * 2)
#define TIMER_MAX_MS 0x7FFFFFFF
#define ENV_TICK_TIMER_MS 1000
#define PING_TIMER_MS 1000
#define ELECT_TIMER_MS_MIN 1300
#define ELECT_TIMER_MS_MAX (ELECT_TIMER_MS_MIN * 2)
#define ELECT_TIMER_MS_RANGE (ELECT_TIMER_MS_MAX - ELECT_TIMER_MS_MIN)
#define HEARTBEAT_TIMER_MS
900
#define HEARTBEAT_TIMER_MS 900
#define EMPTY_RAFT_ID ((SRaftId){.addr = 0, .vgId = 0})
...
...
source/libs/sync/inc/syncIO.h
浏览文件 @
a0c043c4
...
...
@@ -50,7 +50,7 @@ typedef struct SSyncIO {
void
*
pSyncNode
;
int32_t
(
*
FpOnSyncPing
)(
SSyncNode
*
pSyncNode
,
SyncPing
*
pMsg
);
int32_t
(
*
FpOnSyncPingReply
)(
SSyncNode
*
pSyncNode
,
SyncPingReply
*
pMsg
);
int32_t
(
*
FpOnSyncClientRequest
)(
SSyncNode
*
pSyncNode
,
SyncClientRequest
*
pMsg
);
int32_t
(
*
FpOnSyncClientRequest
)(
SSyncNode
*
pSyncNode
,
SyncClientRequest
*
pMsg
,
SyncIndex
*
pRetIndex
);
int32_t
(
*
FpOnSyncRequestVote
)(
SSyncNode
*
pSyncNode
,
SyncRequestVote
*
pMsg
);
int32_t
(
*
FpOnSyncRequestVoteReply
)(
SSyncNode
*
pSyncNode
,
SyncRequestVoteReply
*
pMsg
);
int32_t
(
*
FpOnSyncAppendEntries
)(
SSyncNode
*
pSyncNode
,
SyncAppendEntries
*
pMsg
);
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
a0c043c4
...
...
@@ -169,7 +169,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo);
void
syncNodeStart
(
SSyncNode
*
pSyncNode
);
void
syncNodeStartStandBy
(
SSyncNode
*
pSyncNode
);
void
syncNodeClose
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
);
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
,
bool
isWeak
);
// option
bool
syncNodeSnapshotEnable
(
SSyncNode
*
pSyncNode
);
...
...
@@ -233,6 +233,7 @@ SyncIndex syncNodeGetPreIndex(SSyncNode* pSyncNode, SyncIndex index);
SyncTerm
syncNodeGetPreTerm
(
SSyncNode
*
pSyncNode
,
SyncIndex
index
);
int32_t
syncNodeGetPreIndexTerm
(
SSyncNode
*
pSyncNode
,
SyncIndex
index
,
SyncIndex
*
pPreIndex
,
SyncTerm
*
pPreTerm
);
bool
syncNodeIsOptimizedOneReplica
(
SSyncNode
*
ths
,
SRpcMsg
*
pMsg
);
int32_t
syncNodeCommit
(
SSyncNode
*
ths
,
SyncIndex
beginIndex
,
SyncIndex
endIndex
,
uint64_t
flag
);
int32_t
syncNodeUpdateNewConfigIndex
(
SSyncNode
*
ths
,
SSyncCfg
*
pNewCfg
);
...
...
source/libs/sync/inc/syncRaftCfg.h
浏览文件 @
a0c043c4
...
...
@@ -49,14 +49,14 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg);
int32_t
raftCfgPersist
(
SRaftCfg
*
pRaftCfg
);
int32_t
raftCfgAddConfigIndex
(
SRaftCfg
*
pRaftCfg
,
SyncIndex
configIndex
);
cJSON
*
syncCfg2Json
(
SSyncCfg
*
pSyncCfg
);
char
*
syncCfg2Str
(
SSyncCfg
*
pSyncCfg
);
char
*
syncCfg2SimpleStr
(
SSyncCfg
*
pSyncCfg
);
cJSON
*
syncCfg2Json
(
SSyncCfg
*
pSyncCfg
);
char
*
syncCfg2Str
(
SSyncCfg
*
pSyncCfg
);
char
*
syncCfg2SimpleStr
(
SSyncCfg
*
pSyncCfg
);
int32_t
syncCfgFromJson
(
const
cJSON
*
pRoot
,
SSyncCfg
*
pSyncCfg
);
int32_t
syncCfgFromStr
(
const
char
*
s
,
SSyncCfg
*
pSyncCfg
);
cJSON
*
raftCfg2Json
(
SRaftCfg
*
pRaftCfg
);
char
*
raftCfg2Str
(
SRaftCfg
*
pRaftCfg
);
cJSON
*
raftCfg2Json
(
SRaftCfg
*
pRaftCfg
);
char
*
raftCfg2Str
(
SRaftCfg
*
pRaftCfg
);
int32_t
raftCfgFromJson
(
const
cJSON
*
pRoot
,
SRaftCfg
*
pRaftCfg
);
int32_t
raftCfgFromStr
(
const
char
*
s
,
SRaftCfg
*
pRaftCfg
);
...
...
source/libs/sync/inc/syncRaftLog.h
浏览文件 @
a0c043c4
...
...
@@ -29,9 +29,12 @@ extern "C" {
#include "wal.h"
typedef
struct
SSyncLogStoreData
{
SSyncNode
*
pSyncNode
;
SWal
*
pWal
;
SSyncNode
*
pSyncNode
;
SWal
*
pWal
;
TdThreadMutex
mutex
;
SWalReadHandle
*
pWalHandle
;
// SyncIndex beginIndex; // valid begin index, default 0, may be set beginIndex > 0
}
SSyncLogStoreData
;
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
a0c043c4
...
...
@@ -102,6 +102,7 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
}
}
// maybe execute fsm
if
(
newCommitIndex
>
pSyncNode
->
commitIndex
)
{
SyncIndex
beginIndex
=
pSyncNode
->
commitIndex
+
1
;
SyncIndex
endIndex
=
newCommitIndex
;
...
...
source/libs/sync/src/syncIO.c
浏览文件 @
a0c043c4
...
...
@@ -30,7 +30,7 @@ static int32_t syncIODestroy(SSyncIO *io);
static
int32_t
syncIOStartInternal
(
SSyncIO
*
io
);
static
int32_t
syncIOStopInternal
(
SSyncIO
*
io
);
static
void
*
syncIOConsumerFunc
(
void
*
param
);
static
void
*
syncIOConsumerFunc
(
void
*
param
);
static
void
syncIOProcessRequest
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
void
syncIOProcessReply
(
void
*
pParent
,
SRpcMsg
*
pMsg
,
SEpSet
*
pEpSet
);
static
int32_t
syncIOAuth
(
void
*
parent
,
char
*
meterId
,
char
*
spi
,
char
*
encrypt
,
char
*
secret
,
char
*
ckey
);
...
...
@@ -242,9 +242,9 @@ static int32_t syncIOStopInternal(SSyncIO *io) {
}
static
void
*
syncIOConsumerFunc
(
void
*
param
)
{
SSyncIO
*
io
=
param
;
SSyncIO
*
io
=
param
;
STaosQall
*
qall
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
SRpcMsg
*
pRpcMsg
,
rpcMsg
;
qall
=
taosAllocateQall
();
while
(
1
)
{
...
...
@@ -281,7 +281,7 @@ static void *syncIOConsumerFunc(void *param) {
if
(
io
->
FpOnSyncClientRequest
!=
NULL
)
{
SyncClientRequest
*
pSyncMsg
=
syncClientRequestFromRpcMsg2
(
pRpcMsg
);
ASSERT
(
pSyncMsg
!=
NULL
);
io
->
FpOnSyncClientRequest
(
io
->
pSyncNode
,
pSyncMsg
);
io
->
FpOnSyncClientRequest
(
io
->
pSyncNode
,
pSyncMsg
,
NULL
);
syncClientRequestDestroy
(
pSyncMsg
);
}
...
...
source/libs/sync/src/syncIndexMgr.c
浏览文件 @
a0c043c4
...
...
@@ -126,7 +126,7 @@ cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
char
*
syncIndexMgr2Str
(
SSyncIndexMgr
*
pSyncIndexMgr
)
{
cJSON
*
pJson
=
syncIndexMgr2Json
(
pSyncIndexMgr
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
a0c043c4
...
...
@@ -50,7 +50,7 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
// process message ----
int32_t
syncNodeOnPingCb
(
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
int32_t
syncNodeOnPingReplyCb
(
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
int32_t
syncNodeOnClientRequestCb
(
SSyncNode
*
ths
,
SyncClientRequest
*
pMsg
);
int32_t
syncNodeOnClientRequestCb
(
SSyncNode
*
ths
,
SyncClientRequest
*
pMsg
,
SyncIndex
*
pRetIndex
);
// life cycle
static
void
syncFreeNode
(
void
*
param
);
...
...
@@ -627,7 +627,7 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
return
ret
;
}
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
0
;
char
eventLog
[
128
];
...
...
@@ -664,13 +664,34 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
SRpcMsg
rpcMsg
;
syncClientRequest2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
if
(
pSyncNode
->
FpEqMsg
!=
NULL
&&
(
*
pSyncNode
->
FpEqMsg
)(
pSyncNode
->
msgcb
,
&
rpcMsg
)
==
0
)
{
ret
=
0
;
// optimized one replica
if
(
syncNodeIsOptimizedOneReplica
(
pSyncNode
,
pMsg
))
{
SyncIndex
retIndex
;
int32_t
code
=
syncNodeOnClientRequestCb
(
pSyncNode
,
pSyncMsg
,
&
retIndex
);
if
(
code
==
0
)
{
pMsg
->
info
.
conn
.
applyIndex
=
retIndex
;
rpcFreeCont
(
rpcMsg
.
pCont
);
syncRespMgrDel
(
pSyncNode
->
pSyncRespMgr
,
seqNum
);
ret
=
1
;
sDebug
(
"vgId:%d optimized index:%ld success, msgtype:%s,%d"
,
pSyncNode
->
vgId
,
retIndex
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
}
else
{
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
sError
(
"vgId:%d optimized index:%ld error, msgtype:%s,%d"
,
pSyncNode
->
vgId
,
retIndex
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
}
}
else
{
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
sError
(
"syncPropose pSyncNode->FpEqMsg is NULL"
);
if
(
pSyncNode
->
FpEqMsg
!=
NULL
&&
(
*
pSyncNode
->
FpEqMsg
)(
pSyncNode
->
msgcb
,
&
rpcMsg
)
==
0
)
{
ret
=
0
;
}
else
{
ret
=
-
1
;
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
sError
(
"enqueue msg error, FpEqMsg is NULL"
);
}
}
syncClientRequestDestroy
(
pSyncMsg
);
goto
_END
;
...
...
@@ -2377,7 +2398,7 @@ int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
// /\ UNCHANGED <<messages, serverVars, candidateVars,
// leaderVars, commitIndex>>
//
int32_t
syncNodeOnClientRequestCb
(
SSyncNode
*
ths
,
SyncClientRequest
*
pMsg
)
{
int32_t
syncNodeOnClientRequestCb
(
SSyncNode
*
ths
,
SyncClientRequest
*
pMsg
,
SyncIndex
*
pRetIndex
)
{
int32_t
ret
=
0
;
syncClientRequestLog2
(
"==syncNodeOnClientRequestCb=="
,
pMsg
);
...
...
@@ -2436,6 +2457,14 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg) {
rpcFreeCont
(
rpcMsg
.
pCont
);
}
if
(
pRetIndex
!=
NULL
)
{
if
(
ret
==
0
&&
pEntry
!=
NULL
)
{
*
pRetIndex
=
pEntry
->
index
;
}
else
{
*
pRetIndex
=
SYNC_INDEX_INVALID
;
}
}
syncEntryDestory
(
pEntry
);
return
ret
;
}
...
...
@@ -2600,6 +2629,10 @@ static int32_t syncNodeProposeConfigChangeFinish(SSyncNode* ths, SyncReconfigFin
return
0
;
}
bool
syncNodeIsOptimizedOneReplica
(
SSyncNode
*
ths
,
SRpcMsg
*
pMsg
)
{
return
(
ths
->
replicaNum
==
1
&&
syncUtilUserCommit
(
pMsg
->
msgType
)
&&
ths
->
vgId
!=
1
);
}
int32_t
syncNodeCommit
(
SSyncNode
*
ths
,
SyncIndex
beginIndex
,
SyncIndex
endIndex
,
uint64_t
flag
)
{
int32_t
code
=
0
;
ESyncState
state
=
flag
;
...
...
@@ -2621,19 +2654,33 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
syncEntry2OriginalRpc
(
pEntry
,
&
rpcMsg
);
// user commit
if
(
ths
->
pFsm
->
FpCommitCb
!=
NULL
&&
syncUtilUserCommit
(
pEntry
->
originalRpcType
))
{
SFsmCbMeta
cbMeta
=
{
0
};
cbMeta
.
index
=
pEntry
->
index
;
cbMeta
.
lastConfigIndex
=
syncNodeGetSnapshotConfigIndex
(
ths
,
cbMeta
.
index
);
cbMeta
.
isWeak
=
pEntry
->
isWeak
;
cbMeta
.
code
=
0
;
cbMeta
.
state
=
ths
->
state
;
cbMeta
.
seqNum
=
pEntry
->
seqNum
;
cbMeta
.
term
=
pEntry
->
term
;
cbMeta
.
currentTerm
=
ths
->
pRaftStore
->
currentTerm
;
cbMeta
.
flag
=
flag
;
ths
->
pFsm
->
FpCommitCb
(
ths
->
pFsm
,
&
rpcMsg
,
cbMeta
);
if
((
ths
->
pFsm
->
FpCommitCb
!=
NULL
)
&&
syncUtilUserCommit
(
pEntry
->
originalRpcType
))
{
bool
internalExecute
=
true
;
if
((
ths
->
replicaNum
==
1
)
&&
ths
->
restoreFinish
&&
(
ths
->
vgId
!=
1
))
{
internalExecute
=
false
;
}
do
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"index:%ld, internalExecute:%d"
,
i
,
internalExecute
);
syncNodeEventLog
(
ths
,
logBuf
);
}
while
(
0
);
// execute fsm in apply thread, or execute outside syncPropose
if
(
internalExecute
)
{
SFsmCbMeta
cbMeta
=
{
0
};
cbMeta
.
index
=
pEntry
->
index
;
cbMeta
.
lastConfigIndex
=
syncNodeGetSnapshotConfigIndex
(
ths
,
cbMeta
.
index
);
cbMeta
.
isWeak
=
pEntry
->
isWeak
;
cbMeta
.
code
=
0
;
cbMeta
.
state
=
ths
->
state
;
cbMeta
.
seqNum
=
pEntry
->
seqNum
;
cbMeta
.
term
=
pEntry
->
term
;
cbMeta
.
currentTerm
=
ths
->
pRaftStore
->
currentTerm
;
cbMeta
.
flag
=
flag
;
ths
->
pFsm
->
FpCommitCb
(
ths
->
pFsm
,
&
rpcMsg
,
cbMeta
);
}
}
// config change
...
...
source/libs/sync/src/syncRaftCfg.c
浏览文件 @
a0c043c4
...
...
@@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char
*
syncCfg2Str
(
SSyncCfg
*
pSyncCfg
)
{
cJSON
*
pJson
=
syncCfg2Json
(
pSyncCfg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
@@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char
*
syncCfg2SimpleStr
(
SSyncCfg
*
pSyncCfg
)
{
if
(
pSyncCfg
!=
NULL
)
{
int32_t
len
=
512
;
char
*
s
=
taosMemoryMalloc
(
len
);
char
*
s
=
taosMemoryMalloc
(
len
);
memset
(
s
,
0
,
len
);
snprintf
(
s
,
len
,
"{replica-num:%d, my-index:%d, "
,
pSyncCfg
->
replicaNum
,
pSyncCfg
->
myIndex
);
...
...
@@ -205,7 +205,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char
*
raftCfg2Str
(
SRaftCfg
*
pRaftCfg
)
{
cJSON
*
pJson
=
raftCfg2Json
(
pRaftCfg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
@@ -271,7 +271,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(
pRaftCfg
->
configIndexArr
)[
i
]
=
atoll
(
pIndex
->
valuestring
);
}
cJSON
*
pJsonSyncCfg
=
cJSON_GetObjectItem
(
pJson
,
"SSyncCfg"
);
cJSON
*
pJsonSyncCfg
=
cJSON_GetObjectItem
(
pJson
,
"SSyncCfg"
);
int32_t
code
=
syncCfgFromJson
(
pJsonSyncCfg
,
&
(
pRaftCfg
->
cfg
));
ASSERT
(
code
==
0
);
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
a0c043c4
...
...
@@ -257,6 +257,8 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
return
-
1
;
}
taosThreadMutexLock
(
&
(
pData
->
mutex
));
code
=
walReadWithHandle
(
pWalHandle
,
index
);
if
(
code
!=
0
)
{
int32_t
err
=
terrno
;
...
...
@@ -281,6 +283,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
terrno = saveErr;
*/
taosThreadMutexUnlock
(
&
(
pData
->
mutex
));
return
code
;
}
...
...
@@ -301,6 +304,7 @@ static int32_t raftLogGetEntry(struct SSyncLogStore* pLogStore, SyncIndex index,
terrno = saveErr;
*/
taosThreadMutexUnlock
(
&
(
pData
->
mutex
));
return
code
;
}
...
...
@@ -364,6 +368,7 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
pData
->
pWal
=
pSyncNode
->
pWal
;
ASSERT
(
pData
->
pWal
!=
NULL
);
taosThreadMutexInit
(
&
(
pData
->
mutex
),
NULL
);
pData
->
pWalHandle
=
walOpenReadHandle
(
pData
->
pWal
);
ASSERT
(
pData
->
pWalHandle
!=
NULL
);
...
...
@@ -408,9 +413,14 @@ SSyncLogStore* logStoreCreate(SSyncNode* pSyncNode) {
void
logStoreDestory
(
SSyncLogStore
*
pLogStore
)
{
if
(
pLogStore
!=
NULL
)
{
SSyncLogStoreData
*
pData
=
pLogStore
->
data
;
taosThreadMutexLock
(
&
(
pData
->
mutex
));
if
(
pData
->
pWalHandle
!=
NULL
)
{
walCloseReadHandle
(
pData
->
pWalHandle
);
pData
->
pWalHandle
=
NULL
;
}
taosThreadMutexUnlock
(
&
(
pData
->
mutex
));
taosThreadMutexDestroy
(
&
(
pData
->
mutex
));
taosMemoryFree
(
pLogStore
->
data
);
taosMemoryFree
(
pLogStore
);
...
...
@@ -460,6 +470,8 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
SWal
*
pWal
=
pData
->
pWal
;
if
(
index
>=
SYNC_INDEX_BEGIN
&&
index
<=
logStoreLastIndex
(
pLogStore
))
{
taosThreadMutexLock
(
&
(
pData
->
mutex
));
// SWalReadHandle* pWalHandle = walOpenReadHandle(pWal);
SWalReadHandle
*
pWalHandle
=
pData
->
pWalHandle
;
ASSERT
(
pWalHandle
!=
NULL
);
...
...
@@ -503,6 +515,7 @@ SSyncRaftEntry* logStoreGetEntry(SSyncLogStore* pLogStore, SyncIndex index) {
terrno = saveErr;
*/
taosThreadMutexUnlock
(
&
(
pData
->
mutex
));
return
pEntry
;
}
else
{
...
...
source/libs/sync/src/syncRaftStore.c
浏览文件 @
a0c043c4
...
...
@@ -216,7 +216,7 @@ cJSON *raftStore2Json(SRaftStore *pRaftStore) {
char
*
raftStore2Str
(
SRaftStore
*
pRaftStore
)
{
cJSON
*
pJson
=
raftStore2Json
(
pRaftStore
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
a0c043c4
...
...
@@ -314,14 +314,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char
*
snapshotSender2Str
(
SSyncSnapshotSender
*
pSender
)
{
cJSON
*
pJson
=
snapshotSender2Json
(
pSender
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
char
*
snapshotSender2SimpleStr
(
SSyncSnapshotSender
*
pSender
,
char
*
event
)
{
int32_t
len
=
256
;
char
*
s
=
taosMemoryMalloc
(
len
);
char
*
s
=
taosMemoryMalloc
(
len
);
SRaftId
destId
=
pSender
->
pSyncNode
->
replicasId
[
pSender
->
replicaIndex
];
char
host
[
128
];
...
...
@@ -461,7 +461,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject
(
pFromId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pReceiver
->
fromId
.
addr
;
cJSON
*
pTmp
=
pFromId
;
cJSON
*
pTmp
=
pFromId
;
char
host
[
128
]
=
{
0
};
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -494,14 +494,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
)
{
cJSON
*
pJson
=
snapshotReceiver2Json
(
pReceiver
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
char
*
snapshotReceiver2SimpleStr
(
SSyncSnapshotReceiver
*
pReceiver
,
char
*
event
)
{
int32_t
len
=
256
;
char
*
s
=
taosMemoryMalloc
(
len
);
char
*
s
=
taosMemoryMalloc
(
len
);
SRaftId
fromId
=
pReceiver
->
fromId
;
char
host
[
128
];
...
...
source/libs/sync/src/syncVoteMgr.c
浏览文件 @
a0c043c4
...
...
@@ -127,7 +127,7 @@ cJSON *voteGranted2Json(SVotesGranted *pVotesGranted) {
char
*
voteGranted2Str
(
SVotesGranted
*
pVotesGranted
)
{
cJSON
*
pJson
=
voteGranted2Json
(
pVotesGranted
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
@@ -256,7 +256,7 @@ cJSON *votesRespond2Json(SVotesRespond *pVotesRespond) {
char
*
votesRespond2Str
(
SVotesRespond
*
pVotesRespond
)
{
cJSON
*
pJson
=
votesRespond2Json
(
pVotesRespond
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
source/libs/sync/test/syncRaftLogTest2.cpp
浏览文件 @
a0c043c4
...
...
@@ -113,7 +113,7 @@ void test2() {
pLogStore
=
logStoreCreate
(
pSyncNode
);
assert
(
pLogStore
);
pSyncNode
->
pLogStore
=
pLogStore
;
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
//
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore
->
syncLogRestoreFromSnapshot
(
pLogStore
,
4
);
logStoreLog2
((
char
*
)
"
\n\n\n
test2 ----- "
,
pLogStore
);
...
...
@@ -229,7 +229,7 @@ void test4() {
assert
(
pLogStore
);
pSyncNode
->
pLogStore
=
pLogStore
;
logStoreLog2
((
char
*
)
"
\n\n\n
test4 ----- "
,
pLogStore
);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
//
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore
->
syncLogRestoreFromSnapshot
(
pLogStore
,
4
);
for
(
int
i
=
5
;
i
<=
9
;
++
i
)
{
...
...
@@ -291,7 +291,7 @@ void test5() {
assert
(
pLogStore
);
pSyncNode
->
pLogStore
=
pLogStore
;
logStoreLog2
((
char
*
)
"
\n\n\n
test5 ----- "
,
pLogStore
);
//pLogStore->syncLogSetBeginIndex(pLogStore, 5);
//
pLogStore->syncLogSetBeginIndex(pLogStore, 5);
pLogStore
->
syncLogRestoreFromSnapshot
(
pLogStore
,
4
);
for
(
int
i
=
5
;
i
<=
9
;
++
i
)
{
...
...
@@ -412,26 +412,23 @@ void test6() {
do
{
SyncIndex
firstVer
=
walGetFirstVer
(
pWal
);
SyncIndex
lastVer
=
walGetLastVer
(
pWal
);
bool
isEmpty
=
walIsEmpty
(
pWal
);
bool
isEmpty
=
walIsEmpty
(
pWal
);
printf
(
"before -------- firstVer:%ld lastVer:%ld isEmpty:%d
\n
"
,
firstVer
,
lastVer
,
isEmpty
);
}
while
(
0
);
logStoreDestory
(
pLogStore
);
cleanup
();
// restart
init
();
pLogStore
=
logStoreCreate
(
pSyncNode
);
assert
(
pLogStore
);
pSyncNode
->
pLogStore
=
pLogStore
;
do
{
SyncIndex
firstVer
=
walGetFirstVer
(
pWal
);
SyncIndex
lastVer
=
walGetLastVer
(
pWal
);
bool
isEmpty
=
walIsEmpty
(
pWal
);
bool
isEmpty
=
walIsEmpty
(
pWal
);
printf
(
"after -------- firstVer:%ld lastVer:%ld isEmpty:%d
\n
"
,
firstVer
,
lastVer
,
isEmpty
);
}
while
(
0
);
...
...
@@ -461,13 +458,13 @@ int main(int argc, char** argv) {
}
sTrace
(
"gAssert : %d"
,
gAssert
);
/*
test1();
test2();
test3();
test4();
test5();
*/
/*
test1();
test2();
test3();
test4();
test5();
*/
test6
();
return
0
;
...
...
source/libs/sync/test/syncRaftLogTest3.cpp
浏览文件 @
a0c043c4
...
...
@@ -312,7 +312,7 @@ void test5() {
pSyncNode
->
pLogStore
=
pLogStore
;
logStoreLog2
((
char
*
)
"
\n\n\n
test5 ----- "
,
pLogStore
);
//pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6);
//
pSyncNode->pLogStore->syncLogSetBeginIndex(pSyncNode->pLogStore, 6);
pLogStore
->
syncLogRestoreFromSnapshot
(
pSyncNode
->
pLogStore
,
5
);
for
(
int
i
=
6
;
i
<=
10
;
++
i
)
{
int32_t
dataLen
=
10
;
...
...
tests/system-test/7-tmq/tmqCheckData1.py
0 → 100644
浏览文件 @
a0c043c4
import
taos
import
sys
import
time
import
socket
import
os
import
threading
from
util.log
import
*
from
util.sql
import
*
from
util.cases
import
*
from
util.dnodes
import
*
from
util.common
import
*
sys
.
path
.
append
(
"./7-tmq"
)
from
tmqCommon
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
f
"start to excute
{
__file__
}
"
)
tdSql
.
init
(
conn
.
cursor
())
#tdSql.init(conn.cursor(), logSql) # output sql.txt file
def
checkFileContent
(
self
,
consumerId
,
queryString
):
buildPath
=
tdCom
.
getBuildPath
()
cfgPath
=
tdCom
.
getClientCfgPath
()
dstFile
=
'%s/../log/dstrows_%d.txt'
%
(
cfgPath
,
consumerId
)
cmdStr
=
'%s/build/bin/taos -c %s -s "%s >> %s"'
%
(
buildPath
,
cfgPath
,
queryString
,
dstFile
)
tdLog
.
info
(
cmdStr
)
os
.
system
(
cmdStr
)
consumeRowsFile
=
'%s/../log/consumerid_%d.txt'
%
(
cfgPath
,
consumerId
)
tdLog
.
info
(
"rows file: %s, %s"
%
(
consumeRowsFile
,
dstFile
))
consumeFile
=
open
(
consumeRowsFile
,
mode
=
'r'
)
queryFile
=
open
(
dstFile
,
mode
=
'r'
)
# skip first line for it is schema
queryFile
.
readline
()
while
True
:
dst
=
queryFile
.
readline
()
src
=
consumeFile
.
readline
()
if
dst
:
if
dst
!=
src
:
tdLog
.
exit
(
"consumerId %d consume rows is not match the rows by direct query"
%
consumerId
)
else
:
break
return
def
tmqCase1
(
self
):
tdLog
.
printNoPrefix
(
"======== test case 1: "
)
paraDict
=
{
'dbName'
:
'db1'
,
'dropFlag'
:
1
,
'event'
:
''
,
'vgroups'
:
4
,
'stbName'
:
'stb'
,
'colPrefix'
:
'c'
,
'tagPrefix'
:
't'
,
'colSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'tagSchema'
:
[{
'type'
:
'INT'
,
'count'
:
1
},
{
'type'
:
'binary'
,
'len'
:
20
,
'count'
:
1
}],
'ctbPrefix'
:
'ctb'
,
'ctbNum'
:
1
,
'rowsPerTbl'
:
10000
,
'batchNum'
:
10
,
'startTs'
:
1640966400000
,
# 2022-01-01 00:00:00.000
'pollDelay'
:
10
,
'showMsg'
:
1
,
'showRow'
:
1
}
topicNameList
=
[
'topic1'
,
'topic2'
,
'topic3'
]
expectRowsList
=
[]
tmqCom
.
initConsumerTable
()
tdCom
.
create_database
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"dropFlag"
],
vgroups
=
4
,
replica
=
1
)
tdLog
.
info
(
"create stb"
)
tdCom
.
create_stable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
column_elm_list
=
paraDict
[
'colSchema'
],
tag_elm_list
=
paraDict
[
'tagSchema'
])
tdLog
.
info
(
"create ctb"
)
tdCom
.
create_ctable
(
tdSql
,
dbname
=
paraDict
[
"dbName"
],
stbname
=
paraDict
[
"stbName"
],
tag_elm_list
=
paraDict
[
'tagSchema'
],
count
=
paraDict
[
"ctbNum"
],
default_ctbname_prefix
=
paraDict
[
'ctbPrefix'
])
tdLog
.
info
(
"insert data"
)
tmqCom
.
insert_data
(
tdSql
,
paraDict
[
"dbName"
],
paraDict
[
"ctbPrefix"
],
paraDict
[
"ctbNum"
],
paraDict
[
"rowsPerTbl"
],
paraDict
[
"batchNum"
],
paraDict
[
"startTs"
])
tdLog
.
info
(
"create topics from stb with filter"
)
queryString
=
"select ts,c1,c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as stable %s.%s"
%
(
topicNameList
[
0
],
paraDict
[
"dbName"
],
paraDict
[
"stbName"
])
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
# init consume info, and start tmq_sim, then check consume result
tdLog
.
info
(
"insert consume info to consume processor"
)
consumerId
=
0
expectrowcnt
=
paraDict
[
"rowsPerTbl"
]
*
paraDict
[
"ctbNum"
]
topicList
=
topicNameList
[
0
]
ifcheckdata
=
1
ifManualCommit
=
1
keyList
=
'group.id:cgrp1, enable.auto.commit:false, auto.commit.interval.ms:6000, auto.offset.reset:earliest'
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
paraDict
[
'pollDelay'
],
paraDict
[
"dbName"
],
paraDict
[
'showMsg'
],
paraDict
[
'showRow'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
if
expectRowsList
[
0
]
!=
resultList
[
0
]:
tdLog
.
info
(
"expect consume rows: %d, act consume rows: %d"
%
(
expectRowsList
[
0
],
resultList
[
0
]))
tdLog
.
exit
(
"0 tmq consume rows error!"
)
self
.
checkFileContent
(
consumerId
,
queryString
)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom
.
initConsumerTable
()
queryString
=
"select ts, c1, c2 from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as database %s"
%
(
topicNameList
[
1
],
paraDict
[
'dbName'
])
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
consumerId
=
1
topicList
=
topicNameList
[
1
]
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
paraDict
[
'pollDelay'
],
paraDict
[
"dbName"
],
paraDict
[
'showMsg'
],
paraDict
[
'showRow'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
if
expectRowsList
[
1
]
!=
resultList
[
0
]:
tdLog
.
info
(
"expect consume rows: %d, act consume rows: %d"
%
(
expectRowsList
[
1
],
resultList
[
0
]))
tdLog
.
exit
(
"1 tmq consume rows error!"
)
self
.
checkFileContent
(
consumerId
,
queryString
)
# reinit consume info, and start tmq_sim, then check consume result
tmqCom
.
initConsumerTable
()
queryString
=
"select * from %s.%s"
%
(
paraDict
[
'dbName'
],
paraDict
[
'stbName'
])
sqlString
=
"create topic %s as %s"
%
(
topicNameList
[
2
],
queryString
)
tdLog
.
info
(
"create topic sql: %s"
%
sqlString
)
tdSql
.
execute
(
sqlString
)
tdSql
.
query
(
queryString
)
expectRowsList
.
append
(
tdSql
.
getRows
())
consumerId
=
2
topicList
=
topicNameList
[
2
]
tmqCom
.
insertConsumerInfo
(
consumerId
,
expectrowcnt
,
topicList
,
keyList
,
ifcheckdata
,
ifManualCommit
)
tdLog
.
info
(
"start consume processor"
)
tmqCom
.
startTmqSimProcess
(
paraDict
[
'pollDelay'
],
paraDict
[
"dbName"
],
paraDict
[
'showMsg'
],
paraDict
[
'showRow'
])
tdLog
.
info
(
"wait the consume result"
)
expectRows
=
1
resultList
=
tmqCom
.
selectConsumeResult
(
expectRows
)
if
expectRowsList
[
2
]
!=
resultList
[
0
]:
tdLog
.
info
(
"expect consume rows: %d, act consume rows: %d"
%
(
expectRowsList
[
2
],
resultList
[
0
]))
tdLog
.
exit
(
"2 tmq consume rows error!"
)
self
.
checkFileContent
(
consumerId
,
queryString
)
time
.
sleep
(
10
)
for
i
in
range
(
len
(
topicNameList
)):
tdSql
.
query
(
"drop topic %s"
%
topicNameList
[
i
])
tdLog
.
printNoPrefix
(
"======== test case 1 end ...... "
)
def
run
(
self
):
tdSql
.
prepare
()
self
.
tmqCase1
()
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
f
"
{
__file__
}
successfully executed"
)
event
=
threading
.
Event
()
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tests/system-test/fulltest.sh
浏览文件 @
a0c043c4
...
...
@@ -141,6 +141,7 @@ python3 ./test.py -f 7-tmq/tmqError.py
python3 ./test.py
-f
7-tmq/schema.py
python3 ./test.py
-f
7-tmq/stbFilter.py
python3 ./test.py
-f
7-tmq/tmqCheckData.py
python3 ./test.py
-f
7-tmq/tmqCheckData1.py
python3 ./test.py
-f
7-tmq/tmqUdf.py
#python3 ./test.py -f 7-tmq/tmq3mnodeSwitch.py -N 5
python3 ./test.py
-f
7-tmq/tmqConsumerGroup.py
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录