Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
1b907e84
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
1b907e84
编写于
6月 17, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
other: merge 3.0
上级
af7f3e32
5837c141
变更
26
隐藏空白更改
内联
并排
Showing
26 changed file
with
550 addition
and
241 deletion
+550
-241
include/libs/sync/sync.h
include/libs/sync/sync.h
+2
-1
include/os/osSysinfo.h
include/os/osSysinfo.h
+1
-0
source/client/src/clientHb.c
source/client/src/clientHb.c
+3
-1
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+27
-17
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+6
-6
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+1
-0
source/dnode/vnode/src/sma/smaRollup.c
source/dnode/vnode/src/sma/smaRollup.c
+11
-3
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+4
-4
source/libs/catalog/src/ctgCache.c
source/libs/catalog/src/ctgCache.c
+3
-1
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+2
-1
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+89
-59
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+3
-0
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+4
-3
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+9
-8
source/libs/sync/src/syncCommit.c
source/libs/sync/src/syncCommit.c
+3
-2
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+128
-48
source/libs/sync/src/syncRaftLog.c
source/libs/sync/src/syncRaftLog.c
+11
-4
source/libs/sync/src/syncRespMgr.c
source/libs/sync/src/syncRespMgr.c
+10
-6
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+99
-67
source/libs/sync/test/syncTestTool.cpp
source/libs/sync/test/syncTestTool.cpp
+5
-3
source/libs/tdb/src/db/tdbPage.c
source/libs/tdb/src/db/tdbPage.c
+5
-2
source/os/src/osSysinfo.c
source/os/src/osSysinfo.c
+16
-0
source/util/src/tcache.c
source/util/src/tcache.c
+5
-1
tests/script/tsim/stream/distributeInterval0.sim
tests/script/tsim/stream/distributeInterval0.sim
+36
-1
tests/script/tsim/stream/partitionby.sim
tests/script/tsim/stream/partitionby.sim
+59
-3
tests/system-test/test.py
tests/system-test/test.py
+8
-0
未找到文件。
include/libs/sync/sync.h
浏览文件 @
1b907e84
...
...
@@ -24,7 +24,7 @@ extern "C" {
#include "tdef.h"
#include "tmsgcb.h"
#define SYNC_INDEX_BEGIN
0
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
typedef
uint64_t
SyncNodeId
;
...
...
@@ -182,6 +182,7 @@ void syncStart(int64_t rid);
void
syncStop
(
int64_t
rid
);
int32_t
syncSetStandby
(
int64_t
rid
);
ESyncState
syncGetMyRole
(
int64_t
rid
);
bool
syncIsReady
(
int64_t
rid
);
const
char
*
syncGetMyRoleStr
(
int64_t
rid
);
SyncTerm
syncGetMyTerm
(
int64_t
rid
);
void
syncGetEpSet
(
int64_t
rid
,
SEpSet
*
pEpSet
);
...
...
include/os/osSysinfo.h
浏览文件 @
1b907e84
...
...
@@ -68,6 +68,7 @@ typedef struct {
}
SysNameInfo
;
SysNameInfo
taosGetSysNameInfo
();
bool
taosCheckCurrentInDll
();
#ifdef __cplusplus
}
...
...
source/client/src/clientHb.c
浏览文件 @
1b907e84
...
...
@@ -592,7 +592,9 @@ void hbThreadFuncUnexpectedStopped(void) {
static
void
*
hbThreadFunc
(
void
*
param
)
{
setThreadName
(
"hb"
);
#ifdef WINDOWS
atexit
(
hbThreadFuncUnexpectedStopped
);
if
(
taosCheckCurrentInDll
())
{
atexit
(
hbThreadFuncUnexpectedStopped
);
}
#endif
while
(
1
)
{
int8_t
threadStop
=
atomic_val_compare_exchange_8
(
&
clientHbMgr
.
threadStop
,
1
,
2
);
...
...
source/common/src/tdatablock.c
浏览文件 @
1b907e84
...
...
@@ -1584,6 +1584,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
int32_t
rowSize
=
pDataBlock
->
info
.
rowSize
;
int64_t
groupId
=
pDataBlock
->
info
.
groupId
;
if
(
colNum
<=
1
)
{
// invalid if only with TS col
continue
;
}
if
(
rb
.
nCols
!=
colNum
)
{
tdSRowSetTpInfo
(
&
rb
,
colNum
,
pTSchema
->
flen
);
}
...
...
@@ -1680,23 +1685,28 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
msgLen
+=
pSubmitBlk
->
dataLen
;
}
(
*
pReq
)
->
length
=
msgLen
;
(
*
pReq
)
->
header
.
vgId
=
htonl
(
vgId
);
(
*
pReq
)
->
header
.
contLen
=
htonl
(
msgLen
);
(
*
pReq
)
->
length
=
(
*
pReq
)
->
header
.
contLen
;
(
*
pReq
)
->
numOfBlocks
=
htonl
(
numOfBlks
);
SSubmitBlk
*
blk
=
(
SSubmitBlk
*
)((
*
pReq
)
+
1
);
while
(
numOfBlks
--
)
{
int32_t
dataLen
=
blk
->
dataLen
;
blk
->
uid
=
htobe64
(
blk
->
uid
);
blk
->
suid
=
htobe64
(
blk
->
suid
);
blk
->
padding
=
htonl
(
blk
->
padding
);
blk
->
sversion
=
htonl
(
blk
->
sversion
);
blk
->
dataLen
=
htonl
(
blk
->
dataLen
);
blk
->
schemaLen
=
htonl
(
blk
->
schemaLen
);
blk
->
numOfRows
=
htons
(
blk
->
numOfRows
);
blk
=
(
SSubmitBlk
*
)(
blk
->
data
+
dataLen
);
if
(
numOfBlks
>
0
)
{
(
*
pReq
)
->
length
=
msgLen
;
(
*
pReq
)
->
header
.
vgId
=
htonl
(
vgId
);
(
*
pReq
)
->
header
.
contLen
=
htonl
(
msgLen
);
(
*
pReq
)
->
length
=
(
*
pReq
)
->
header
.
contLen
;
(
*
pReq
)
->
numOfBlocks
=
htonl
(
numOfBlks
);
SSubmitBlk
*
blk
=
(
SSubmitBlk
*
)((
*
pReq
)
+
1
);
while
(
numOfBlks
--
)
{
int32_t
dataLen
=
blk
->
dataLen
;
blk
->
uid
=
htobe64
(
blk
->
uid
);
blk
->
suid
=
htobe64
(
blk
->
suid
);
blk
->
padding
=
htonl
(
blk
->
padding
);
blk
->
sversion
=
htonl
(
blk
->
sversion
);
blk
->
dataLen
=
htonl
(
blk
->
dataLen
);
blk
->
schemaLen
=
htonl
(
blk
->
schemaLen
);
blk
->
numOfRows
=
htons
(
blk
->
numOfRows
);
blk
=
(
SSubmitBlk
*
)(
blk
->
data
+
dataLen
);
}
}
else
{
// no valid rows
taosMemoryFreeClear
(
*
pReq
);
}
return
TSDB_CODE_SUCCESS
;
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
1b907e84
...
...
@@ -221,17 +221,17 @@ void mndCleanupSync(SMnode *pMnode) {
int32_t
mndSyncPropose
(
SMnode
*
pMnode
,
SSdbRaw
*
pRaw
,
int32_t
transId
)
{
SSyncMgmt
*
pMgmt
=
&
pMnode
->
syncMgmt
;
SRpcMsg
r
sp
=
{.
cod
e
=
TDMT_MND_APPLY_MSG
,
.
contLen
=
sdbGetRawTotalSize
(
pRaw
)};
r
sp
.
pCont
=
rpcMallocCont
(
rsp
.
contLen
);
if
(
r
sp
.
pCont
==
NULL
)
return
-
1
;
memcpy
(
r
sp
.
pCont
,
pRaw
,
rsp
.
contLen
);
SRpcMsg
r
eq
=
{.
msgTyp
e
=
TDMT_MND_APPLY_MSG
,
.
contLen
=
sdbGetRawTotalSize
(
pRaw
)};
r
eq
.
pCont
=
rpcMallocCont
(
req
.
contLen
);
if
(
r
eq
.
pCont
==
NULL
)
return
-
1
;
memcpy
(
r
eq
.
pCont
,
pRaw
,
req
.
contLen
);
pMgmt
->
errCode
=
0
;
pMgmt
->
transId
=
transId
;
mTrace
(
"trans:%d, will be proposed"
,
pMgmt
->
transId
);
const
bool
isWeak
=
false
;
int32_t
code
=
syncPropose
(
pMgmt
->
sync
,
&
r
sp
,
isWeak
);
int32_t
code
=
syncPropose
(
pMgmt
->
sync
,
&
r
eq
,
isWeak
);
if
(
code
==
0
)
{
tsem_wait
(
&
pMgmt
->
syncSem
);
}
else
if
(
code
==
-
1
&&
terrno
==
TSDB_CODE_SYN_NOT_LEADER
)
{
...
...
@@ -242,7 +242,7 @@ int32_t mndSyncPropose(SMnode *pMnode, SSdbRaw *pRaw, int32_t transId) {
terrno
=
TSDB_CODE_APP_ERROR
;
}
rpcFreeCont
(
r
sp
.
pCont
);
rpcFreeCont
(
r
eq
.
pCont
);
if
(
code
!=
0
)
{
mError
(
"trans:%d, failed to propose, code:0x%x"
,
pMgmt
->
transId
,
code
);
return
code
;
...
...
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
1b907e84
...
...
@@ -260,6 +260,7 @@ struct SSma {
#define SMA_CFG(s) (&(s)->pVnode->config)
#define SMA_TSDB_CFG(s) (&(s)->pVnode->config.tsdbCfg)
#define SMA_RETENTION(s) ((SRetention *)&(s)->pVnode->config.tsdbCfg.retentions)
#define SMA_LOCKED(s) ((s)->locked)
#define SMA_META(s) ((s)->pVnode->pMeta)
#define SMA_VID(s) TD_VID((s)->pVnode)
...
...
source/dnode/vnode/src/sma/smaRollup.c
浏览文件 @
1b907e84
...
...
@@ -400,22 +400,24 @@ static FORCE_INLINE int32_t tdExecuteRSmaImpl(SSma *pSma, const void *pMsg, int3
}
if
(
taosArrayGetSize
(
pResult
)
>
0
)
{
#if
1
#if
0
char flag[10] = {0};
snprintf(flag, 10, "level %" PRIi8, level);
blockDebugShowData(pResult, flag);
#endif
STsdb
*
sinkTsdb
=
(
level
==
TSDB_RETENTION_L1
?
pSma
->
pRSmaTsdb1
:
pSma
->
pRSmaTsdb2
);
SSubmitReq
*
pReq
=
NULL
;
if
(
buildSubmitReqFromDataBlock
(
&
pReq
,
pResult
,
pTSchema
,
SMA_VID
(
pSma
),
suid
)
!=
0
)
{
if
(
buildSubmitReqFromDataBlock
(
&
pReq
,
pResult
,
pTSchema
,
SMA_VID
(
pSma
),
suid
)
<
0
)
{
taosArrayDestroy
(
pResult
);
return
TSDB_CODE_FAILED
;
}
if
(
tdProcessSubmitReq
(
sinkTsdb
,
INT64_MAX
,
pReq
)
!=
0
)
{
if
(
pReq
&&
tdProcessSubmitReq
(
sinkTsdb
,
INT64_MAX
,
pReq
)
<
0
)
{
taosArrayDestroy
(
pResult
);
taosMemoryFreeClear
(
pReq
);
return
TSDB_CODE_FAILED
;
}
taosMemoryFreeClear
(
pReq
);
}
else
{
smaDebug
(
"vgId:%d, no rsma % "
PRIi8
" data generated since %s"
,
SMA_VID
(
pSma
),
level
,
tstrerror
(
terrno
));
...
...
@@ -469,6 +471,12 @@ int32_t tdProcessRSmaSubmit(SSma *pSma, void *pMsg, int32_t inputType) {
return
TSDB_CODE_SUCCESS
;
}
SRetention
*
pRetention
=
SMA_RETENTION
(
pSma
);
if
(
!
RETENTION_VALID
(
pRetention
+
1
))
{
// return directly if retention level 1 is invalid
return
TSDB_CODE_SUCCESS
;
}
if
(
inputType
==
STREAM_DATA_TYPE_SUBMIT_BLOCK
)
{
STbUidStore
uidStore
=
{
0
};
tdFetchSubmitReqSuids
(
pMsg
,
&
uidStore
);
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
1b907e84
...
...
@@ -466,10 +466,10 @@ static int32_t vnodeProcessDropStbReq(SVnode *pVnode, int64_t version, void *pRe
}
// process request
//
if (metaDropSTable(pVnode->pMeta, version, &req) < 0) {
//
rcode = terrno;
//
goto _exit;
//
}
if
(
metaDropSTable
(
pVnode
->
pMeta
,
version
,
&
req
)
<
0
)
{
rcode
=
terrno
;
goto
_exit
;
}
// return rsp
_exit:
...
...
source/libs/catalog/src/ctgCache.c
浏览文件 @
1b907e84
...
...
@@ -1938,7 +1938,9 @@ void ctgCleanupCacheQueue(void) {
void
*
ctgUpdateThreadFunc
(
void
*
param
)
{
setThreadName
(
"catalog"
);
#ifdef WINDOWS
atexit
(
ctgUpdateThreadUnexpectedStopped
);
if
(
taosCheckCurrentInDll
())
{
atexit
(
ctgUpdateThreadUnexpectedStopped
);
}
#endif
qInfo
(
"catalog update thread started"
);
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
1b907e84
...
...
@@ -2304,6 +2304,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
int32_t
rowSize
=
pInfo
->
pResBlock
->
info
.
rowSize
;
pInfo
->
bufPageSize
=
getProperSortPageSize
(
rowSize
);
// todo the total available buffer should be determined by total capacity of buffer of this task.
// the additional one is reserved for merge result
pInfo
->
sortBufSize
=
pInfo
->
bufPageSize
*
(
taosArrayGetSize
(
dataReaders
)
+
1
);
pInfo
->
hasGroupId
=
false
;
...
...
@@ -2311,7 +2312,7 @@ SOperatorInfo* createTableMergeScanOperatorInfo(STableScanPhysiNode* pTableScanN
pOperator
->
name
=
"TableMergeScanOperator"
;
// TODO : change it
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
;
pOperator
->
operatorType
=
QUERY_NODE_PHYSICAL_PLAN_TABLE_
MERGE_
SCAN
;
pOperator
->
blocking
=
false
;
pOperator
->
status
=
OP_NOT_OPENED
;
pOperator
->
info
=
pInfo
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
1b907e84
...
...
@@ -1257,6 +1257,10 @@ static int32_t getAllIntervalWindow(SHashObj* pHashMap, SArray* resWins) {
return
TSDB_CODE_SUCCESS
;
}
bool
isCloseWindow
(
STimeWindow
*
pWin
,
STimeWindowAggSupp
*
pSup
)
{
return
pWin
->
ekey
<
pSup
->
maxTs
-
pSup
->
waterMark
;
}
static
int32_t
closeIntervalWindow
(
SHashObj
*
pHashMap
,
STimeWindowAggSupp
*
pSup
,
SInterval
*
pInterval
,
SArray
*
closeWins
)
{
void
*
pIte
=
NULL
;
...
...
@@ -1269,7 +1273,7 @@ static int32_t closeIntervalWindow(SHashObj* pHashMap, STimeWindowAggSupp* pSup,
SResultRowInfo
dumyInfo
;
dumyInfo
.
cur
.
pageId
=
-
1
;
STimeWindow
win
=
getActiveTimeWindow
(
NULL
,
&
dumyInfo
,
ts
,
pInterval
,
pInterval
->
precision
,
NULL
);
if
(
win
.
ekey
<
pSup
->
maxTs
-
pSup
->
waterMark
)
{
if
(
isCloseWindow
(
&
win
,
pSup
)
)
{
char
keyBuf
[
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
TSKEY
))];
SET_RES_WINDOW_KEY
(
keyBuf
,
&
ts
,
sizeof
(
TSKEY
),
groupId
);
taosHashRemove
(
pHashMap
,
keyBuf
,
keyLen
);
...
...
@@ -2036,7 +2040,55 @@ _error:
return
NULL
;
}
static
void
doHashInterval
(
SOperatorInfo
*
pOperatorInfo
,
SSDataBlock
*
pSDataBlock
,
int32_t
tableGroupId
,
bool
isFinalInterval
(
SStreamFinalIntervalOperatorInfo
*
pInfo
)
{
return
pInfo
->
pChildren
!=
NULL
;
}
void
compactFunctions
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
for
(
int32_t
k
=
0
;
k
<
numOfOutput
;
++
k
)
{
if
(
fmIsWindowPseudoColumnFunc
(
pDestCtx
[
k
].
functionId
))
{
continue
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
functionNeedToExecute
(
&
pDestCtx
[
k
])
&&
pDestCtx
[
k
].
fpSet
.
combine
!=
NULL
)
{
code
=
pDestCtx
[
k
].
fpSet
.
combine
(
&
pDestCtx
[
k
],
&
pSourceCtx
[
k
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s apply functions error, code: %s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
code
));
pTaskInfo
->
code
=
code
;
longjmp
(
pTaskInfo
->
env
,
code
);
}
}
}
}
static
void
rebuildIntervalWindow
(
SStreamFinalIntervalOperatorInfo
*
pInfo
,
SArray
*
pWinArray
,
int32_t
groupId
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
size
=
taosArrayGetSize
(
pWinArray
);
ASSERT
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
STimeWindow
*
pParentWin
=
taosArrayGet
(
pWinArray
,
i
);
SResultRow
*
pCurResult
=
NULL
;
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
pParentWin
,
true
,
&
pCurResult
,
0
,
pInfo
->
binfo
.
pCtx
,
numOfOutput
,
pInfo
->
binfo
.
rowCellInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
int32_t
numOfChildren
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
j
=
0
;
j
<
numOfChildren
;
j
++
)
{
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
j
);
SIntervalAggOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
SResultRow
*
pChResult
=
NULL
;
setTimeWindowOutputBuf
(
&
pChInfo
->
binfo
.
resultRowInfo
,
pParentWin
,
true
,
&
pChResult
,
0
,
pChInfo
->
binfo
.
pCtx
,
pChildOp
->
numOfExprs
,
pChInfo
->
binfo
.
rowCellInfoOffset
,
&
pChInfo
->
aggSup
,
pTaskInfo
);
compactFunctions
(
pInfo
->
binfo
.
pCtx
,
pChInfo
->
binfo
.
pCtx
,
numOfOutput
,
pTaskInfo
);
}
}
}
bool
isDeletedWindow
(
STimeWindow
*
pWin
,
uint64_t
groupId
,
SAggSupporter
*
pSup
)
{
SET_RES_WINDOW_KEY
(
pSup
->
keyBuf
,
&
pWin
->
skey
,
sizeof
(
int64_t
),
groupId
);
SResultRowPosition
*
p1
=
(
SResultRowPosition
*
)
taosHashGet
(
pSup
->
pResultRowHashTable
,
pSup
->
keyBuf
,
GET_RES_WINDOW_KEY_LEN
(
sizeof
(
int64_t
)));
return
p1
==
NULL
;
}
static
void
doHashInterval
(
SOperatorInfo
*
pOperatorInfo
,
SSDataBlock
*
pSDataBlock
,
uint64_t
tableGroupId
,
SArray
*
pUpdated
)
{
SStreamFinalIntervalOperatorInfo
*
pInfo
=
(
SStreamFinalIntervalOperatorInfo
*
)
pOperatorInfo
->
info
;
SResultRowInfo
*
pResultRowInfo
=
&
(
pInfo
->
binfo
.
resultRowInfo
);
...
...
@@ -2060,6 +2112,14 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
STimeWindow
nextWin
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
interval
.
precision
,
NULL
);
while
(
1
)
{
if
(
isFinalInterval
(
pInfo
)
&&
isCloseWindow
(
&
nextWin
,
&
pInfo
->
twAggSup
)
&&
isDeletedWindow
(
&
nextWin
,
tableGroupId
,
&
pInfo
->
aggSup
))
{
SArray
*
pUpWins
=
taosArrayInit
(
8
,
sizeof
(
STimeWindow
));
taosArrayPush
(
pUpWins
,
&
nextWin
);
rebuildIntervalWindow
(
pInfo
,
pUpWins
,
pInfo
->
binfo
.
pRes
->
info
.
groupId
,
pOperatorInfo
->
numOfExprs
,
pOperatorInfo
->
pTaskInfo
);
taosArrayDestroy
(
pUpWins
);
}
int32_t
code
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
nextWin
,
true
,
&
pResult
,
tableGroupId
,
pInfo
->
binfo
.
pCtx
,
numOfOutput
,
pInfo
->
binfo
.
rowCellInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
...
...
@@ -2089,47 +2149,6 @@ static void doHashInterval(SOperatorInfo* pOperatorInfo, SSDataBlock* pSDataBloc
}
}
bool
isFinalInterval
(
SStreamFinalIntervalOperatorInfo
*
pInfo
)
{
return
pInfo
->
pChildren
!=
NULL
;
}
void
compactFunctions
(
SqlFunctionCtx
*
pDestCtx
,
SqlFunctionCtx
*
pSourceCtx
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
for
(
int32_t
k
=
0
;
k
<
numOfOutput
;
++
k
)
{
if
(
fmIsWindowPseudoColumnFunc
(
pDestCtx
[
k
].
functionId
))
{
continue
;
}
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
functionNeedToExecute
(
&
pDestCtx
[
k
])
&&
pDestCtx
[
k
].
fpSet
.
combine
!=
NULL
)
{
code
=
pDestCtx
[
k
].
fpSet
.
combine
(
&
pDestCtx
[
k
],
&
pSourceCtx
[
k
]);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
qError
(
"%s apply functions error, code: %s"
,
GET_TASKID
(
pTaskInfo
),
tstrerror
(
code
));
pTaskInfo
->
code
=
code
;
longjmp
(
pTaskInfo
->
env
,
code
);
}
}
}
}
static
void
rebuildIntervalWindow
(
SStreamFinalIntervalOperatorInfo
*
pInfo
,
SArray
*
pWinArray
,
int32_t
groupId
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
size
=
taosArrayGetSize
(
pWinArray
);
ASSERT
(
pInfo
->
pChildren
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
STimeWindow
*
pParentWin
=
taosArrayGet
(
pWinArray
,
i
);
SResultRow
*
pCurResult
=
NULL
;
setTimeWindowOutputBuf
(
&
pInfo
->
binfo
.
resultRowInfo
,
pParentWin
,
true
,
&
pCurResult
,
0
,
pInfo
->
binfo
.
pCtx
,
numOfOutput
,
pInfo
->
binfo
.
rowCellInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
int32_t
numOfChildren
=
taosArrayGetSize
(
pInfo
->
pChildren
);
for
(
int32_t
j
=
0
;
j
<
numOfChildren
;
j
++
)
{
SOperatorInfo
*
pChildOp
=
taosArrayGetP
(
pInfo
->
pChildren
,
j
);
SIntervalAggOperatorInfo
*
pChInfo
=
pChildOp
->
info
;
SResultRow
*
pChResult
=
NULL
;
setTimeWindowOutputBuf
(
&
pChInfo
->
binfo
.
resultRowInfo
,
pParentWin
,
true
,
&
pChResult
,
0
,
pChInfo
->
binfo
.
pCtx
,
pChildOp
->
numOfExprs
,
pChInfo
->
binfo
.
rowCellInfoOffset
,
&
pChInfo
->
aggSup
,
pTaskInfo
);
compactFunctions
(
pInfo
->
binfo
.
pCtx
,
pChInfo
->
binfo
.
pCtx
,
numOfOutput
,
pTaskInfo
);
}
}
}
static
void
clearStreamIntervalOperator
(
SStreamFinalIntervalOperatorInfo
*
pInfo
)
{
taosHashClear
(
pInfo
->
aggSup
.
pResultRowHashTable
);
clearDiskbasedBuf
(
pInfo
->
aggSup
.
pResultBuf
);
...
...
@@ -2169,6 +2188,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
SStreamFinalIntervalOperatorInfo
*
pInfo
=
pOperator
->
info
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
SArray
*
pUpdated
=
taosArrayInit
(
4
,
POINTER_BYTES
);
TSKEY
maxTs
=
INT64_MIN
;
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
...
...
@@ -2222,6 +2242,7 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
}
setInputDataBlock
(
pOperator
,
pInfo
->
binfo
.
pCtx
,
pBlock
,
pInfo
->
order
,
MAIN_SCAN
,
true
);
doHashInterval
(
pOperator
,
pBlock
,
pBlock
->
info
.
groupId
,
pUpdated
);
if
(
isFinalInterval
(
pInfo
))
{
int32_t
chIndex
=
getChildIndex
(
pBlock
);
int32_t
size
=
taosArrayGetSize
(
pInfo
->
pChildren
);
...
...
@@ -2238,10 +2259,10 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) {
setInputDataBlock
(
pChildOp
,
pChInfo
->
binfo
.
pCtx
,
pBlock
,
pChInfo
->
order
,
MAIN_SCAN
,
true
);
doHashInterval
(
pChildOp
,
pBlock
,
pBlock
->
info
.
groupId
,
NULL
);
}
doHashInterval
(
pOperator
,
pBlock
,
pBlock
->
info
.
groupId
,
pUpdated
);
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
pBlock
->
info
.
window
.
ekey
);
maxTs
=
TMAX
(
maxTs
,
pBlock
->
info
.
window
.
ekey
);
}
pInfo
->
twAggSup
.
maxTs
=
TMAX
(
pInfo
->
twAggSup
.
maxTs
,
maxTs
);
if
(
isFinalInterval
(
pInfo
))
{
closeIntervalWindow
(
pInfo
->
aggSup
.
pResultRowHashTable
,
&
pInfo
->
twAggSup
,
&
pInfo
->
interval
,
pUpdated
);
}
...
...
@@ -2564,7 +2585,7 @@ int32_t updateSessionWindowInfo(SResultWindowInfo* pWinInfo, TSKEY* pTs, int32_t
}
static
int32_t
setWindowOutputBuf
(
SResultWindowInfo
*
pWinInfo
,
SResultRow
**
pResult
,
SqlFunctionCtx
*
pCtx
,
int32
_t
groupId
,
int32_t
numOfOutput
,
int32_t
*
rowCellInfoOffset
,
uint64
_t
groupId
,
int32_t
numOfOutput
,
int32_t
*
rowCellInfoOffset
,
SStreamAggSupporter
*
pAggSup
,
SExecTaskInfo
*
pTaskInfo
)
{
assert
(
pWinInfo
->
win
.
skey
<=
pWinInfo
->
win
.
ekey
);
// too many time window in query
...
...
@@ -2642,7 +2663,7 @@ int32_t getNumCompactWindow(SArray* pWinInfos, int32_t startIndex, int64_t gap)
return
size
-
startIndex
-
1
;
}
void
compactTimeWindow
(
SStreamSessionAggOperatorInfo
*
pInfo
,
int32_t
startIndex
,
int32_t
num
,
int32
_t
groupId
,
void
compactTimeWindow
(
SStreamSessionAggOperatorInfo
*
pInfo
,
int32_t
startIndex
,
int32_t
num
,
uint64
_t
groupId
,
int32_t
numOfOutput
,
SExecTaskInfo
*
pTaskInfo
,
SHashObj
*
pStUpdated
,
SHashObj
*
pStDeleted
)
{
SResultWindowInfo
*
pCurWin
=
taosArrayGet
(
pInfo
->
streamAggSup
.
pCurWins
,
startIndex
);
SResultRow
*
pCurResult
=
NULL
;
...
...
@@ -2667,13 +2688,18 @@ void compactTimeWindow(SStreamSessionAggOperatorInfo* pInfo, int32_t startIndex,
}
}
typedef
struct
SWinRes
{
TSKEY
ts
;
uint64_t
groupId
;
}
SWinRes
;
static
void
doStreamSessionAggImpl
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pSDataBlock
,
SHashObj
*
pStUpdated
,
SHashObj
*
pStDeleted
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SStreamSessionAggOperatorInfo
*
pInfo
=
pOperator
->
info
;
bool
masterScan
=
true
;
int32_t
numOfOutput
=
pOperator
->
numOfExprs
;
int64_t
groupId
=
pSDataBlock
->
info
.
groupId
;
uint64_t
groupId
=
pSDataBlock
->
info
.
groupId
;
int64_t
gap
=
pInfo
->
gap
;
int64_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -2693,7 +2719,7 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
SStreamAggSupporter
*
pAggSup
=
&
pInfo
->
streamAggSup
;
for
(
int32_t
i
=
0
;
i
<
pSDataBlock
->
info
.
rows
;)
{
int32_t
winIndex
=
0
;
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
,
tsCols
[
i
],
pSDataBlock
->
info
.
groupId
,
gap
,
&
winIndex
);
SResultWindowInfo
*
pCurWin
=
getSessionTimeWindow
(
pAggSup
,
tsCols
[
i
],
groupId
,
gap
,
&
winIndex
);
winRows
=
updateSessionWindowInfo
(
pCurWin
,
tsCols
,
pSDataBlock
->
info
.
rows
,
i
,
pInfo
->
gap
,
pStDeleted
);
code
=
doOneWindowAgg
(
pInfo
,
pSDataBlock
,
pCurWin
,
&
pResult
,
i
,
winRows
,
numOfOutput
,
pTaskInfo
);
if
(
code
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
...
...
@@ -2709,7 +2735,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
pCurWin
->
isClosed
=
false
;
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
code
=
taosHashPut
(
pStUpdated
,
&
pCurWin
->
pos
,
sizeof
(
SResultRowPosition
),
&
(
pCurWin
->
win
.
skey
),
sizeof
(
TSKEY
));
SWinRes
value
=
{.
ts
=
pCurWin
->
win
.
skey
,
.
groupId
=
groupId
};
code
=
taosHashPut
(
pStUpdated
,
&
pCurWin
->
pos
,
sizeof
(
SResultRowPosition
),
&
value
,
sizeof
(
SWinRes
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -2736,7 +2763,7 @@ static void doClearSessionWindows(SStreamAggSupporter* pAggSup, SOptrBasicInfo*
}
}
static
int32_t
copyUpdateResult
(
SHashObj
*
pStUpdated
,
SArray
*
pUpdated
,
int32_t
groupId
)
{
static
int32_t
copyUpdateResult
(
SHashObj
*
pStUpdated
,
SArray
*
pUpdated
)
{
void
*
pData
=
NULL
;
size_t
keyLen
=
0
;
while
((
pData
=
taosHashIterate
(
pStUpdated
,
pData
))
!=
NULL
)
{
...
...
@@ -2746,9 +2773,9 @@ static int32_t copyUpdateResult(SHashObj* pStUpdated, SArray* pUpdated, int32_t
if
(
pos
==
NULL
)
{
return
TSDB_CODE_QRY_OUT_OF_MEMORY
;
}
pos
->
groupId
=
groupId
;
pos
->
groupId
=
((
SWinRes
*
)
pData
)
->
groupId
;
pos
->
pos
=
*
(
SResultRowPosition
*
)
key
;
*
(
int64_t
*
)
pos
->
key
=
*
(
uint64_t
*
)
pData
;
*
(
int64_t
*
)
pos
->
key
=
((
SWinRes
*
)
pData
)
->
ts
;
taosArrayPush
(
pUpdated
,
&
pos
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -2815,7 +2842,9 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
__get_win_info_
fn
)
{
// Todo(liuyao) save window to tdb
void
**
pIte
=
NULL
;
size_t
keyLen
=
0
;
while
((
pIte
=
taosHashIterate
(
pHashMap
,
pIte
))
!=
NULL
)
{
uint64_t
*
pGroupId
=
taosHashGetKey
(
pIte
,
&
keyLen
);
SArray
*
pWins
=
(
SArray
*
)
(
*
pIte
);
int32_t
size
=
taosArrayGetSize
(
pWins
);
for
(
int32_t
i
=
0
;
i
<
size
;
i
++
)
{
...
...
@@ -2825,7 +2854,7 @@ int32_t closeSessionWindow(SHashObj* pHashMap, STimeWindowAggSupp* pTwSup, SArra
if
(
!
pSeWin
->
isClosed
)
{
pSeWin
->
isClosed
=
true
;
if
(
pTwSup
->
calTrigger
==
STREAM_TRIGGER_WINDOW_CLOSE
)
{
int32_t
code
=
saveResult
(
pSeWin
->
win
.
skey
,
pSeWin
->
pos
.
pageId
,
pSeWin
->
pos
.
offset
,
0
,
pClosed
);
int32_t
code
=
saveResult
(
pSeWin
->
win
.
skey
,
pSeWin
->
pos
.
pageId
,
pSeWin
->
pos
.
offset
,
*
pGroupId
,
pClosed
);
pSeWin
->
isOutput
=
true
;
}
}
...
...
@@ -2892,7 +2921,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
SStreamSessionAggOperatorInfo
*
pChildInfo
=
pChildOp
->
info
;
doClearSessionWindows
(
&
pChildInfo
->
streamAggSup
,
&
pChildInfo
->
binfo
,
pBlock
,
0
,
pChildOp
->
numOfExprs
,
pChildInfo
->
gap
,
NULL
);
rebuildTimeWindow
(
pInfo
,
pWins
,
p
Info
->
binfo
.
pRes
->
info
.
groupId
,
pOperator
->
numOfExprs
,
pOperator
->
pTaskInfo
);
rebuildTimeWindow
(
pInfo
,
pWins
,
p
Block
->
info
.
groupId
,
pOperator
->
numOfExprs
,
pOperator
->
pTaskInfo
);
}
taosArrayDestroy
(
pWins
);
continue
;
...
...
@@ -2916,7 +2945,7 @@ static SSDataBlock* doStreamSessionAgg(SOperatorInfo* pOperator) {
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pUpdated
,
getSessionWinInfo
);
copyUpdateResult
(
pStUpdated
,
pUpdated
,
pBInfo
->
pRes
->
info
.
groupId
);
copyUpdateResult
(
pStUpdated
,
pUpdated
);
taosHashCleanup
(
pStUpdated
);
finalizeUpdatedResult
(
pOperator
->
numOfExprs
,
pInfo
->
streamAggSup
.
pResultBuf
,
pUpdated
,
...
...
@@ -3216,8 +3245,9 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
}
pCurWin
->
winInfo
.
isClosed
=
false
;
if
(
pInfo
->
twAggSup
.
calTrigger
==
STREAM_TRIGGER_AT_ONCE
)
{
code
=
taosHashPut
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
),
&
(
pCurWin
->
winInfo
.
win
.
skey
),
sizeof
(
TSKEY
));
SWinRes
value
=
{.
ts
=
pCurWin
->
winInfo
.
win
.
skey
,
.
groupId
=
groupId
};
code
=
taosHashPut
(
pSeUpdated
,
&
pCurWin
->
winInfo
.
pos
,
sizeof
(
SResultRowPosition
),
&
value
,
sizeof
(
SWinRes
));
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
longjmp
(
pTaskInfo
->
env
,
TSDB_CODE_QRY_OUT_OF_MEMORY
);
}
...
...
@@ -3274,7 +3304,7 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
closeSessionWindow
(
pInfo
->
streamAggSup
.
pResultRows
,
&
pInfo
->
twAggSup
,
pUpdated
,
getStateWinInfo
);
copyUpdateResult
(
pSeUpdated
,
pUpdated
,
pBInfo
->
pRes
->
info
.
groupId
);
copyUpdateResult
(
pSeUpdated
,
pUpdated
);
taosHashCleanup
(
pSeUpdated
);
finalizeUpdatedResult
(
pOperator
->
numOfExprs
,
pInfo
->
streamAggSup
.
pResultBuf
,
pUpdated
,
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
1b907e84
...
...
@@ -195,6 +195,7 @@ int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, S
cJSON
*
syncNode2Json
(
const
SSyncNode
*
pSyncNode
);
char
*
syncNode2Str
(
const
SSyncNode
*
pSyncNode
);
char
*
syncNode2SimpleStr
(
const
SSyncNode
*
pSyncNode
);
bool
syncNodeInConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
config
);
void
syncNodeUpdateConfig
(
SSyncNode
*
pSyncNode
,
SSyncCfg
*
newConfig
,
SyncIndex
lastConfigChangeIndex
,
bool
*
isDrop
);
SSyncNode
*
syncNodeAcquire
(
int64_t
rid
);
...
...
@@ -230,6 +231,8 @@ int32_t syncNodeGetPreIndexTerm(SSyncNode* pSyncNode, SyncIndex index, SyncInd
int32_t
syncNodeCommit
(
SSyncNode
*
ths
,
SyncIndex
beginIndex
,
SyncIndex
endIndex
,
uint64_t
flag
);
int32_t
syncNodeUpdateNewConfigIndex
(
SSyncNode
*
ths
,
SSyncCfg
*
pNewCfg
);
bool
syncNodeInRaftGroup
(
SSyncNode
*
ths
,
SRaftId
*
pRaftId
);
SSyncSnapshotSender
*
syncNodeGetSnapshotSender
(
SSyncNode
*
ths
,
SRaftId
*
pDestId
);
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
1b907e84
...
...
@@ -713,7 +713,8 @@ static int32_t syncNodeMakeLogSame(SSyncNode* ths, SyncAppendEntries* pMsg) {
// delete confict entries
code
=
ths
->
pLogStore
->
syncLogTruncate
(
ths
->
pLogStore
,
delBegin
);
ASSERT
(
code
==
0
);
sDebug
(
"vgId:%d sync event log truncate, from %ld to %ld"
,
ths
->
vgId
,
delBegin
,
delEnd
);
sDebug
(
"vgId:%d sync event currentTerm:%lu log truncate, from %ld to %ld"
,
ths
->
vgId
,
ths
->
pRaftStore
->
currentTerm
,
delBegin
,
delEnd
);
logStoreSimpleLog2
(
"after syncNodeMakeLogSame"
,
ths
->
pLogStore
);
return
code
;
...
...
@@ -994,8 +995,8 @@ int32_t syncNodeOnAppendEntriesSnapshotCb(SSyncNode* ths, SyncAppendEntries* pMs
SyncIndex
commitEnd
=
snapshot
.
lastApplyIndex
;
ths
->
commitIndex
=
snapshot
.
lastApplyIndex
;
sDebug
(
"vgId:%d sync event c
ommit by snapshot from index:%ld to index:%ld, %s"
,
ths
->
vgId
,
commitBegin
,
commitEnd
,
syncUtilState2String
(
ths
->
state
));
sDebug
(
"vgId:%d sync event c
urrentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s"
,
ths
->
vgId
,
ths
->
pRaftStore
->
currentTerm
,
commitBegin
,
commitEnd
,
syncUtilState2String
(
ths
->
state
));
}
SyncIndex
beginIndex
=
ths
->
commitIndex
+
1
;
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
1b907e84
...
...
@@ -190,18 +190,19 @@ int32_t syncNodeOnAppendEntriesReplySnapshotCb(SSyncNode* ths, SyncAppendEntries
if
(
gRaftDetailLog
)
{
char
*
s
=
snapshotSender2Str
(
pSender
);
sDebug
(
"vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld"
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu "
"sender:%s"
,
ths
->
vgId
,
host
,
port
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
last
ConfigIndex
,
s
);
ths
->
vgId
,
ths
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
last
ApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
,
s
);
taosMemoryFree
(
s
);
}
else
{
sDebug
(
"vgId:%d sync event snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld"
,
ths
->
vgId
,
host
,
port
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
last
ConfigIndex
);
"vgId:%d sync event
currentTerm:%lu
snapshot send to %s:%d start sender first time, lastApplyIndex:%ld "
"lastApplyTerm:%lu lastConfigIndex:%ld
privateTerm:%lu
"
,
ths
->
vgId
,
ths
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
last
ApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
);
}
}
...
...
source/libs/sync/src/syncCommit.c
浏览文件 @
1b907e84
...
...
@@ -56,8 +56,9 @@ void syncMaybeAdvanceCommitIndex(SSyncNode* pSyncNode) {
SyncIndex
commitEnd
=
snapshot
.
lastApplyIndex
;
pSyncNode
->
commitIndex
=
snapshot
.
lastApplyIndex
;
sDebug
(
"vgId:%d sync event commit by snapshot from index:%ld to index:%ld, %s"
,
pSyncNode
->
vgId
,
pSyncNode
->
commitIndex
,
snapshot
.
lastApplyIndex
,
syncUtilState2String
(
pSyncNode
->
state
));
sDebug
(
"vgId:%d sync event currentTerm:%lu commit by snapshot from index:%ld to index:%ld, %s"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
commitIndex
,
snapshot
.
lastApplyIndex
,
syncUtilState2String
(
pSyncNode
->
state
));
}
// update commit index
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
1b907e84
...
...
@@ -187,7 +187,9 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
ASSERT
(
rid
==
pSyncNode
->
rid
);
int32_t
ret
=
0
;
bool
IamInNew
=
false
;
bool
IamInNew
=
syncNodeInConfig
(
pSyncNode
,
pNewCfg
);
#if 0
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
...
...
@@ -203,6 +205,7 @@ int32_t syncReconfigBuild(int64_t rid, const SSyncCfg* pNewCfg, SRpcMsg* pRpcMsg
}
*/
}
#endif
if
(
!
IamInNew
)
{
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -230,7 +233,9 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
bool
IamInNew
=
false
;
bool
IamInNew
=
syncNodeInConfig
(
pSyncNode
,
pNewCfg
);
#if 0
for (int i = 0; i < pNewCfg->replicaNum; ++i) {
if (strcmp((pNewCfg->nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(pNewCfg->nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
...
...
@@ -249,6 +254,7 @@ int32_t syncReconfig(int64_t rid, const SSyncCfg* pNewCfg) {
}
*/
}
#endif
if
(
!
IamInNew
)
{
sError
(
"sync reconfig error, not in new config"
);
...
...
@@ -377,6 +383,17 @@ ESyncState syncGetMyRole(int64_t rid) {
return
state
;
}
bool
syncIsReady
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
return
false
;
}
assert
(
rid
==
pSyncNode
->
rid
);
bool
b
=
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
&&
pSyncNode
->
restoreFinish
;
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
b
;
}
bool
syncIsRestoreFinish
(
int64_t
rid
)
{
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
...
...
@@ -558,7 +575,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
return
-
1
;
}
assert
(
rid
==
pSyncNode
->
rid
);
sDebug
(
"vgId:%d sync event propose msgType:%s"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pMsg
->
msgType
));
sDebug
(
"vgId:%d sync event currentTerm:%lu propose msgType:%s,%d"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
ret
=
syncNodePropose
(
pSyncNode
,
pMsg
,
isWeak
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
...
...
@@ -567,7 +585,8 @@ int32_t syncPropose(int64_t rid, const SRpcMsg* pMsg, bool isWeak) {
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
const
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
0
;
sDebug
(
"vgId:%d sync event propose msgType:%s"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pMsg
->
msgType
));
sDebug
(
"vgId:%d sync event currentTerm:%lu propose msgType:%s,%d"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pMsg
->
msgType
),
pMsg
->
msgType
);
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
SRespStub
stub
;
...
...
@@ -600,8 +619,6 @@ int32_t syncNodePropose(SSyncNode* pSyncNode, const SRpcMsg* pMsg, bool isWeak)
SSyncNode
*
syncNodeOpen
(
const
SSyncInfo
*
pOldSyncInfo
)
{
SSyncInfo
*
pSyncInfo
=
(
SSyncInfo
*
)
pOldSyncInfo
;
sDebug
(
"vgId:%d sync event sync open"
,
pSyncInfo
->
vgId
);
SSyncNode
*
pSyncNode
=
(
SSyncNode
*
)
taosMemoryMalloc
(
sizeof
(
SSyncNode
));
assert
(
pSyncNode
!=
NULL
);
memset
(
pSyncNode
,
0
,
sizeof
(
SSyncNode
));
...
...
@@ -815,6 +832,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pOldSyncInfo) {
// snapshot meta
// pSyncNode->sMeta.lastConfigIndex = -1;
sDebug
(
"vgId:%d sync event currentTerm:%lu sync open"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
);
return
pSyncNode
;
}
...
...
@@ -860,7 +879,7 @@ void syncNodeStartStandBy(SSyncNode* pSyncNode) {
}
void
syncNodeClose
(
SSyncNode
*
pSyncNode
)
{
sDebug
(
"vgId:%d sync event
sync close"
,
pSyncNode
->
vgId
);
sDebug
(
"vgId:%d sync event
currentTerm:%lu sync close"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
);
int32_t
ret
;
assert
(
pSyncNode
!=
NULL
);
...
...
@@ -1258,9 +1277,36 @@ char* syncNode2SimpleStr(const SSyncNode* pSyncNode) {
return
s
;
}
void
syncNodeUpdateConfig
(
SSyncNode
*
pSyncNode
,
SSyncCfg
*
newConfig
,
SyncIndex
lastConfigChangeIndex
,
bool
*
isDrop
)
{
bool
syncNodeInConfig
(
SSyncNode
*
pSyncNode
,
const
SSyncCfg
*
config
)
{
bool
b1
=
false
;
bool
b2
=
false
;
for
(
int
i
=
0
;
i
<
config
->
replicaNum
;
++
i
)
{
if
(
strcmp
((
config
->
nodeInfo
)[
i
].
nodeFqdn
,
pSyncNode
->
myNodeInfo
.
nodeFqdn
)
==
0
&&
(
config
->
nodeInfo
)[
i
].
nodePort
==
pSyncNode
->
myNodeInfo
.
nodePort
)
{
b1
=
true
;
break
;
}
}
for
(
int
i
=
0
;
i
<
config
->
replicaNum
;
++
i
)
{
SRaftId
raftId
;
raftId
.
addr
=
syncUtilAddr2U64
((
config
->
nodeInfo
)[
i
].
nodeFqdn
,
(
config
->
nodeInfo
)[
i
].
nodePort
);
raftId
.
vgId
=
pSyncNode
->
vgId
;
if
(
syncUtilSameId
(
&
raftId
,
&
(
pSyncNode
->
myRaftId
)))
{
b2
=
true
;
break
;
}
}
ASSERT
(
b1
==
b2
);
return
b1
;
}
void
syncNodeUpdateConfig
(
SSyncNode
*
pSyncNode
,
SSyncCfg
*
pNewConfig
,
SyncIndex
lastConfigChangeIndex
,
bool
*
isDrop
)
{
SSyncCfg
oldConfig
=
pSyncNode
->
pRaftCfg
->
cfg
;
pSyncNode
->
pRaftCfg
->
cfg
=
*
n
ewConfig
;
pSyncNode
->
pRaftCfg
->
cfg
=
*
pN
ewConfig
;
pSyncNode
->
pRaftCfg
->
lastConfigIndex
=
lastConfigChangeIndex
;
int32_t
ret
=
0
;
...
...
@@ -1272,7 +1318,8 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
SSyncSnapshotSender
*
oldSenders
[
TSDB_MAX_REPLICA
];
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
oldSenders
[
i
]
=
(
pSyncNode
->
senders
)[
i
];
sDebug
(
"vgId:%d sync event save senders %d, %p"
,
pSyncNode
->
vgId
,
i
,
oldSenders
[
i
]);
sDebug
(
"vgId:%d sync event currentTerm:%lu save senders %d, %p, privateTerm:%lu"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
i
,
oldSenders
[
i
],
oldSenders
[
i
]
->
privateTerm
);
if
(
gRaftDetailLog
)
{
;
}
...
...
@@ -1324,8 +1371,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
((
pSyncNode
->
replicasId
)[
i
].
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d sync event reset sender for %lu, newIndex:%d, %s:%d, %p"
,
pSyncNode
->
vgId
,
(
pSyncNode
->
replicasId
)[
i
].
addr
,
i
,
host
,
port
,
oldSenders
[
j
]);
sDebug
(
"vgId:%d sync event currentTerm:%lu reset sender for %lu, newIndex:%d, %s:%d, %p, privateTerm:%lu"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
(
pSyncNode
->
replicasId
)[
i
].
addr
,
i
,
host
,
port
,
oldSenders
[
j
],
oldSenders
[
j
]
->
privateTerm
);
(
pSyncNode
->
senders
)[
i
]
=
oldSenders
[
j
];
oldSenders
[
j
]
=
NULL
;
reset
=
true
;
...
...
@@ -1333,8 +1381,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
// reset replicaIndex
int32_t
oldreplicaIndex
=
(
pSyncNode
->
senders
)[
i
]
->
replicaIndex
;
(
pSyncNode
->
senders
)[
i
]
->
replicaIndex
=
i
;
sDebug
(
"vgId:%d sync event udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d"
,
pSyncNode
->
vgId
,
oldreplicaIndex
,
i
,
host
,
port
,
(
pSyncNode
->
senders
)[
i
],
reset
);
sDebug
(
"vgId:%d sync event currentTerm:%lu udpate replicaIndex from %d to %d, %s:%d, %p, reset:%d"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
oldreplicaIndex
,
i
,
host
,
port
,
(
pSyncNode
->
senders
)[
i
],
reset
);
}
}
}
...
...
@@ -1343,7 +1392,9 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
((
pSyncNode
->
senders
)[
i
]
==
NULL
)
{
(
pSyncNode
->
senders
)[
i
]
=
snapshotSenderCreate
(
pSyncNode
,
i
);
sDebug
(
"vgId:%d sync event create new sender %p replicaIndex:%d"
,
pSyncNode
->
vgId
,
(
pSyncNode
->
senders
)[
i
],
i
);
sDebug
(
"vgId:%d sync event currentTerm:%lu create new sender %p replicaIndex:%d, privateTerm:%lu"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
(
pSyncNode
->
senders
)[
i
],
i
,
(
pSyncNode
->
senders
)[
i
]
->
privateTerm
);
}
}
...
...
@@ -1351,13 +1402,16 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
for
(
int
i
=
0
;
i
<
TSDB_MAX_REPLICA
;
++
i
)
{
if
(
oldSenders
[
i
]
!=
NULL
)
{
snapshotSenderDestroy
(
oldSenders
[
i
]);
sDebug
(
"vgId:%d sync event delete old sender %p replicaIndex:%d"
,
pSyncNode
->
vgId
,
oldSenders
[
i
],
i
);
sDebug
(
"vgId:%d sync event currentTerm:%lu delete old sender %p replicaIndex:%d"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
oldSenders
[
i
],
i
);
oldSenders
[
i
]
=
NULL
;
}
}
bool
IamInOld
=
false
;
bool
IamInNew
=
false
;
bool
IamInOld
=
syncNodeInConfig
(
pSyncNode
,
&
oldConfig
);
bool
IamInNew
=
syncNodeInConfig
(
pSyncNode
,
pNewConfig
);
#if 0
for (int i = 0; i < oldConfig.replicaNum; ++i) {
if (strcmp((oldConfig.nodeInfo)[i].nodeFqdn, pSyncNode->myNodeInfo.nodeFqdn) == 0 &&
(oldConfig.nodeInfo)[i].nodePort == pSyncNode->myNodeInfo.nodePort) {
...
...
@@ -1373,6 +1427,7 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
break;
}
}
#endif
*
isDrop
=
true
;
if
(
IamInOld
&&
!
IamInNew
)
{
...
...
@@ -1381,6 +1436,10 @@ void syncNodeUpdateConfig(SSyncNode* pSyncNode, SSyncCfg* newConfig, SyncIndex l
*
isDrop
=
false
;
}
// may be add me to a new raft group
if
(
IamInOld
&&
IamInNew
&&
oldConfig
.
replicaNum
==
1
)
{
}
if
(
IamInNew
)
{
pSyncNode
->
pRaftCfg
->
isStandBy
=
0
;
// change isStandBy to normal
}
...
...
@@ -1406,14 +1465,19 @@ void syncNodeRelease(SSyncNode* pNode) { taosReleaseRef(tsNodeRefId, pNode->rid)
void
syncNodeUpdateTerm
(
SSyncNode
*
pSyncNode
,
SyncTerm
term
)
{
if
(
term
>
pSyncNode
->
pRaftStore
->
currentTerm
)
{
raftStoreSetTerm
(
pSyncNode
->
pRaftStore
,
term
);
syncNodeBecomeFollower
(
pSyncNode
,
"update term"
);
char
tmpBuf
[
64
];
snprintf
(
tmpBuf
,
sizeof
(
tmpBuf
),
"update term to %lu"
,
term
);
syncNodeBecomeFollower
(
pSyncNode
,
tmpBuf
);
raftStoreClearVote
(
pSyncNode
->
pRaftStore
);
}
}
void
syncNodeBecomeFollower
(
SSyncNode
*
pSyncNode
,
const
char
*
debugStr
)
{
sDebug
(
"vgId:%d sync event become follower, isStandBy:%d, replicaNum:%d, %s"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
debugStr
);
sDebug
(
"vgId:%d sync event currentTerm:%lu become follower, isStandBy:%d, replicaNum:%d, "
"restoreFinish:%d, %s"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
restoreFinish
,
debugStr
);
// maybe clear leader cache
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
...
...
@@ -1447,8 +1511,12 @@ void syncNodeBecomeFollower(SSyncNode* pSyncNode, const char* debugStr) {
// /\ UNCHANGED <<messages, currentTerm, votedFor, candidateVars, logVars>>
//
void
syncNodeBecomeLeader
(
SSyncNode
*
pSyncNode
,
const
char
*
debugStr
)
{
sDebug
(
"vgId:%d sync event become leader, isStandBy:%d, replicaNum:%d %s"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
debugStr
);
// reset restoreFinish
pSyncNode
->
restoreFinish
=
false
;
sDebug
(
"vgId:%d sync event currentTerm:%lu become leader, isStandBy:%d, replicaNum:%d, restoreFinish:%d, %s"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pSyncNode
->
pRaftCfg
->
isStandBy
,
pSyncNode
->
replicaNum
,
pSyncNode
->
restoreFinish
,
debugStr
);
// state change
pSyncNode
->
state
=
TAOS_SYNC_STATE_LEADER
;
...
...
@@ -2022,21 +2090,13 @@ const char* syncStr(ESyncState state) {
static
int32_t
syncDoLeaderTransfer
(
SSyncNode
*
ths
,
SRpcMsg
*
pRpcMsg
,
SSyncRaftEntry
*
pEntry
)
{
SyncLeaderTransfer
*
pSyncLeaderTransfer
=
syncLeaderTransferFromRpcMsg2
(
pRpcMsg
);
/*
char host[128];
uint16_t port;
syncUtilU642Addr(pSyncLeaderTransfer->newLeaderId.addr, host, sizeof(host), &port);
sDebug("vgId:%d sync event, maybe leader transfer to %s:%d %lu", ths->vgId, host, port,
pSyncLeaderTransfer->newLeaderId.addr);
*/
sDebug
(
"vgId:%d sync event, begin leader transfer"
,
ths
->
vgId
);
sDebug
(
"vgId:%d sync event currentTerm:%lu begin leader transfer"
,
ths
->
vgId
,
ths
->
pRaftStore
->
currentTerm
);
if
(
strcmp
(
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
ths
->
myNodeInfo
.
nodeFqdn
)
==
0
&&
pSyncLeaderTransfer
->
newNodeInfo
.
nodePort
==
ths
->
myNodeInfo
.
nodePort
)
{
sDebug
(
"vgId:%d sync event
,
maybe leader transfer to %s:%d %lu"
,
ths
->
vgId
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodePort
,
pSyncLeaderTransfer
->
newLeaderId
.
addr
);
sDebug
(
"vgId:%d sync event
currentTerm:%lu
maybe leader transfer to %s:%d %lu"
,
ths
->
vgId
,
ths
->
pRaftStore
->
currentTerm
,
pSyncLeaderTransfer
->
newNodeInfo
.
nodeFqdn
,
pSyncLeaderTransfer
->
new
NodeInfo
.
nodePort
,
pSyncLeaderTransfer
->
new
LeaderId
.
addr
);
// reset elect timer now!
int32_t
electMS
=
1
;
...
...
@@ -2069,6 +2129,21 @@ static int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
return
0
;
}
int32_t
syncNodeUpdateNewConfigIndex
(
SSyncNode
*
ths
,
SSyncCfg
*
pNewCfg
)
{
for
(
int
i
=
0
;
i
<
pNewCfg
->
replicaNum
;
++
i
)
{
SRaftId
raftId
;
raftId
.
addr
=
syncUtilAddr2U64
((
pNewCfg
->
nodeInfo
)[
i
].
nodeFqdn
,
(
pNewCfg
->
nodeInfo
)[
i
].
nodePort
);
raftId
.
vgId
=
ths
->
vgId
;
if
(
syncUtilSameId
(
&
(
ths
->
myRaftId
),
&
raftId
))
{
pNewCfg
->
myIndex
=
i
;
return
0
;
}
}
return
-
1
;
}
static
int32_t
syncNodeConfigChange
(
SSyncNode
*
ths
,
SRpcMsg
*
pRpcMsg
,
SSyncRaftEntry
*
pEntry
)
{
SSyncCfg
oldSyncCfg
=
ths
->
pRaftCfg
->
cfg
;
...
...
@@ -2077,19 +2152,23 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
ASSERT
(
ret
==
0
);
// update new config myIndex
bool
IamInNew
=
false
;
for
(
int
i
=
0
;
i
<
newSyncCfg
.
replicaNum
;
++
i
)
{
if
(
strcmp
(
ths
->
myNodeInfo
.
nodeFqdn
,
(
newSyncCfg
.
nodeInfo
)[
i
].
nodeFqdn
)
==
0
&&
ths
->
myNodeInfo
.
nodePort
==
(
newSyncCfg
.
nodeInfo
)[
i
].
nodePort
)
{
newSyncCfg
.
myIndex
=
i
;
IamInNew
=
true
;
break
;
}
}
syncNodeUpdateNewConfigIndex
(
ths
,
&
newSyncCfg
);
bool
IamInNew
=
syncNodeInConfig
(
ths
,
&
newSyncCfg
);
/*
for (int i = 0; i < newSyncCfg.replicaNum; ++i) {
if (strcmp(ths->myNodeInfo.nodeFqdn, (newSyncCfg.nodeInfo)[i].nodeFqdn) == 0 &&
ths->myNodeInfo.nodePort == (newSyncCfg.nodeInfo)[i].nodePort) {
newSyncCfg.myIndex = i;
IamInNew = true;
break;
}
}
*/
bool
isDrop
;
// if (IamInNew || (!IamInNew && ths->state != TAOS_SYNC_STATE_LEADER)) {
if
(
IamInNew
)
{
syncNodeUpdateConfig
(
ths
,
&
newSyncCfg
,
pEntry
->
index
,
&
isDrop
);
...
...
@@ -2139,8 +2218,8 @@ static int32_t syncNodeConfigChange(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftE
int32_t
syncNodeCommit
(
SSyncNode
*
ths
,
SyncIndex
beginIndex
,
SyncIndex
endIndex
,
uint64_t
flag
)
{
int32_t
code
=
0
;
ESyncState
state
=
flag
;
sDebug
(
"vgId:%d sync event c
ommit by wal from index:%"
PRId64
" to index:%"
PRId64
", %s"
,
ths
->
vgId
,
beginIndex
,
endIndex
,
syncUtilState2String
(
state
));
sDebug
(
"vgId:%d sync event c
urrentTerm:%lu commit by wal from index:%"
PRId64
" to index:%"
PRId64
", %s"
,
ths
->
vgId
,
ths
->
pRaftStore
->
currentTerm
,
beginIndex
,
endIndex
,
syncUtilState2String
(
state
));
// execute fsm
if
(
ths
->
pFsm
!=
NULL
)
{
...
...
@@ -2188,7 +2267,8 @@ int32_t syncNodeCommit(SSyncNode* ths, SyncIndex beginIndex, SyncIndex endIndex,
ths
->
pFsm
->
FpRestoreFinishCb
(
ths
->
pFsm
);
}
ths
->
restoreFinish
=
true
;
sDebug
(
"vgId:%d sync event restore finish, index:%ld"
,
ths
->
vgId
,
pEntry
->
index
);
sDebug
(
"vgId:%d sync event currentTerm:%lu restore finish, %s, index:%ld"
,
ths
->
vgId
,
ths
->
pRaftStore
->
currentTerm
,
syncUtilState2String
(
ths
->
state
),
pEntry
->
index
);
}
}
...
...
source/libs/sync/src/syncRaftLog.c
浏览文件 @
1b907e84
...
...
@@ -15,6 +15,7 @@
#include "syncRaftLog.h"
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
#include "wal.h"
// refactor, log[0 .. n] ==> log[m .. n]
...
...
@@ -162,9 +163,10 @@ static int32_t raftLogAppendEntry(struct SSyncLogStore* pLogStore, SSyncRaftEntr
walFsync
(
pWal
,
true
);
sDebug
(
"vgId:%d sync event write index:%ld, %s, isStandBy:%d, msgType:%s, originalRpcType:%s"
,
pData
->
pSyncNode
->
vgId
,
pEntry
->
index
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
TMSG_INFO
(
pEntry
->
msgType
),
TMSG_INFO
(
pEntry
->
originalRpcType
));
sDebug
(
"vgId:%d sync event currentTerm:%lu write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d"
,
pData
->
pSyncNode
->
vgId
,
pData
->
pSyncNode
->
pRaftStore
->
currentTerm
,
pEntry
->
index
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
return
code
;
}
...
...
@@ -320,7 +322,12 @@ int32_t logStoreAppendEntry(SSyncLogStore* pLogStore, SSyncRaftEntry* pEntry) {
walFsync
(
pWal
,
true
);
sDebug
(
"sync event old write wal: %ld"
,
pEntry
->
index
);
sDebug
(
"vgId:%d sync event currentTerm:%lu old write index:%ld, %s, isStandBy:%d, msgType:%s,%d, originalRpcType:%s,%d"
,
pData
->
pSyncNode
->
vgId
,
pData
->
pSyncNode
->
pRaftStore
->
currentTerm
,
pEntry
->
index
,
syncUtilState2String
(
pData
->
pSyncNode
->
state
),
pData
->
pSyncNode
->
pRaftCfg
->
isStandBy
,
TMSG_INFO
(
pEntry
->
msgType
),
pEntry
->
msgType
,
TMSG_INFO
(
pEntry
->
originalRpcType
),
pEntry
->
originalRpcType
);
return
code
;
}
...
...
source/libs/sync/src/syncRespMgr.c
浏览文件 @
1b907e84
...
...
@@ -14,6 +14,7 @@
*/
#include "syncRespMgr.h"
#include "syncRaftStore.h"
SSyncRespMgr
*
syncRespMgrCreate
(
void
*
data
,
int64_t
ttl
)
{
SSyncRespMgr
*
pObj
=
(
SSyncRespMgr
*
)
taosMemoryMalloc
(
sizeof
(
SSyncRespMgr
));
...
...
@@ -45,8 +46,9 @@ int64_t syncRespMgrAdd(SSyncRespMgr *pObj, SRespStub *pStub) {
taosHashPut
(
pObj
->
pRespHash
,
&
keyCode
,
sizeof
(
keyCode
),
pStub
,
sizeof
(
SRespStub
));
SSyncNode
*
pSyncNode
=
pObj
->
data
;
sDebug
(
"vgId:%d sync event resp mgr add, type:%s seq:%lu handle:%p"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
keyCode
,
pStub
->
rpcMsg
.
info
.
handle
);
sDebug
(
"vgId:%d sync event currentTerm:%lu resp mgr add, msgType:%s,%d seq:%lu handle:%p ahandle:%p"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
keyCode
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
return
keyCode
;
...
...
@@ -69,8 +71,9 @@ int32_t syncRespMgrGet(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStub) {
memcpy
(
pStub
,
pTmp
,
sizeof
(
SRespStub
));
SSyncNode
*
pSyncNode
=
pObj
->
data
;
sDebug
(
"vgId:%d sync event resp mgr get, type:%s seq:%lu handle:%p"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
index
,
pStub
->
rpcMsg
.
info
.
handle
);
sDebug
(
"vgId:%d sync event currentTerm:%lu resp mgr get, msgType:%s,%d seq:%lu handle:%p ahandle:%p"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
return
1
;
// get one object
...
...
@@ -87,8 +90,9 @@ int32_t syncRespMgrGetAndDel(SSyncRespMgr *pObj, uint64_t index, SRespStub *pStu
memcpy
(
pStub
,
pTmp
,
sizeof
(
SRespStub
));
SSyncNode
*
pSyncNode
=
pObj
->
data
;
sDebug
(
"vgId:%d sync event resp mgr get and del, type:%s seq:%lu handle:%p"
,
pSyncNode
->
vgId
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
index
,
pStub
->
rpcMsg
.
info
.
handle
);
sDebug
(
"vgId:%d sync event currentTerm:%lu resp mgr get and del, msgType:%s,%d seq:%lu handle:%p ahandle:%p"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
TMSG_INFO
(
pStub
->
rpcMsg
.
msgType
),
pStub
->
rpcMsg
.
msgType
,
index
,
pStub
->
rpcMsg
.
info
.
handle
,
pStub
->
rpcMsg
.
info
.
ahandle
);
taosHashRemove
(
pObj
->
pRespHash
,
&
index
,
sizeof
(
index
));
taosThreadMutexUnlock
(
&
(
pObj
->
mutex
));
...
...
source/libs/sync/src/syncSnapshot.c
浏览文件 @
1b907e84
...
...
@@ -141,18 +141,22 @@ void snapshotSenderStart(SSyncSnapshotSender *pSender) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld send "
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
msgStr
);
pSender
->
pSyncNode
->
vgId
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
);
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d begin seq:%d ack:%d lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu"
,
pSender
->
pSyncNode
->
vgId
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
);
}
syncSnapshotSendDestroy
(
pMsg
);
...
...
@@ -279,25 +283,31 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld send "
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu send "
"msg:%s"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
msgStr
);
pSender
->
pSyncNode
->
vgId
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
);
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d finish seq:%d ack:%d lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu"
,
pSender
->
pSyncNode
->
vgId
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
);
}
}
else
{
sDebug
(
"vgId:%d sync event snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld lastApplyTerm:%lu "
"lastConfigIndex:%ld"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
);
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d sending seq:%d ack:%d lastApplyIndex:%ld "
"lastApplyTerm:%lu "
"lastConfigIndex:%ld privateTerm:%lu"
,
pSender
->
pSyncNode
->
vgId
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
snapshot
.
lastApplyIndex
,
pSender
->
snapshot
.
lastApplyTerm
,
pSender
->
snapshot
.
lastConfigIndex
,
pSender
->
privateTerm
);
}
syncSnapshotSendDestroy
(
pMsg
);
...
...
@@ -328,12 +338,15 @@ int32_t snapshotReSend(SSyncSnapshotSender *pSender) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d send msg:%s"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
msgStr
);
sDebug
(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu send msg:%s"
,
pSender
->
pSyncNode
->
vgId
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event snapshot send to %s:%d resend seq:%d ack:%d"
,
pSender
->
pSyncNode
->
vgId
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
);
sDebug
(
"vgId:%d sync event currentTerm:%lu snapshot send to %s:%d resend seq:%d ack:%d privateTerm:%lu"
,
pSender
->
pSyncNode
->
vgId
,
pSender
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pSender
->
seq
,
pSender
->
ack
,
pSender
->
privateTerm
);
}
syncSnapshotSendDestroy
(
pMsg
);
...
...
@@ -485,7 +498,7 @@ void snapshotReceiverStop(SSyncSnapshotReceiver *pReceiver, bool apply) {
pReceiver
->
start
=
false
;
if
(
apply
)
{
++
(
pReceiver
->
privateTerm
);
//
++(pReceiver->privateTerm);
}
if
(
gRaftDetailLog
)
{
...
...
@@ -566,16 +579,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, recv msg:%s"
,
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfig
Index
,
msgStr
);
"vgId:%d sync event
currentTerm:%lu
snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld,
privateTerm:%lu,
recv msg:%s"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
last
Index
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld"
,
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
);
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d begin ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld privateTerm:%lu"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
);
}
}
else
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_END
)
{
...
...
@@ -590,6 +604,12 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
pMsg
->
lastConfigIndex
>=
SYNC_INDEX_BEGIN
)
{
int32_t
oldReplicaNum
=
pSyncNode
->
replicaNum
;
// update new config myIndex
SSyncCfg
newSyncCfg
=
pMsg
->
lastConfig
;
syncNodeUpdateNewConfigIndex
(
pSyncNode
,
&
newSyncCfg
);
bool
IamInNew
=
syncNodeInConfig
(
pSyncNode
,
&
newSyncCfg
);
#if 0
// update new config myIndex
bool IamInNew = false;
SSyncCfg newSyncCfg = pMsg->lastConfig;
...
...
@@ -601,17 +621,23 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
break;
}
}
#endif
bool
isDrop
;
if
(
IamInNew
)
{
sDebug
(
"vgId:%d sync event update config by snapshot, lastIndex:%ld, lastTerm:%lu, lastConfigIndex:%ld "
,
pSyncNode
->
vgId
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
);
sDebug
(
"vgId:%d sync event currentTerm:%lu update config by snapshot, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld "
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
);
syncNodeUpdateConfig
(
pSyncNode
,
&
newSyncCfg
,
pMsg
->
lastConfigIndex
,
&
isDrop
);
}
else
{
sDebug
(
"vgId:%d sync event do not update config by snapshot, I am not in newCfg, lastIndex:%ld, lastTerm:%lu, "
"vgId:%d sync event currentTerm:%lu do not update config by snapshot, I am not in newCfg, "
"lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld "
,
pSyncNode
->
vgId
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
);
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
);
}
// change isStandBy to normal
...
...
@@ -636,19 +662,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
char
*
logSimpleStr
=
logStoreSimple2Str
(
pSyncNode
->
pLogStore
);
sDebug
(
"vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, "
"vgId:%d sync event
currentTerm:%lu
snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, raft log:%s"
,
pSyncNode
->
vgId
,
host
,
port
,
pMsg
->
lastIndex
+
1
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
snapshot
.
lastConfigIndex
,
logSimpleStr
);
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld, privateTerm:%lu, raft log:%s"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pMsg
->
lastIndex
+
1
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
snapshot
.
lastConfigIndex
,
pReceiver
->
privateTerm
,
logSimpleStr
);
taosMemoryFree
(
logSimpleStr
);
}
else
{
sDebug
(
"vgId:%d sync event snapshot recv from %s:%d finish, update log begin index:%ld, "
"vgId:%d sync event
currentTerm:%lu
snapshot recv from %s:%d finish, update log begin index:%ld, "
"snapshot.lastApplyIndex:%ld, "
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld"
,
pSyncNode
->
vgId
,
host
,
port
,
pMsg
->
lastIndex
+
1
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
snapshot
.
last
ConfigIndex
);
"snapshot.lastApplyTerm:%lu, snapshot.lastConfigIndex:%ld
, privateTerm:%lu
"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pMsg
->
lastIndex
+
1
,
snapshot
.
last
ApplyIndex
,
snapshot
.
lastApplyTerm
,
snapshot
.
lastConfigIndex
,
pReceiver
->
privateTerm
);
}
pReceiver
->
pWriter
=
NULL
;
...
...
@@ -659,17 +686,17 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, recv msg:%s"
,
pReceiver
->
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
last
ConfigIndex
,
msgStr
);
"vgId:%d sync event
currentTerm:%lu
snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld,
privateTerm:%lu,
recv msg:%s"
,
pReceiver
->
pSyncNode
->
vgId
,
pReceiver
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
last
Index
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld"
,
pReceiver
->
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
last
ConfigIndex
);
"vgId:%d sync event
currentTerm:%lu
snapshot recv from %s:%d end ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld
, privateTerm:%lu
"
,
pReceiver
->
pSyncNode
->
vgId
,
pReceiver
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
last
Index
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
);
}
}
else
if
(
pMsg
->
seq
==
SYNC_SNAPSHOT_SEQ_FORCE_CLOSE
)
{
...
...
@@ -684,18 +711,20 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, recv "
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv "
"msg:%s"
,
pReceiver
->
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
last
ConfigIndex
,
msgStr
);
pReceiver
->
pSyncNode
->
vgId
,
pReceiver
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
last
Index
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld"
,
pReceiver
->
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
);
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d force close ack:%d, lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu"
,
pReceiver
->
pSyncNode
->
vgId
,
pReceiver
->
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
);
}
}
else
if
(
pMsg
->
seq
>
SYNC_SNAPSHOT_SEQ_BEGIN
&&
pMsg
->
seq
<
SYNC_SNAPSHOT_SEQ_END
)
{
...
...
@@ -715,16 +744,19 @@ int32_t syncNodeOnSnapshotSendCb(SSyncNode *pSyncNode, SyncSnapshotSend *pMsg) {
if
(
gRaftDetailLog
)
{
char
*
msgStr
=
syncSnapshotSend2Str
(
pMsg
);
sDebug
(
"vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld, recv msg:%s"
,
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
msgStr
);
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu, recv msg:%s"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
,
msgStr
);
taosMemoryFree
(
msgStr
);
}
else
{
sDebug
(
"vgId:%d sync event snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, lastTerm:%lu, "
"lastConfigIndex:%ld"
,
pSyncNode
->
vgId
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
);
"vgId:%d sync event currentTerm:%lu snapshot recv from %s:%d receiving ack:%d, lastIndex:%ld, "
"lastTerm:%lu, "
"lastConfigIndex:%ld, privateTerm:%lu"
,
pSyncNode
->
vgId
,
pSyncNode
->
pRaftStore
->
currentTerm
,
host
,
port
,
pReceiver
->
ack
,
pMsg
->
lastIndex
,
pMsg
->
lastTerm
,
pMsg
->
lastConfigIndex
,
pReceiver
->
privateTerm
);
}
}
else
{
...
...
source/libs/sync/test/syncTestTool.cpp
浏览文件 @
1b907e84
...
...
@@ -297,7 +297,7 @@ void usage(char* exe) {
SRpcMsg
*
createRpcMsg
(
int
i
,
int
count
,
int
myIndex
)
{
SRpcMsg
*
pMsg
=
(
SRpcMsg
*
)
taosMemoryMalloc
(
sizeof
(
SRpcMsg
));
memset
(
pMsg
,
0
,
sizeof
(
SRpcMsg
));
pMsg
->
msgType
=
9999
;
pMsg
->
msgType
=
TDMT_VND_SUBMIT
;
pMsg
->
contLen
=
256
;
pMsg
->
pCont
=
rpcMallocCont
(
pMsg
->
contLen
);
snprintf
((
char
*
)(
pMsg
->
pCont
),
pMsg
->
contLen
,
"value-myIndex:%u-%d-%d-%ld"
,
myIndex
,
i
,
count
,
taosGetTimestampMs
());
...
...
@@ -384,8 +384,10 @@ int main(int argc, char** argv) {
leaderTransferWait
++
;
if
(
leaderTransferWait
==
7
)
{
sTrace
(
"begin leader transfer ..."
);
int32_t
ret
=
syncLeaderTransfer
(
rid
);
if
(
leaderTransfer
)
{
sTrace
(
"begin leader transfer ..."
);
int32_t
ret
=
syncLeaderTransfer
(
rid
);
}
}
if
(
alreadySend
<
writeRecordNum
)
{
...
...
source/libs/tdb/src/db/tdbPage.c
浏览文件 @
1b907e84
...
...
@@ -246,14 +246,17 @@ void tdbPageCopy(SPage *pFromPage, SPage *pToPage) {
int
tdbPageCapacity
(
int
pageSize
,
int
amHdrSize
)
{
int
szPageHdr
;
int
minCellIndexSize
;
// at least one cell in cell index
if
(
pageSize
<
65536
)
{
szPageHdr
=
pageMethods
.
szPageHdr
;
minCellIndexSize
=
pageMethods
.
szOffset
;
}
else
{
szPageHdr
=
pageLargeMethods
.
szPageHdr
;
minCellIndexSize
=
pageLargeMethods
.
szOffset
;
}
return
pageSize
-
szPageHdr
-
amHdrSize
;
return
pageSize
-
szPageHdr
-
amHdrSize
-
sizeof
(
SPageFtr
)
-
minCellIndexSize
;
}
static
int
tdbPageAllocate
(
SPage
*
pPage
,
int
szCell
,
SCell
**
ppCell
)
{
...
...
@@ -599,4 +602,4 @@ SPageMethods pageLargeMethods = {
setLPageCellOffset
,
// setCellOffset
getLPageFreeCellInfo
,
// getFreeCellInfo
setLPageFreeCellInfo
// setFreeCellInfo
};
\ No newline at end of file
};
source/os/src/osSysinfo.c
浏览文件 @
1b907e84
...
...
@@ -945,3 +945,19 @@ SysNameInfo taosGetSysNameInfo() {
return
info
;
#endif
}
bool
taosCheckCurrentInDll
()
{
#ifdef WINDOWS
MEMORY_BASIC_INFORMATION
mbi
;
char
path
[
PATH_MAX
]
=
{
0
};
GetModuleFileName
(((
VirtualQuery
(
taosCheckCurrentInDll
,
&
mbi
,
sizeof
(
mbi
))
!=
0
)
?
(
HMODULE
)
mbi
.
AllocationBase
:
NULL
),
path
,
PATH_MAX
);
int
strLastIndex
=
strlen
(
path
);
if
((
path
[
strLastIndex
-
3
]
==
'd'
||
path
[
strLastIndex
-
3
]
==
'D'
)
&&
(
path
[
strLastIndex
-
2
]
==
'l'
||
path
[
strLastIndex
-
2
]
==
'L'
)
&&
(
path
[
strLastIndex
-
1
]
==
'l'
||
path
[
strLastIndex
-
1
]
==
'L'
))
{
return
true
;
}
return
false
;
#else
return
false
;
#endif
}
\ No newline at end of file
source/util/src/tcache.c
浏览文件 @
1b907e84
...
...
@@ -829,7 +829,11 @@ void *taosCacheTimedRefresh(void *handle) {
const
int32_t
SLEEP_DURATION
=
500
;
// 500 ms
int64_t
count
=
0
;
atexit
(
taosCacheRefreshWorkerUnexpectedStopped
);
#ifdef WINDOWS
if
(
taosCheckCurrentInDll
())
{
atexit
(
taosCacheRefreshWorkerUnexpectedStopped
);
}
#endif
while
(
1
)
{
taosMsleep
(
SLEEP_DURATION
);
...
...
tests/script/tsim/stream/distributeInterval0.sim
浏览文件 @
1b907e84
...
...
@@ -173,4 +173,39 @@ endi
sql select _wstartts, count(*) c1, count(d) c2 , sum(a) c3 , max(b) c4, min(c) c5, avg(d) from st interval(10s);
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
sql create database test1 vgroups 1;
sql use test1;
sql create stable st(ts timestamp, a int, b int , c int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST1 as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6 from st interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3);
sql insert into ts1 values(1648791222001,2,2,3);
sql insert into ts2 values(1648791211000,1,2,3);
sql insert into ts2 values(1648791222001,2,2,3);
$loop_count = 0
loop2:
sql select * from streamtST1;
sleep 300
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
# row 0
if $data01 != 2 then
print =====data01=$data01
goto loop2
endi
#rows 1
if $data11 != 2 then
print =====data11=$data11
goto loop2
endi
system sh/stop_dnodes.sh
\ No newline at end of file
tests/script/tsim/stream/partitionby.sim
浏览文件 @
1b907e84
...
...
@@ -34,6 +34,7 @@ print =====rows=$rows
goto loop0
endi
print =====loop0
sql create database test1 vgroups 1;
sql use test1;
...
...
@@ -51,7 +52,7 @@ sql insert into ts2 values(1648791211000,1,2,3);
$loop_count = 0
loop
0
:
loop
1
:
sleep 300
sql select * from streamt;
...
...
@@ -62,7 +63,62 @@ endi
if $rows != 2 then
print =====rows=$rows
goto loop
0
goto loop
1
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
\ No newline at end of file
print =====loop1
sql create database test2 vgroups 1;
sql use test2;
sql create stable st(ts timestamp,a int,b int,c int,id int) tags(ta int,tb int,tc int);
sql create table ts1 using st tags(1,1,1);
sql create table ts2 using st tags(2,2,2);
sql create stream stream_t2 trigger at_once into streamtST as select _wstartts, count(*) c1, count(a) c2 , sum(a) c3 , max(b) c5, min(c) c6, max(id) c7 from st partition by ta interval(10s) ;
sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3);
sql insert into ts2 values(1648791222001,2,2,3,4);
sql insert into ts2 values(1648791222002,2,2,3,5);
sql insert into ts2 values(1648791222002,2,2,3,6);
sql insert into ts1 values(1648791211000,1,2,3,1);
sql insert into ts1 values(1648791222001,2,2,3,2);
sql insert into ts2 values(1648791211000,1,2,3,3);
sql insert into ts2 values(1648791222001,2,2,3,4);
$loop_count = 0
loop2:
sleep 300
sql select * from streamtST;
$loop_count = $loop_count + 1
if $loop_count == 10 then
return -1
endi
if $data01 != 1 then
print =====data01=$data01
goto loop2
endi
if $data02 != 1 then
print =====data02=$data02
goto loop2
endi
if $data03 != 1 then
print =====data03=$data03
goto loop2
endi
if $data04 != 2 then
print =====data04=$data04
goto loop2
endi
print =====loop2
system sh/stop_dnodes.sh
\ No newline at end of file
tests/system-test/test.py
浏览文件 @
1b907e84
...
...
@@ -33,8 +33,16 @@ import taos
def
checkRunTimeError
():
import
win32gui
timeCount
=
0
while
1
:
time
.
sleep
(
1
)
timeCount
=
timeCount
+
1
if
(
timeCount
>
900
):
os
.
system
(
"TASKKILL /F /IM taosd.exe"
)
os
.
system
(
"TASKKILL /F /IM taos.exe"
)
os
.
system
(
"TASKKILL /F /IM tmq_sim.exe"
)
os
.
system
(
"TASKKILL /F /IM mintty.exe"
)
quit
(
0
)
hwnd
=
win32gui
.
FindWindow
(
None
,
"Microsoft Visual C++ Runtime Library"
)
if
hwnd
:
os
.
system
(
"TASKKILL /F /IM taosd.exe"
)
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录