Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4a988a7c
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1187
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
4a988a7c
编写于
7月 01, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into feature/stream
上级
daf5a0bd
1920980d
变更
30
隐藏空白更改
内联
并排
Showing
30 changed file
with
901 addition
and
67 deletion
+901
-67
include/common/tdatablock.h
include/common/tdatablock.h
+1
-0
include/common/tmsgdef.h
include/common/tmsgdef.h
+2
-0
include/libs/sync/sync.h
include/libs/sync/sync.h
+5
-1
include/libs/sync/syncTools.h
include/libs/sync/syncTools.h
+60
-0
include/util/taoserror.h
include/util/taoserror.h
+1
-0
include/util/tarray.h
include/util/tarray.h
+1
-1
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+7
-0
source/dnode/mnode/impl/src/mndSync.c
source/dnode/mnode/impl/src/mndSync.c
+1
-1
source/dnode/vnode/src/vnd/vnodeSync.c
source/dnode/vnode/src/vnd/vnodeSync.c
+5
-5
source/libs/executor/src/executorimpl.c
source/libs/executor/src/executorimpl.c
+4
-2
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+8
-1
source/libs/sync/inc/syncAppendEntries.h
source/libs/sync/inc/syncAppendEntries.h
+1
-0
source/libs/sync/inc/syncAppendEntriesReply.h
source/libs/sync/inc/syncAppendEntriesReply.h
+6
-0
source/libs/sync/inc/syncInt.h
source/libs/sync/inc/syncInt.h
+2
-0
source/libs/sync/inc/syncReplication.h
source/libs/sync/inc/syncReplication.h
+2
-0
source/libs/sync/inc/syncSnapshot.h
source/libs/sync/inc/syncSnapshot.h
+7
-7
source/libs/sync/src/syncAppendEntries.c
source/libs/sync/src/syncAppendEntries.c
+2
-0
source/libs/sync/src/syncAppendEntriesReply.c
source/libs/sync/src/syncAppendEntriesReply.c
+84
-33
source/libs/sync/src/syncMain.c
source/libs/sync/src/syncMain.c
+131
-1
source/libs/sync/src/syncMessage.c
source/libs/sync/src/syncMessage.c
+316
-1
source/libs/sync/src/syncRaftCfg.c
source/libs/sync/src/syncRaftCfg.c
+4
-4
source/libs/sync/src/syncReplication.c
source/libs/sync/src/syncReplication.c
+87
-0
source/libs/sync/src/syncSnapshot.c
source/libs/sync/src/syncSnapshot.c
+5
-5
source/libs/sync/test/CMakeLists.txt
source/libs/sync/test/CMakeLists.txt
+14
-0
source/libs/sync/test/syncAppendEntriesBatchTest.cpp
source/libs/sync/test/syncAppendEntriesBatchTest.cpp
+139
-0
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
+1
-1
source/libs/sync/test/syncSnapshotSenderTest.cpp
source/libs/sync/test/syncSnapshotSenderTest.cpp
+1
-1
source/libs/sync/test/syncTestTool.cpp
source/libs/sync/test/syncTestTool.cpp
+1
-1
source/util/src/tarray.c
source/util/src/tarray.c
+2
-2
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
include/common/tdatablock.h
浏览文件 @
4a988a7c
...
...
@@ -235,6 +235,7 @@ SColumnInfoData* bdGetColumnInfoData(SSDataBlock* pBlock, int32_t index);
void
blockEncode
(
const
SSDataBlock
*
pBlock
,
char
*
data
,
int32_t
*
dataLen
,
int32_t
numOfCols
,
int8_t
needCompress
);
const
char
*
blockDecode
(
SSDataBlock
*
pBlock
,
int32_t
numOfCols
,
int32_t
numOfRows
,
const
char
*
pData
);
void
blockDebugShowDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
);
void
blockDebugShowDataBlocks
(
const
SArray
*
dataBlocks
,
const
char
*
flag
);
// for debug
char
*
dumpBlockData
(
SSDataBlock
*
pDataBlock
,
const
char
*
flag
,
char
**
dumpBuf
);
...
...
include/common/tmsgdef.h
浏览文件 @
4a988a7c
...
...
@@ -233,10 +233,12 @@ enum {
TD_DEF_MSG_TYPE
(
TDMT_SYNC_PING
,
"sync-ping"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_PING_REPLY
,
"sync-ping-reply"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_CLIENT_REQUEST
,
"sync-client-request"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_CLIENT_REQUEST_BATCH
,
"sync-client-request-batch"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_CLIENT_REQUEST_REPLY
,
"sync-client-request-reply"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_REQUEST_VOTE
,
"sync-request-vote"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_REQUEST_VOTE_REPLY
,
"sync-request-vote-reply"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_APPEND_ENTRIES
,
"sync-append-entries"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_APPEND_ENTRIES_BATCH
,
"sync-append-entries-batch"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_APPEND_ENTRIES_REPLY
,
"sync-append-entries-reply"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_NOOP
,
"sync-noop"
,
NULL
,
NULL
)
TD_DEF_MSG_TYPE
(
TDMT_SYNC_UNKNOWN
,
"sync-unknown"
,
NULL
,
NULL
)
...
...
include/libs/sync/sync.h
浏览文件 @
4a988a7c
...
...
@@ -26,6 +26,7 @@ extern "C" {
extern
bool
gRaftDetailLog
;
#define SYNC_MAX_BATCH_SIZE 100
#define SYNC_INDEX_BEGIN 0
#define SYNC_INDEX_INVALID -1
#define SYNC_TERM_INVALID 0xFFFFFFFFFFFFFFFF
...
...
@@ -120,7 +121,7 @@ typedef struct SSyncFSM {
int32_t
(
*
FpGetSnapshot
)(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
,
void
*
pReaderParam
,
void
**
ppReader
);
int32_t
(
*
FpGetSnapshotInfo
)(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
);
int32_t
(
*
FpSnapshotStartRead
)(
struct
SSyncFSM
*
pFsm
,
void
**
ppReader
);
int32_t
(
*
FpSnapshotStartRead
)(
struct
SSyncFSM
*
pFsm
,
void
*
pReaderParam
,
void
*
*
ppReader
);
int32_t
(
*
FpSnapshotStopRead
)(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
);
int32_t
(
*
FpSnapshotDoRead
)(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
,
void
**
ppBuf
,
int32_t
*
len
);
...
...
@@ -164,6 +165,7 @@ typedef struct SSyncLogStore {
bool
(
*
syncLogIsEmpty
)(
struct
SSyncLogStore
*
pLogStore
);
int32_t
(
*
syncLogEntryCount
)(
struct
SSyncLogStore
*
pLogStore
);
int32_t
(
*
syncLogRestoreFromSnapshot
)(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
index
);
bool
(
*
syncLogExist
)(
struct
SSyncLogStore
*
pLogStore
,
SyncIndex
index
);
SyncIndex
(
*
syncLogWriteIndex
)(
struct
SSyncLogStore
*
pLogStore
);
SyncIndex
(
*
syncLogLastIndex
)(
struct
SSyncLogStore
*
pLogStore
);
...
...
@@ -179,6 +181,7 @@ typedef struct SSyncInfo {
bool
isStandBy
;
bool
snapshotEnable
;
SyncGroupId
vgId
;
int32_t
batchSize
;
SSyncCfg
syncCfg
;
char
path
[
TSDB_FILENAME_LEN
];
SWal
*
pWal
;
...
...
@@ -202,6 +205,7 @@ SyncGroupId syncGetVgId(int64_t rid);
void
syncGetEpSet
(
int64_t
rid
,
SEpSet
*
pEpSet
);
void
syncGetRetryEpSet
(
int64_t
rid
,
SEpSet
*
pEpSet
);
int32_t
syncPropose
(
int64_t
rid
,
SRpcMsg
*
pMsg
,
bool
isWeak
);
// int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
bool
syncEnvIsStart
();
const
char
*
syncStr
(
ESyncState
state
);
bool
syncIsRestoreFinish
(
int64_t
rid
);
...
...
include/libs/sync/syncTools.h
浏览文件 @
4a988a7c
...
...
@@ -219,6 +219,34 @@ void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg);
void
syncClientRequestLog
(
const
SyncClientRequest
*
pMsg
);
void
syncClientRequestLog2
(
char
*
s
,
const
SyncClientRequest
*
pMsg
);
// ---------------------------------------------
typedef
struct
SOffsetAndContLen
{
int32_t
offset
;
int32_t
contLen
;
}
SOffsetAndContLen
;
typedef
struct
SRaftMeta
{
uint64_t
seqNum
;
bool
isWeak
;
}
SRaftMeta
;
// block1:
// block2: SRaftMeta array
// block3: rpc msg array (with pCont)
typedef
struct
SyncClientRequestBatch
{
uint32_t
bytes
;
int32_t
vgId
;
uint32_t
msgType
;
// SyncClientRequestBatch msgType
uint32_t
dataCount
;
uint32_t
dataLen
;
// user RpcMsg.contLen
char
data
[];
// user RpcMsg.pCont
}
SyncClientRequestBatch
;
SyncClientRequestBatch
*
syncClientRequestBatchBuild
(
SRpcMsg
*
rpcMsgArr
,
SRaftMeta
*
raftArr
,
int32_t
arrSize
,
int32_t
vgId
);
void
syncClientRequestBatch2RpcMsg
(
const
SyncClientRequestBatch
*
pSyncMsg
,
SRpcMsg
*
pRpcMsg
);
// ---------------------------------------------
typedef
struct
SyncClientRequestReply
{
uint32_t
bytes
;
...
...
@@ -325,22 +353,53 @@ void syncAppendEntriesLog(const SyncAppendEntries* pMsg);
void
syncAppendEntriesLog2
(
char
*
s
,
const
SyncAppendEntries
*
pMsg
);
// ---------------------------------------------
// define ahead
/*
typedef struct SOffsetAndContLen {
int32_t offset;
int32_t contLen;
} SOffsetAndContLen;
*/
typedef
struct
SyncAppendEntriesBatch
{
uint32_t
bytes
;
int32_t
vgId
;
uint32_t
msgType
;
SRaftId
srcId
;
SRaftId
destId
;
// private data
SyncTerm
term
;
SyncIndex
prevLogIndex
;
SyncTerm
prevLogTerm
;
SyncIndex
commitIndex
;
SyncTerm
privateTerm
;
int32_t
dataCount
;
uint32_t
dataLen
;
char
data
[];
}
SyncAppendEntriesBatch
;
SyncAppendEntriesBatch
*
syncAppendEntriesBatchBuild
(
SRpcMsg
*
rpcMsgArr
,
int32_t
arrSize
,
int32_t
vgId
);
void
syncAppendEntriesBatchDestroy
(
SyncAppendEntriesBatch
*
pMsg
);
void
syncAppendEntriesBatchSerialize
(
const
SyncAppendEntriesBatch
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
);
void
syncAppendEntriesBatchDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncAppendEntriesBatch
*
pMsg
);
char
*
syncAppendEntriesBatchSerialize2
(
const
SyncAppendEntriesBatch
*
pMsg
,
uint32_t
*
len
);
SyncAppendEntriesBatch
*
syncAppendEntriesBatchDeserialize2
(
const
char
*
buf
,
uint32_t
len
);
void
syncAppendEntriesBatch2RpcMsg
(
const
SyncAppendEntriesBatch
*
pMsg
,
SRpcMsg
*
pRpcMsg
);
void
syncAppendEntriesBatchFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncAppendEntriesBatch
*
pMsg
);
SyncAppendEntriesBatch
*
syncAppendEntriesBatchFromRpcMsg2
(
const
SRpcMsg
*
pRpcMsg
);
cJSON
*
syncAppendEntriesBatch2Json
(
const
SyncAppendEntriesBatch
*
pMsg
);
char
*
syncAppendEntriesBatch2Str
(
const
SyncAppendEntriesBatch
*
pMsg
);
void
syncAppendEntriesBatch2RpcMsgArray
(
SyncAppendEntriesBatch
*
pSyncMsg
,
SRpcMsg
*
rpcMsgArr
,
int32_t
maxArrSize
,
int32_t
*
pRetArrSize
);
// for debug ----------------------
void
syncAppendEntriesBatchPrint
(
const
SyncAppendEntriesBatch
*
pMsg
);
void
syncAppendEntriesBatchPrint2
(
char
*
s
,
const
SyncAppendEntriesBatch
*
pMsg
);
void
syncAppendEntriesBatchLog
(
const
SyncAppendEntriesBatch
*
pMsg
);
void
syncAppendEntriesBatchLog2
(
char
*
s
,
const
SyncAppendEntriesBatch
*
pMsg
);
// ---------------------------------------------
typedef
struct
SyncAppendEntriesReply
{
uint32_t
bytes
;
...
...
@@ -542,6 +601,7 @@ int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg);
int32_t
syncNodeOnPingReplyCb
(
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
int32_t
syncNodeOnTimeoutCb
(
SSyncNode
*
ths
,
SyncTimeout
*
pMsg
);
int32_t
syncNodeOnClientRequestCb
(
SSyncNode
*
ths
,
SyncClientRequest
*
pMsg
,
SyncIndex
*
pRetIndex
);
int32_t
syncNodeOnClientRequestBatchCb
(
SSyncNode
*
ths
,
SyncClientRequestBatch
*
pMsg
);
int32_t
syncNodeOnRequestVoteCb
(
SSyncNode
*
ths
,
SyncRequestVote
*
pMsg
);
int32_t
syncNodeOnRequestVoteReplyCb
(
SSyncNode
*
ths
,
SyncRequestVoteReply
*
pMsg
);
int32_t
syncNodeOnAppendEntriesCb
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
...
...
include/util/taoserror.h
浏览文件 @
4a988a7c
...
...
@@ -423,6 +423,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_SYN_RECONFIG_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0910)
#define TSDB_CODE_SYN_PROPOSE_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0911)
#define TSDB_CODE_SYN_STANDBY_NOT_READY TAOS_DEF_ERROR_CODE(0, 0x0912)
#define TSDB_CODE_SYN_BATCH_ERROR TAOS_DEF_ERROR_CODE(0, 0x0913)
#define TSDB_CODE_SYN_INTERNAL_ERROR TAOS_DEF_ERROR_CODE(0, 0x09FF)
// tq
...
...
include/util/tarray.h
浏览文件 @
4a988a7c
...
...
@@ -139,7 +139,7 @@ void* taosArrayGetLast(const SArray* pArray);
* @param pArray
* @return
*/
int32
_t
taosArrayGetSize
(
const
SArray
*
pArray
);
size
_t
taosArrayGetSize
(
const
SArray
*
pArray
);
/**
* set the size of array
...
...
source/common/src/tdatablock.c
浏览文件 @
4a988a7c
...
...
@@ -1605,6 +1605,13 @@ static char* formatTimestamp(char* buf, int64_t val, int precision) {
return
buf
;
}
void
blockDebugShowDataBlock
(
SSDataBlock
*
pBlock
,
const
char
*
flag
)
{
SArray
*
dataBlocks
=
taosArrayInit
(
1
,
sizeof
(
SSDataBlock
));
taosArrayPush
(
dataBlocks
,
pBlock
);
blockDebugShowDataBlocks
(
dataBlocks
,
flag
);
taosArrayDestroy
(
dataBlocks
);
}
void
blockDebugShowDataBlocks
(
const
SArray
*
dataBlocks
,
const
char
*
flag
)
{
char
pBuf
[
128
]
=
{
0
};
int32_t
sz
=
taosArrayGetSize
(
dataBlocks
);
...
...
source/dnode/mnode/impl/src/mndSync.c
浏览文件 @
4a988a7c
...
...
@@ -117,7 +117,7 @@ void mndReConfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReConfigCbMeta cbM
}
}
int32_t
mndSnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
**
ppReader
)
{
int32_t
mndSnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pParam
,
void
*
*
ppReader
)
{
mDebug
(
"start to read snapshot from sdb"
);
SMnode
*
pMnode
=
pFsm
->
data
;
return
sdbStartRead
(
pMnode
->
pSdb
,
(
SSdbIter
**
)
ppReader
,
NULL
,
NULL
,
NULL
);
...
...
source/dnode/vnode/src/vnd/vnodeSync.c
浏览文件 @
4a988a7c
...
...
@@ -119,7 +119,7 @@ static int32_t vnodeProcessAlterReplicaReq(SVnode *pVnode, SRpcMsg *pMsg) {
}
void
vnodeProposeMsg
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SVnode
*
pVnode
=
pInfo
->
ahandle
;
SVnode
*
pVnode
=
pInfo
->
ahandle
;
int32_t
vgId
=
pVnode
->
config
.
vgId
;
int32_t
code
=
0
;
SRpcMsg
*
pMsg
=
NULL
;
...
...
@@ -199,7 +199,7 @@ void vnodeProposeMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
}
void
vnodeApplyMsg
(
SQueueInfo
*
pInfo
,
STaosQall
*
qall
,
int32_t
numOfMsgs
)
{
SVnode
*
pVnode
=
pInfo
->
ahandle
;
SVnode
*
pVnode
=
pInfo
->
ahandle
;
int32_t
vgId
=
pVnode
->
config
.
vgId
;
int32_t
code
=
0
;
SRpcMsg
*
pMsg
=
NULL
;
...
...
@@ -240,7 +240,7 @@ int32_t vnodeProcessSyncReq(SVnode *pVnode, SRpcMsg *pMsg, SRpcMsg **pRsp) {
STraceId
*
trace
=
&
pMsg
->
info
.
traceId
;
do
{
char
*
syncNodeStr
=
sync2SimpleStr
(
pVnode
->
sync
);
char
*
syncNodeStr
=
sync2SimpleStr
(
pVnode
->
sync
);
static
int64_t
vndTick
=
0
;
if
(
++
vndTick
%
10
==
1
)
{
vGTrace
(
"vgId:%d, sync trace msg:%s, %s"
,
syncGetVgId
(
pVnode
->
sync
),
TMSG_INFO
(
pMsg
->
msgType
),
syncNodeStr
);
...
...
@@ -375,7 +375,7 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
}
static
void
vnodeSyncCommitMsg
(
SSyncFSM
*
pFsm
,
const
SRpcMsg
*
pMsg
,
SFsmCbMeta
cbMeta
)
{
SVnode
*
pVnode
=
pFsm
->
data
;
SVnode
*
pVnode
=
pFsm
->
data
;
SSnapshot
snapshot
=
{
0
};
SyncIndex
beginIndex
=
SYNC_INDEX_INVALID
;
char
logBuf
[
256
]
=
{
0
};
...
...
@@ -409,7 +409,7 @@ static void vnodeSyncRollBackMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta
syncRpcMsgLog2
(
logBuf
,
(
SRpcMsg
*
)
pMsg
);
}
static
int32_t
vnodeSnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
**
ppReader
)
{
return
0
;
}
static
int32_t
vnodeSnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pParam
,
void
*
*
ppReader
)
{
return
0
;
}
static
int32_t
vnodeSnapshotStopRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
return
0
;
}
...
...
source/libs/executor/src/executorimpl.c
浏览文件 @
4a988a7c
...
...
@@ -3240,8 +3240,6 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
longjmp
(
pTaskInfo
->
env
,
code
);
}
doFilter
(
pProjectInfo
->
pFilterNode
,
pBlock
);
setInputDataBlock
(
pOperator
,
pSup
->
pCtx
,
pBlock
,
order
,
scanFlag
,
false
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pInfo
->
pRes
->
info
.
rows
+
pBlock
->
info
.
rows
);
...
...
@@ -3252,6 +3250,10 @@ static SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
int32_t
status
=
handleLimitOffset
(
pOperator
,
pBlock
);
// filter shall be applied after apply functions and limit/offset on the result
doFilter
(
pProjectInfo
->
pFilterNode
,
pInfo
->
pRes
);
if
(
status
==
PROJECT_RETRIEVE_CONTINUE
)
{
continue
;
}
else
if
(
status
==
PROJECT_RETRIEVE_DONE
)
{
...
...
source/libs/planner/src/planOptimizer.c
浏览文件 @
4a988a7c
...
...
@@ -476,10 +476,16 @@ static int32_t pushDownCondOptPushCondToScan(SOptimizeContext* pCxt, SScanLogicN
return
pushDownCondOptAppendCond
(
&
pScan
->
node
.
pConditions
,
pCond
);
}
static
int32_t
pushDownCondOptPushCondToProject
(
SOptimizeContext
*
pCxt
,
SProjectLogicNode
*
pProject
,
SNode
**
pCond
)
{
return
pushDownCondOptAppendCond
(
&
pProject
->
node
.
pConditions
,
pCond
);
}
static
int32_t
pushDownCondOptPushCondToChild
(
SOptimizeContext
*
pCxt
,
SLogicNode
*
pChild
,
SNode
**
pCond
)
{
switch
(
nodeType
(
pChild
))
{
case
QUERY_NODE_LOGIC_PLAN_SCAN
:
return
pushDownCondOptPushCondToScan
(
pCxt
,
(
SScanLogicNode
*
)
pChild
,
pCond
);
case
QUERY_NODE_LOGIC_PLAN_PROJECT
:
return
pushDownCondOptPushCondToProject
(
pCxt
,
(
SProjectLogicNode
*
)
pChild
,
pCond
);
default:
break
;
}
...
...
@@ -713,7 +719,8 @@ static int32_t pushDownCondOptDealAgg(SOptimizeContext* pCxt, SAggLogicNode* pAg
}
//TODO: remove it after full implementation of pushing down to child
if
(
1
!=
LIST_LENGTH
(
pAgg
->
node
.
pChildren
)
||
QUERY_NODE_LOGIC_PLAN_SCAN
!=
nodeType
(
nodesListGetNode
(
pAgg
->
node
.
pChildren
,
0
)))
{
QUERY_NODE_LOGIC_PLAN_SCAN
!=
nodeType
(
nodesListGetNode
(
pAgg
->
node
.
pChildren
,
0
))
&&
QUERY_NODE_LOGIC_PLAN_PROJECT
!=
nodeType
(
nodesListGetNode
(
pAgg
->
node
.
pChildren
,
0
)))
{
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/sync/inc/syncAppendEntries.h
浏览文件 @
4a988a7c
...
...
@@ -94,6 +94,7 @@ extern "C" {
//
int32_t
syncNodeOnAppendEntriesCb
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
int32_t
syncNodeOnAppendEntriesSnapshotCb
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
);
int32_t
syncNodeOnAppendEntriesSnapshot2Cb
(
SSyncNode
*
ths
,
SyncAppendEntriesBatch
*
pMsg
);
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncAppendEntriesReply.h
浏览文件 @
4a988a7c
...
...
@@ -42,6 +42,12 @@ extern "C" {
//
int32_t
syncNodeOnAppendEntriesReplyCb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
int32_t
syncNodeOnAppendEntriesReplySnapshotCb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
int32_t
syncNodeOnAppendEntriesReplySnapshot2Cb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
);
typedef
struct
SReaderParam
{
SyncIndex
start
;
SyncIndex
end
;
}
SReaderParam
;
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncInt.h
浏览文件 @
4a988a7c
...
...
@@ -67,6 +67,7 @@ typedef struct SSyncNode {
char
path
[
TSDB_FILENAME_LEN
];
char
raftStorePath
[
TSDB_FILENAME_LEN
*
2
];
char
configPath
[
TSDB_FILENAME_LEN
*
2
];
int32_t
batchSize
;
// sync io
SWal
*
pWal
;
...
...
@@ -170,6 +171,7 @@ void syncNodeStart(SSyncNode* pSyncNode);
void
syncNodeStartStandBy
(
SSyncNode
*
pSyncNode
);
void
syncNodeClose
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
,
bool
isWeak
);
int32_t
syncNodeProposeBatch
(
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsgArr
,
bool
*
pIsWeakArr
,
int32_t
arrSize
);
// option
bool
syncNodeSnapshotEnable
(
SSyncNode
*
pSyncNode
);
...
...
source/libs/sync/inc/syncReplication.h
浏览文件 @
4a988a7c
...
...
@@ -53,8 +53,10 @@ extern "C" {
//
int32_t
syncNodeAppendEntriesPeers
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntriesPeersSnapshot
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntriesPeersSnapshot2
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeReplicate
(
SSyncNode
*
pSyncNode
);
int32_t
syncNodeAppendEntries
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntries
*
pMsg
);
int32_t
syncNodeAppendEntriesBatch
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntriesBatch
*
pMsg
);
#ifdef __cplusplus
}
...
...
source/libs/sync/inc/syncSnapshot.h
浏览文件 @
4a988a7c
...
...
@@ -40,8 +40,8 @@ typedef struct SSyncSnapshotSender {
bool
start
;
int32_t
seq
;
int32_t
ack
;
void
*
pReader
;
void
*
pCurrentBlock
;
void
*
pReader
;
void
*
pCurrentBlock
;
int32_t
blockLen
;
SSnapshot
snapshot
;
SSyncCfg
lastConfig
;
...
...
@@ -62,14 +62,14 @@ int32_t snapshotSend(SSyncSnapshotSender *pSender);
int32_t
snapshotReSend
(
SSyncSnapshotSender
*
pSender
);
cJSON
*
snapshotSender2Json
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2Str
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2SimpleStr
(
SSyncSnapshotSender
*
pSender
,
char
*
event
);
char
*
snapshotSender2Str
(
SSyncSnapshotSender
*
pSender
);
char
*
snapshotSender2SimpleStr
(
SSyncSnapshotSender
*
pSender
,
char
*
event
);
//---------------------------------------------------
typedef
struct
SSyncSnapshotReceiver
{
bool
start
;
int32_t
ack
;
void
*
pWriter
;
void
*
pWriter
;
SyncTerm
term
;
SyncTerm
privateTerm
;
SSnapshot
snapshot
;
...
...
@@ -85,8 +85,8 @@ bool snapshotReceiverIsStart(SSyncSnapshotReceiver *pReceiver);
void
snapshotReceiverStop
(
SSyncSnapshotReceiver
*
pReceiver
);
cJSON
*
snapshotReceiver2Json
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2SimpleStr
(
SSyncSnapshotReceiver
*
pReceiver
,
char
*
event
);
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
);
char
*
snapshotReceiver2SimpleStr
(
SSyncSnapshotReceiver
*
pReceiver
,
char
*
event
);
//---------------------------------------------------
// on message
...
...
source/libs/sync/src/syncAppendEntries.c
浏览文件 @
4a988a7c
...
...
@@ -719,6 +719,8 @@ static bool syncNodeOnAppendEntriesLogOK(SSyncNode* pSyncNode, SyncAppendEntries
return
false
;
}
int32_t
syncNodeOnAppendEntriesSnapshot2Cb
(
SSyncNode
*
ths
,
SyncAppendEntriesBatch
*
pMsg
)
{
return
0
;
}
int32_t
syncNodeOnAppendEntriesSnapshotCb
(
SSyncNode
*
ths
,
SyncAppendEntries
*
pMsg
)
{
int32_t
ret
=
0
;
int32_t
code
=
0
;
...
...
source/libs/sync/src/syncAppendEntriesReply.c
浏览文件 @
4a988a7c
...
...
@@ -108,48 +108,82 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
return
ret
;
}
#if 0
int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg) {
// only start once
static
void
syncNodeStartSnapshot
(
SSyncNode
*
ths
,
SyncIndex
beginIndex
,
SyncIndex
endIndex
,
SyncTerm
lastApplyTerm
,
SyncAppendEntriesReply
*
pMsg
)
{
// get sender
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
ths
,
&
(
pMsg
->
srcId
));
ASSERT
(
pSender
!=
NULL
);
SSnapshot
snapshot
=
{
.
data
=
NULL
,
.
lastApplyIndex
=
endIndex
,
.
lastApplyTerm
=
lastApplyTerm
,
.
lastConfigIndex
=
SYNC_INDEX_INVALID
};
void
*
pReader
=
NULL
;
SReaderParam
readerParam
=
{.
start
=
beginIndex
,
.
end
=
endIndex
};
ths
->
pFsm
->
FpSnapshotStartRead
(
ths
->
pFsm
,
&
readerParam
,
&
pReader
);
if
(
!
snapshotSenderIsStart
(
pSender
)
&&
pMsg
->
privateTerm
<
pSender
->
privateTerm
)
{
ASSERT
(
pReader
!=
NULL
);
snapshotSenderStart
(
pSender
,
snapshot
,
pReader
);
}
else
{
if
(
pReader
!=
NULL
)
{
ths
->
pFsm
->
FpSnapshotStopRead
(
ths
->
pFsm
,
pReader
);
}
}
}
int32_t
syncNodeOnAppendEntriesReplySnapshot2Cb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{
int32_t
ret
=
0
;
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "==syncNodeOnAppendEntriesReplyCb== term:%lu", ths->pRaftStore->currentTerm);
syncAppendEntriesReplyLog2(logBuf, pMsg);
// if already drop replica, do not process
if
(
!
syncNodeInRaftGroup
(
ths
,
&
(
pMsg
->
srcId
))
&&
!
ths
->
pRaftCfg
->
isStandBy
)
{
syncNodeEventLog
(
ths
,
"recv sync-append-entries-reply, maybe replica already dropped"
);
return
-
1
;
}
// drop stale response
if
(
pMsg
->
term
<
ths
->
pRaftStore
->
currentTerm
)
{
sTrace("DropStaleResponse, receive term:%" PRIu64 ", current term:%" PRIu64 "", pMsg->term,
ths->pRaftStore->currentTerm);
return ret;
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-append-entries-reply, recv-term:%lu, drop stale response"
,
pMsg
->
term
);
syncNodeEventLog
(
ths
,
logBuf
);
return
-
1
;
}
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pNextIndex", ths->pNextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== before pMatchIndex", ths->pMatchIndex);
// no need this code, because if I receive reply.term, then I must have sent for that term.
// if (pMsg->term > ths->pRaftStore->currentTerm) {
// syncNodeUpdateTerm(ths, pMsg->term);
// }
// error term
if
(
pMsg
->
term
>
ths
->
pRaftStore
->
currentTerm
)
{
char logBuf[128] = {0};
snprintf(logBuf, sizeof(logBuf), "syncNodeOnAppendEntriesReplyCb error term, receive:%lu current:%lu", pMsg->term,
ths->pRaftStore->currentTerm);
syncNodeLog2(logBuf, ths);
sError("%s", logBuf);
return ret;
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"recv sync-append-entries-reply, error term, recv-term:%lu"
,
pMsg
->
term
);
syncNodeErrorLog
(
ths
,
logBuf
);
return
-
1
;
}
ASSERT
(
pMsg
->
term
==
ths
->
pRaftStore
->
currentTerm
);
if
(
pMsg
->
success
)
{
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex(ths->pNextIndex, &(pMsg->srcId), pMsg->matchIndex + 1)
;
SyncIndex
newNextIndex
=
pMsg
->
matchIndex
+
1
;
SyncIndex
newMatchIndex
=
pMsg
->
matchIndex
;
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
syncIndexMgrSetIndex(ths->pMatchIndex, &(pMsg->srcId), pMsg->matchIndex);
if
(
ths
->
pLogStore
->
syncLogExist
(
ths
->
pLogStore
,
newNextIndex
)
&&
ths
->
pLogStore
->
syncLogExist
(
ths
->
pLogStore
,
newNextIndex
-
1
))
{
// nextIndex' = [nextIndex EXCEPT ![i][j] = m.mmatchIndex + 1]
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
newNextIndex
);
// maybe commit
syncMaybeAdvanceCommitIndex(ths);
// matchIndex' = [matchIndex EXCEPT ![i][j] = m.mmatchIndex]
syncIndexMgrSetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
),
newMatchIndex
);
// maybe commit
if
(
ths
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
syncMaybeAdvanceCommitIndex
(
ths
);
}
}
else
{
// start snapshot <match+1, old snapshot.end>
SSnapshot
snapshot
;
ths
->
pFsm
->
FpGetSnapshotInfo
(
ths
->
pFsm
,
&
snapshot
);
syncNodeStartSnapshot
(
ths
,
newMatchIndex
+
1
,
snapshot
.
lastApplyIndex
,
snapshot
.
lastApplyTerm
,
pMsg
);
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
snapshot
.
lastApplyIndex
+
1
);
syncIndexMgrSetIndex
(
ths
->
pMatchIndex
,
&
(
pMsg
->
srcId
),
newMatchIndex
);
}
}
else
{
SyncIndex
nextIndex
=
syncIndexMgrGetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
));
...
...
@@ -157,18 +191,35 @@ int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* p
// notice! int64, uint64
if
(
nextIndex
>
SYNC_INDEX_BEGIN
)
{
--
nextIndex
;
if
(
ths
->
pLogStore
->
syncLogExist
(
ths
->
pLogStore
,
nextIndex
)
&&
ths
->
pLogStore
->
syncLogExist
(
ths
->
pLogStore
,
nextIndex
-
1
))
{
// do nothing
}
else
{
SSyncRaftEntry
*
pEntry
;
int32_t
code
=
ths
->
pLogStore
->
syncLogGetEntry
(
ths
->
pLogStore
,
nextIndex
,
&
pEntry
);
ASSERT
(
code
==
0
);
syncNodeStartSnapshot
(
ths
,
SYNC_INDEX_BEGIN
,
nextIndex
,
pEntry
->
term
,
pMsg
);
// get sender
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
ths
,
&
(
pMsg
->
srcId
));
ASSERT
(
pSender
!=
NULL
);
SyncIndex
sentryIndex
=
pSender
->
snapshot
.
lastApplyIndex
+
1
;
// update nextIndex to sentryIndex
if
(
nextIndex
<=
sentryIndex
)
{
nextIndex
=
sentryIndex
;
}
}
}
else
{
nextIndex
=
SYNC_INDEX_BEGIN
;
}
syncIndexMgrSetIndex
(
ths
->
pNextIndex
,
&
(
pMsg
->
srcId
),
nextIndex
);
}
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pNextIndex", ths->pNextIndex);
syncIndexMgrLog2("==syncNodeOnAppendEntriesReplyCb== after pMatchIndex", ths->pMatchIndex);
return ret;
return
0
;
}
#endif
int32_t
syncNodeOnAppendEntriesReplySnapshotCb
(
SSyncNode
*
ths
,
SyncAppendEntriesReply
*
pMsg
)
{
int32_t
ret
=
0
;
...
...
source/libs/sync/src/syncMain.c
浏览文件 @
4a988a7c
...
...
@@ -50,7 +50,6 @@ static int32_t syncNodeAppendNoop(SSyncNode* ths);
// process message ----
int32_t
syncNodeOnPingCb
(
SSyncNode
*
ths
,
SyncPing
*
pMsg
);
int32_t
syncNodeOnPingReplyCb
(
SSyncNode
*
ths
,
SyncPingReply
*
pMsg
);
int32_t
syncNodeOnClientRequestCb
(
SSyncNode
*
ths
,
SyncClientRequest
*
pMsg
,
SyncIndex
*
pRetIndex
);
// life cycle
static
void
syncFreeNode
(
void
*
param
);
...
...
@@ -627,6 +626,94 @@ int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak) {
return
ret
;
}
int32_t
syncProposeBatch
(
int64_t
rid
,
SRpcMsg
*
pMsgArr
,
bool
*
pIsWeakArr
,
int32_t
arrSize
)
{
if
(
arrSize
<
0
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
int32_t
ret
=
0
;
SSyncNode
*
pSyncNode
=
taosAcquireRef
(
tsNodeRefId
,
rid
);
if
(
pSyncNode
==
NULL
)
{
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
ASSERT
(
rid
==
pSyncNode
->
rid
);
ret
=
syncNodeProposeBatch
(
pSyncNode
,
pMsgArr
,
pIsWeakArr
,
arrSize
);
taosReleaseRef
(
tsNodeRefId
,
pSyncNode
->
rid
);
return
ret
;
}
static
bool
syncNodeBatchOK
(
SRpcMsg
*
pMsgArr
,
int32_t
arrSize
)
{
for
(
int32_t
i
=
0
;
i
<
arrSize
;
++
i
)
{
if
(
pMsgArr
[
i
].
msgType
==
TDMT_SYNC_CONFIG_CHANGE
)
{
return
false
;
}
if
(
pMsgArr
[
i
].
msgType
==
TDMT_SYNC_CONFIG_CHANGE_FINISH
)
{
return
false
;
}
}
return
true
;
}
int32_t
syncNodeProposeBatch
(
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsgArr
,
bool
*
pIsWeakArr
,
int32_t
arrSize
)
{
if
(
!
syncNodeBatchOK
(
pMsgArr
,
arrSize
))
{
syncNodeErrorLog
(
pSyncNode
,
"sync propose batch error"
);
terrno
=
TSDB_CODE_SYN_BATCH_ERROR
;
return
-
1
;
}
if
(
arrSize
>
SYNC_MAX_BATCH_SIZE
)
{
syncNodeErrorLog
(
pSyncNode
,
"sync propose match batch error"
);
terrno
=
TSDB_CODE_SYN_BATCH_ERROR
;
return
-
1
;
}
if
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
)
{
syncNodeErrorLog
(
pSyncNode
,
"sync propose not leader"
);
terrno
=
TSDB_CODE_SYN_NOT_LEADER
;
return
-
1
;
}
if
(
pSyncNode
->
changing
)
{
syncNodeErrorLog
(
pSyncNode
,
"sync propose not ready"
);
terrno
=
TSDB_CODE_SYN_PROPOSE_NOT_READY
;
return
-
1
;
}
SRaftMeta
raftArr
[
SYNC_MAX_BATCH_SIZE
];
for
(
int
i
=
0
;
i
<
arrSize
;
++
i
)
{
SRespStub
stub
;
stub
.
createTime
=
taosGetTimestampMs
();
stub
.
rpcMsg
=
pMsgArr
[
i
];
uint64_t
seqNum
=
syncRespMgrAdd
(
pSyncNode
->
pSyncRespMgr
,
&
stub
);
raftArr
[
i
].
isWeak
=
pIsWeakArr
[
i
];
raftArr
[
i
].
seqNum
=
seqNum
;
}
SyncClientRequestBatch
*
pSyncMsg
=
syncClientRequestBatchBuild
(
pMsgArr
,
raftArr
,
arrSize
,
pSyncNode
->
vgId
);
ASSERT
(
pSyncMsg
!=
NULL
);
SRpcMsg
rpcMsg
;
syncClientRequestBatch2RpcMsg
(
pSyncMsg
,
&
rpcMsg
);
taosMemoryFree
(
pSyncMsg
);
// only free msg body, do not free rpc msg content
if
(
pSyncNode
->
FpEqMsg
!=
NULL
&&
(
*
pSyncNode
->
FpEqMsg
)(
pSyncNode
->
msgcb
,
&
rpcMsg
)
==
0
)
{
// enqueue msg ok
}
else
{
sError
(
"enqueue msg error, FpEqMsg is NULL"
);
terrno
=
TSDB_CODE_SYN_INTERNAL_ERROR
;
return
-
1
;
}
return
0
;
}
int32_t
syncNodePropose
(
SSyncNode
*
pSyncNode
,
SRpcMsg
*
pMsg
,
bool
isWeak
)
{
int32_t
ret
=
0
;
...
...
@@ -2362,6 +2449,49 @@ int32_t syncNodeOnClientRequestCb(SSyncNode* ths, SyncClientRequest* pMsg, SyncI
return
ret
;
}
int32_t
syncNodeOnClientRequestBatchCb
(
SSyncNode
*
ths
,
SyncClientRequestBatch
*
pMsg
)
{
int32_t
code
=
0
;
if
(
ths
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
// call FpCommitCb, delete resp mgr
return
-
1
;
}
SyncIndex
index
=
ths
->
pLogStore
->
syncLogWriteIndex
(
ths
->
pLogStore
);
SyncTerm
term
=
ths
->
pRaftStore
->
currentTerm
;
int32_t
raftMetaArrayLen
=
sizeof
(
SRaftMeta
)
*
pMsg
->
dataCount
;
int32_t
rpcArrayLen
=
sizeof
(
SRpcMsg
)
*
pMsg
->
dataCount
;
SRaftMeta
*
raftMetaArr
=
(
SRaftMeta
*
)(
pMsg
->
data
);
SRpcMsg
*
msgArr
=
(
SRpcMsg
*
)((
char
*
)(
pMsg
->
data
)
+
raftMetaArrayLen
);
for
(
int32_t
i
=
0
;
i
<
pMsg
->
dataCount
;
++
i
)
{
SSyncRaftEntry
*
pEntry
=
syncEntryBuild
(
msgArr
[
i
].
contLen
);
ASSERT
(
pEntry
!=
NULL
);
pEntry
->
originalRpcType
=
msgArr
[
i
].
msgType
;
pEntry
->
seqNum
=
raftMetaArr
[
i
].
seqNum
;
pEntry
->
isWeak
=
raftMetaArr
[
i
].
isWeak
;
pEntry
->
term
=
term
;
pEntry
->
index
=
index
;
memcpy
(
pEntry
->
data
,
msgArr
[
i
].
pCont
,
msgArr
[
i
].
contLen
);
ASSERT
(
msgArr
[
i
].
contLen
==
pEntry
->
dataLen
);
code
=
ths
->
pLogStore
->
syncLogAppendEntry
(
ths
->
pLogStore
,
pEntry
);
if
(
code
!=
0
)
{
// del resp mgr, call FpCommitCb
ASSERT
(
0
);
return
-
1
;
}
}
// fsync once
SSyncLogStoreData
*
pData
=
ths
->
pLogStore
->
data
;
SWal
*
pWal
=
pData
->
pWal
;
walFsync
(
pWal
,
true
);
return
0
;
}
static
void
syncFreeNode
(
void
*
param
)
{
SSyncNode
*
pNode
=
param
;
// inner object already free
...
...
source/libs/sync/src/syncMessage.c
浏览文件 @
4a988a7c
...
...
@@ -591,7 +591,7 @@ void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLe
void
syncPingReplyDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncPingReply
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
ASSERT
(
len
==
pMsg
->
bytes
);
ASSERT
(
pMsg
->
bytes
==
sizeof
(
SyncPing
)
+
pMsg
->
dataLen
);
ASSERT
(
pMsg
->
bytes
==
sizeof
(
SyncPing
Reply
)
+
pMsg
->
dataLen
);
}
char
*
syncPingReplySerialize2
(
const
SyncPingReply
*
pMsg
,
uint32_t
*
len
)
{
...
...
@@ -956,6 +956,48 @@ void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg) {
}
}
// ---- message process SyncClientRequestBatch----
// block1:
// block2: SRaftMeta array
// block3: rpc msg array (with pCont)
SyncClientRequestBatch
*
syncClientRequestBatchBuild
(
SRpcMsg
*
rpcMsgArr
,
SRaftMeta
*
raftArr
,
int32_t
arrSize
,
int32_t
vgId
)
{
ASSERT
(
rpcMsgArr
!=
NULL
);
ASSERT
(
arrSize
>
0
);
int32_t
dataLen
=
0
;
int32_t
raftMetaArrayLen
=
sizeof
(
SRaftMeta
)
*
arrSize
;
int32_t
rpcArrayLen
=
sizeof
(
SRpcMsg
)
*
arrSize
;
dataLen
+=
(
raftMetaArrayLen
+
rpcArrayLen
);
uint32_t
bytes
=
sizeof
(
SyncClientRequestBatch
)
+
dataLen
;
SyncClientRequestBatch
*
pMsg
=
taosMemoryMalloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
vgId
=
vgId
;
pMsg
->
msgType
=
TDMT_SYNC_CLIENT_REQUEST_BATCH
;
pMsg
->
dataCount
=
arrSize
;
pMsg
->
dataLen
=
dataLen
;
SRaftMeta
*
raftMetaArr
=
(
SRaftMeta
*
)(
pMsg
->
data
);
SRpcMsg
*
msgArr
=
(
SRpcMsg
*
)((
char
*
)(
pMsg
->
data
)
+
raftMetaArrayLen
);
for
(
int
i
=
0
;
i
<
arrSize
;
++
i
)
{
// init raftMetaArr
raftMetaArr
[
i
].
isWeak
=
raftArr
[
i
].
isWeak
;
raftMetaArr
[
i
].
seqNum
=
raftArr
[
i
].
seqNum
;
// init msgArr
msgArr
[
i
]
=
rpcMsgArr
[
i
];
}
return
pMsg
;
}
void
syncClientRequestBatch2RpcMsg
(
const
SyncClientRequestBatch
*
pSyncMsg
,
SRpcMsg
*
pRpcMsg
)
{}
// ---- message process SyncRequestVote----
SyncRequestVote
*
syncRequestVoteBuild
(
int32_t
vgId
)
{
uint32_t
bytes
=
sizeof
(
SyncRequestVote
);
...
...
@@ -1426,6 +1468,279 @@ void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg) {
}
}
// ---- message process SyncAppendEntriesBatch----
// block1: SOffsetAndContLen
// block2: SOffsetAndContLen Array
// block3: SRpcMsg Array
// block4: SRpcMsg pCont Array
SyncAppendEntriesBatch
*
syncAppendEntriesBatchBuild
(
SRpcMsg
*
rpcMsgArr
,
int32_t
arrSize
,
int32_t
vgId
)
{
ASSERT
(
rpcMsgArr
!=
NULL
);
ASSERT
(
arrSize
>
0
);
int32_t
dataLen
=
0
;
int32_t
metaArrayLen
=
sizeof
(
SOffsetAndContLen
)
*
arrSize
;
// <offset, contLen>
int32_t
rpcArrayLen
=
sizeof
(
SRpcMsg
)
*
arrSize
;
// SRpcMsg
int32_t
contArrayLen
=
0
;
for
(
int
i
=
0
;
i
<
arrSize
;
++
i
)
{
// SRpcMsg pCont
contArrayLen
+=
rpcMsgArr
[
i
].
contLen
;
}
dataLen
+=
(
metaArrayLen
+
rpcArrayLen
+
contArrayLen
);
uint32_t
bytes
=
sizeof
(
SyncAppendEntriesBatch
)
+
dataLen
;
SyncAppendEntriesBatch
*
pMsg
=
taosMemoryMalloc
(
bytes
);
memset
(
pMsg
,
0
,
bytes
);
pMsg
->
bytes
=
bytes
;
pMsg
->
vgId
=
vgId
;
pMsg
->
msgType
=
TDMT_SYNC_APPEND_ENTRIES_BATCH
;
pMsg
->
dataCount
=
arrSize
;
pMsg
->
dataLen
=
dataLen
;
SOffsetAndContLen
*
metaArr
=
(
SOffsetAndContLen
*
)(
pMsg
->
data
);
SRpcMsg
*
msgArr
=
(
SRpcMsg
*
)((
char
*
)(
pMsg
->
data
)
+
metaArrayLen
);
char
*
pData
=
pMsg
->
data
;
for
(
int
i
=
0
;
i
<
arrSize
;
++
i
)
{
// init <offset, contLen>
if
(
i
==
0
)
{
metaArr
[
i
].
offset
=
metaArrayLen
+
rpcArrayLen
;
metaArr
[
i
].
contLen
=
rpcMsgArr
[
i
].
contLen
;
}
else
{
metaArr
[
i
].
offset
=
metaArr
[
i
-
1
].
offset
+
metaArr
[
i
-
1
].
contLen
;
metaArr
[
i
].
contLen
=
rpcMsgArr
[
i
].
contLen
;
}
// init msgArr
msgArr
[
i
]
=
rpcMsgArr
[
i
];
// init data
ASSERT
(
rpcMsgArr
[
i
].
contLen
==
metaArr
[
i
].
contLen
);
memcpy
(
pData
+
metaArr
[
i
].
offset
,
rpcMsgArr
[
i
].
pCont
,
rpcMsgArr
[
i
].
contLen
);
}
return
pMsg
;
}
void
syncAppendEntriesBatchDestroy
(
SyncAppendEntriesBatch
*
pMsg
)
{
if
(
pMsg
!=
NULL
)
{
taosMemoryFree
(
pMsg
);
}
}
void
syncAppendEntriesBatchSerialize
(
const
SyncAppendEntriesBatch
*
pMsg
,
char
*
buf
,
uint32_t
bufLen
)
{
ASSERT
(
pMsg
->
bytes
<=
bufLen
);
memcpy
(
buf
,
pMsg
,
pMsg
->
bytes
);
}
void
syncAppendEntriesBatchDeserialize
(
const
char
*
buf
,
uint32_t
len
,
SyncAppendEntriesBatch
*
pMsg
)
{
memcpy
(
pMsg
,
buf
,
len
);
ASSERT
(
len
==
pMsg
->
bytes
);
ASSERT
(
pMsg
->
bytes
==
sizeof
(
SyncAppendEntriesBatch
)
+
pMsg
->
dataLen
);
}
char
*
syncAppendEntriesBatchSerialize2
(
const
SyncAppendEntriesBatch
*
pMsg
,
uint32_t
*
len
)
{
char
*
buf
=
taosMemoryMalloc
(
pMsg
->
bytes
);
ASSERT
(
buf
!=
NULL
);
syncAppendEntriesBatchSerialize
(
pMsg
,
buf
,
pMsg
->
bytes
);
if
(
len
!=
NULL
)
{
*
len
=
pMsg
->
bytes
;
}
return
buf
;
}
SyncAppendEntriesBatch
*
syncAppendEntriesBatchDeserialize2
(
const
char
*
buf
,
uint32_t
len
)
{
uint32_t
bytes
=
*
((
uint32_t
*
)
buf
);
SyncAppendEntriesBatch
*
pMsg
=
taosMemoryMalloc
(
bytes
);
ASSERT
(
pMsg
!=
NULL
);
syncAppendEntriesBatchDeserialize
(
buf
,
len
,
pMsg
);
ASSERT
(
len
==
pMsg
->
bytes
);
return
pMsg
;
}
void
syncAppendEntriesBatch2RpcMsg
(
const
SyncAppendEntriesBatch
*
pMsg
,
SRpcMsg
*
pRpcMsg
)
{
memset
(
pRpcMsg
,
0
,
sizeof
(
*
pRpcMsg
));
pRpcMsg
->
msgType
=
pMsg
->
msgType
;
pRpcMsg
->
contLen
=
pMsg
->
bytes
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
syncAppendEntriesBatchSerialize
(
pMsg
,
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
}
void
syncAppendEntriesBatchFromRpcMsg
(
const
SRpcMsg
*
pRpcMsg
,
SyncAppendEntriesBatch
*
pMsg
)
{
syncAppendEntriesBatchDeserialize
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
pMsg
);
}
SyncAppendEntriesBatch
*
syncAppendEntriesBatchFromRpcMsg2
(
const
SRpcMsg
*
pRpcMsg
)
{
SyncAppendEntriesBatch
*
pMsg
=
syncAppendEntriesBatchDeserialize2
(
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
);
ASSERT
(
pMsg
!=
NULL
);
return
pMsg
;
}
cJSON
*
syncAppendEntriesBatch2Json
(
const
SyncAppendEntriesBatch
*
pMsg
)
{
char
u64buf
[
128
]
=
{
0
};
cJSON
*
pRoot
=
cJSON_CreateObject
();
if
(
pMsg
!=
NULL
)
{
cJSON_AddNumberToObject
(
pRoot
,
"bytes"
,
pMsg
->
bytes
);
cJSON_AddNumberToObject
(
pRoot
,
"vgId"
,
pMsg
->
vgId
);
cJSON_AddNumberToObject
(
pRoot
,
"msgType"
,
pMsg
->
msgType
);
cJSON
*
pSrcId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
srcId
.
addr
);
cJSON_AddStringToObject
(
pSrcId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
srcId
.
addr
;
cJSON
*
pTmp
=
pSrcId
;
char
host
[
128
]
=
{
0
};
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pSrcId
,
"vgId"
,
pMsg
->
srcId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"srcId"
,
pSrcId
);
cJSON
*
pDestId
=
cJSON_CreateObject
();
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
destId
.
addr
);
cJSON_AddStringToObject
(
pDestId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pMsg
->
destId
.
addr
;
cJSON
*
pTmp
=
pDestId
;
char
host
[
128
]
=
{
0
};
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
cJSON_AddStringToObject
(
pTmp
,
"addr_host"
,
host
);
cJSON_AddNumberToObject
(
pTmp
,
"addr_port"
,
port
);
}
cJSON_AddNumberToObject
(
pDestId
,
"vgId"
,
pMsg
->
destId
.
vgId
);
cJSON_AddItemToObject
(
pRoot
,
"destId"
,
pDestId
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
term
);
cJSON_AddStringToObject
(
pRoot
,
"term"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%ld"
,
pMsg
->
prevLogIndex
);
cJSON_AddStringToObject
(
pRoot
,
"prevLogIndex"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
prevLogTerm
);
cJSON_AddStringToObject
(
pRoot
,
"prevLogTerm"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%ld"
,
pMsg
->
commitIndex
);
cJSON_AddStringToObject
(
pRoot
,
"commitIndex"
,
u64buf
);
snprintf
(
u64buf
,
sizeof
(
u64buf
),
"%lu"
,
pMsg
->
privateTerm
);
cJSON_AddStringToObject
(
pRoot
,
"privateTerm"
,
u64buf
);
cJSON_AddNumberToObject
(
pRoot
,
"dataCount"
,
pMsg
->
dataCount
);
cJSON_AddNumberToObject
(
pRoot
,
"dataLen"
,
pMsg
->
dataLen
);
int32_t
metaArrayLen
=
sizeof
(
SOffsetAndContLen
)
*
pMsg
->
dataCount
;
// <offset, contLen>
int32_t
rpcArrayLen
=
sizeof
(
SRpcMsg
)
*
pMsg
->
dataCount
;
// SRpcMsg
int32_t
contArrayLen
=
pMsg
->
dataLen
-
metaArrayLen
-
rpcArrayLen
;
cJSON_AddNumberToObject
(
pRoot
,
"metaArrayLen"
,
metaArrayLen
);
cJSON_AddNumberToObject
(
pRoot
,
"rpcArrayLen"
,
rpcArrayLen
);
cJSON_AddNumberToObject
(
pRoot
,
"contArrayLen"
,
contArrayLen
);
SOffsetAndContLen
*
metaArr
=
(
SOffsetAndContLen
*
)(
pMsg
->
data
);
SRpcMsg
*
msgArr
=
(
SRpcMsg
*
)(
pMsg
->
data
+
metaArrayLen
);
void
*
pData
=
(
void
*
)(
pMsg
->
data
+
metaArrayLen
+
rpcArrayLen
);
cJSON
*
pMetaArr
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"metaArr"
,
pMetaArr
);
for
(
int
i
=
0
;
i
<
pMsg
->
dataCount
;
++
i
)
{
cJSON
*
pMeta
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pMeta
,
"offset"
,
metaArr
[
i
].
offset
);
cJSON_AddNumberToObject
(
pMeta
,
"contLen"
,
metaArr
[
i
].
contLen
);
cJSON_AddItemToArray
(
pMetaArr
,
pMeta
);
}
cJSON
*
pMsgArr
=
cJSON_CreateArray
();
cJSON_AddItemToObject
(
pRoot
,
"msgArr"
,
pMsgArr
);
for
(
int
i
=
0
;
i
<
pMsg
->
dataCount
;
++
i
)
{
cJSON
*
pRpcMsgJson
=
cJSON_CreateObject
();
cJSON_AddNumberToObject
(
pRpcMsgJson
,
"code"
,
msgArr
[
i
].
code
);
cJSON_AddNumberToObject
(
pRpcMsgJson
,
"contLen"
,
msgArr
[
i
].
contLen
);
cJSON_AddNumberToObject
(
pRpcMsgJson
,
"msgType"
,
msgArr
[
i
].
msgType
);
cJSON_AddItemToArray
(
pMsgArr
,
pRpcMsgJson
);
}
char
*
s
;
s
=
syncUtilprintBin
((
char
*
)(
pMsg
->
data
),
pMsg
->
dataLen
);
cJSON_AddStringToObject
(
pRoot
,
"data"
,
s
);
taosMemoryFree
(
s
);
s
=
syncUtilprintBin2
((
char
*
)(
pMsg
->
data
),
pMsg
->
dataLen
);
cJSON_AddStringToObject
(
pRoot
,
"data2"
,
s
);
taosMemoryFree
(
s
);
}
cJSON
*
pJson
=
cJSON_CreateObject
();
cJSON_AddItemToObject
(
pJson
,
"SyncAppendEntriesBatch"
,
pRoot
);
return
pJson
;
}
char
*
syncAppendEntriesBatch2Str
(
const
SyncAppendEntriesBatch
*
pMsg
)
{
cJSON
*
pJson
=
syncAppendEntriesBatch2Json
(
pMsg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
void
syncAppendEntriesBatch2RpcMsgArray
(
SyncAppendEntriesBatch
*
pSyncMsg
,
SRpcMsg
*
rpcMsgArr
,
int32_t
maxArrSize
,
int32_t
*
pRetArrSize
)
{
if
(
pRetArrSize
!=
NULL
)
{
*
pRetArrSize
=
pSyncMsg
->
dataCount
;
}
int32_t
arrSize
=
pSyncMsg
->
dataCount
;
if
(
arrSize
>
maxArrSize
)
{
arrSize
=
maxArrSize
;
}
int32_t
metaArrayLen
=
sizeof
(
SOffsetAndContLen
)
*
pSyncMsg
->
dataCount
;
// <offset, contLen>
int32_t
rpcArrayLen
=
sizeof
(
SRpcMsg
)
*
pSyncMsg
->
dataCount
;
// SRpcMsg
int32_t
contArrayLen
=
pSyncMsg
->
dataLen
-
metaArrayLen
-
rpcArrayLen
;
SOffsetAndContLen
*
metaArr
=
(
SOffsetAndContLen
*
)(
pSyncMsg
->
data
);
SRpcMsg
*
msgArr
=
(
SRpcMsg
*
)(
pSyncMsg
->
data
+
metaArrayLen
);
void
*
pData
=
pSyncMsg
->
data
+
metaArrayLen
+
rpcArrayLen
;
for
(
int
i
=
0
;
i
<
arrSize
;
++
i
)
{
rpcMsgArr
[
i
]
=
msgArr
[
i
];
rpcMsgArr
[
i
].
pCont
=
rpcMallocCont
(
msgArr
[
i
].
contLen
);
void
*
pRpcCont
=
pSyncMsg
->
data
+
metaArr
[
i
].
offset
;
memcpy
(
rpcMsgArr
[
i
].
pCont
,
pRpcCont
,
rpcMsgArr
[
i
].
contLen
);
}
}
// for debug ----------------------
void
syncAppendEntriesBatchPrint
(
const
SyncAppendEntriesBatch
*
pMsg
)
{
char
*
serialized
=
syncAppendEntriesBatch2Str
(
pMsg
);
printf
(
"syncAppendEntriesBatchPrint | len:%lu | %s
\n
"
,
strlen
(
serialized
),
serialized
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
}
void
syncAppendEntriesBatchPrint2
(
char
*
s
,
const
SyncAppendEntriesBatch
*
pMsg
)
{
char
*
serialized
=
syncAppendEntriesBatch2Str
(
pMsg
);
printf
(
"syncAppendEntriesBatchPrint2 | len:%lu | %s | %s
\n
"
,
strlen
(
serialized
),
s
,
serialized
);
fflush
(
NULL
);
taosMemoryFree
(
serialized
);
}
void
syncAppendEntriesBatchLog
(
const
SyncAppendEntriesBatch
*
pMsg
)
{
char
*
serialized
=
syncAppendEntriesBatch2Str
(
pMsg
);
sTrace
(
"syncAppendEntriesBatchLog | len:%lu | %s"
,
strlen
(
serialized
),
serialized
);
taosMemoryFree
(
serialized
);
}
void
syncAppendEntriesBatchLog2
(
char
*
s
,
const
SyncAppendEntriesBatch
*
pMsg
)
{
if
(
gRaftDetailLog
)
{
char
*
serialized
=
syncAppendEntriesBatch2Str
(
pMsg
);
sTraceLong
(
"syncAppendEntriesBatchLog2 | len:%lu | %s | %s"
,
strlen
(
serialized
),
s
,
serialized
);
taosMemoryFree
(
serialized
);
}
}
// ---- message process SyncAppendEntriesReply----
SyncAppendEntriesReply
*
syncAppendEntriesReplyBuild
(
int32_t
vgId
)
{
uint32_t
bytes
=
sizeof
(
SyncAppendEntriesReply
);
...
...
source/libs/sync/src/syncRaftCfg.c
浏览文件 @
4a988a7c
...
...
@@ -101,7 +101,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) {
char
*
syncCfg2Str
(
SSyncCfg
*
pSyncCfg
)
{
cJSON
*
pJson
=
syncCfg2Json
(
pSyncCfg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
@@ -109,7 +109,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) {
char
*
syncCfg2SimpleStr
(
SSyncCfg
*
pSyncCfg
)
{
if
(
pSyncCfg
!=
NULL
)
{
int32_t
len
=
512
;
char
*
s
=
taosMemoryMalloc
(
len
);
char
*
s
=
taosMemoryMalloc
(
len
);
memset
(
s
,
0
,
len
);
snprintf
(
s
,
len
,
"{replica-num:%d, my-index:%d, "
,
pSyncCfg
->
replicaNum
,
pSyncCfg
->
myIndex
);
...
...
@@ -205,7 +205,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) {
char
*
raftCfg2Str
(
SRaftCfg
*
pRaftCfg
)
{
cJSON
*
pJson
=
raftCfg2Json
(
pRaftCfg
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
...
...
@@ -280,7 +280,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) {
(
pRaftCfg
->
configIndexArr
)[
i
]
=
atoll
(
pIndex
->
valuestring
);
}
cJSON
*
pJsonSyncCfg
=
cJSON_GetObjectItem
(
pJson
,
"SSyncCfg"
);
cJSON
*
pJsonSyncCfg
=
cJSON_GetObjectItem
(
pJson
,
"SSyncCfg"
);
int32_t
code
=
syncCfgFromJson
(
pJsonSyncCfg
,
&
(
pRaftCfg
->
cfg
));
ASSERT
(
code
==
0
);
...
...
source/libs/sync/src/syncReplication.c
浏览文件 @
4a988a7c
...
...
@@ -116,6 +116,73 @@ int32_t syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {
return
ret
;
}
int32_t
syncNodeAppendEntriesPeersSnapshot2
(
SSyncNode
*
pSyncNode
)
{
if
(
pSyncNode
->
state
!=
TAOS_SYNC_STATE_LEADER
)
{
return
-
1
;
}
int32_t
ret
=
0
;
for
(
int
i
=
0
;
i
<
pSyncNode
->
peersNum
;
++
i
)
{
SRaftId
*
pDestId
=
&
(
pSyncNode
->
peersId
[
i
]);
// next index
SyncIndex
nextIndex
=
syncIndexMgrGetIndex
(
pSyncNode
->
pNextIndex
,
pDestId
);
// pre index, pre term
SyncIndex
preLogIndex
=
syncNodeGetPreIndex
(
pSyncNode
,
nextIndex
);
SyncTerm
preLogTerm
=
syncNodeGetPreTerm
(
pSyncNode
,
nextIndex
);
if
(
preLogTerm
==
SYNC_TERM_INVALID
)
{
SSyncSnapshotSender
*
pSender
=
syncNodeGetSnapshotSender
(
pSyncNode
,
pDestId
);
ASSERT
(
pSender
!=
NULL
);
ASSERT
(
!
snapshotSenderIsStart
(
pSender
));
SyncIndex
newNextIndex
=
syncNodeGetLastIndex
(
pSyncNode
)
+
1
;
syncIndexMgrSetIndex
(
pSyncNode
->
pNextIndex
,
pDestId
,
newNextIndex
);
syncIndexMgrSetIndex
(
pSyncNode
->
pMatchIndex
,
pDestId
,
SYNC_INDEX_INVALID
);
sError
(
"vgId:%d sync get pre term error, nextIndex:%ld, update next-index:%ld, match-index:%d, raftid:%ld"
,
pSyncNode
->
vgId
,
nextIndex
,
newNextIndex
,
SYNC_INDEX_INVALID
,
pDestId
->
addr
);
return
-
1
;
}
SRpcMsg
rpcMsgArr
[
SYNC_MAX_BATCH_SIZE
];
memset
(
rpcMsgArr
,
0
,
sizeof
(
rpcMsgArr
));
int32_t
getCount
=
0
;
for
(
int32_t
i
=
0
;
i
<
pSyncNode
->
batchSize
;
++
i
)
{
SSyncRaftEntry
*
pEntry
;
int32_t
code
=
pSyncNode
->
pLogStore
->
syncLogGetEntry
(
pSyncNode
->
pLogStore
,
nextIndex
,
&
pEntry
);
if
(
code
==
0
)
{
ASSERT
(
pEntry
!=
NULL
);
// get rpc msg [i] from entry
syncEntryDestory
(
pEntry
);
getCount
++
;
}
else
{
break
;
}
}
SyncAppendEntriesBatch
*
pMsg
=
syncAppendEntriesBatchBuild
(
rpcMsgArr
,
getCount
,
pSyncNode
->
vgId
);
ASSERT
(
pMsg
!=
NULL
);
// prepare msg
pMsg
->
srcId
=
pSyncNode
->
myRaftId
;
pMsg
->
destId
=
*
pDestId
;
pMsg
->
term
=
pSyncNode
->
pRaftStore
->
currentTerm
;
pMsg
->
prevLogIndex
=
preLogIndex
;
pMsg
->
prevLogTerm
=
preLogTerm
;
pMsg
->
commitIndex
=
pSyncNode
->
commitIndex
;
pMsg
->
privateTerm
=
0
;
pMsg
->
dataCount
=
getCount
;
// send msg
syncNodeAppendEntriesBatch
(
pSyncNode
,
pDestId
,
pMsg
);
syncAppendEntriesBatchDestroy
(
pMsg
);
}
return
0
;
}
int32_t
syncNodeAppendEntriesPeersSnapshot
(
SSyncNode
*
pSyncNode
)
{
ASSERT
(
pSyncNode
->
state
==
TAOS_SYNC_STATE_LEADER
);
...
...
@@ -234,4 +301,24 @@ int32_t syncNodeAppendEntries(SSyncNode* pSyncNode, const SRaftId* destRaftId, c
syncAppendEntries2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncNodeSendMsgById
(
destRaftId
,
pSyncNode
,
&
rpcMsg
);
return
ret
;
}
int32_t
syncNodeAppendEntriesBatch
(
SSyncNode
*
pSyncNode
,
const
SRaftId
*
destRaftId
,
const
SyncAppendEntriesBatch
*
pMsg
)
{
do
{
char
host
[
128
];
uint16_t
port
;
syncUtilU642Addr
(
destRaftId
->
addr
,
host
,
sizeof
(
host
),
&
port
);
sDebug
(
"vgId:%d, send sync-append-entries-batch to %s:%d, {term:%lu, pre-index:%ld, pre-term:%lu, pterm:%lu, "
"commit:%ld, "
"datalen:%d, dataCount:%d}"
,
pSyncNode
->
vgId
,
host
,
port
,
pMsg
->
term
,
pMsg
->
prevLogIndex
,
pMsg
->
prevLogTerm
,
pMsg
->
privateTerm
,
pMsg
->
commitIndex
,
pMsg
->
dataLen
,
pMsg
->
dataCount
);
}
while
(
0
);
SRpcMsg
rpcMsg
;
syncAppendEntriesBatch2RpcMsg
(
pMsg
,
&
rpcMsg
);
syncNodeSendMsgById
(
destRaftId
,
pSyncNode
,
&
rpcMsg
);
return
0
;
}
\ No newline at end of file
source/libs/sync/src/syncSnapshot.c
浏览文件 @
4a988a7c
...
...
@@ -349,14 +349,14 @@ cJSON *snapshotSender2Json(SSyncSnapshotSender *pSender) {
char
*
snapshotSender2Str
(
SSyncSnapshotSender
*
pSender
)
{
cJSON
*
pJson
=
snapshotSender2Json
(
pSender
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
char
*
snapshotSender2SimpleStr
(
SSyncSnapshotSender
*
pSender
,
char
*
event
)
{
int32_t
len
=
256
;
char
*
s
=
taosMemoryMalloc
(
len
);
char
*
s
=
taosMemoryMalloc
(
len
);
SRaftId
destId
=
pSender
->
pSyncNode
->
replicasId
[
pSender
->
replicaIndex
];
char
host
[
64
];
...
...
@@ -604,7 +604,7 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
cJSON_AddStringToObject
(
pFromId
,
"addr"
,
u64buf
);
{
uint64_t
u64
=
pReceiver
->
fromId
.
addr
;
cJSON
*
pTmp
=
pFromId
;
cJSON
*
pTmp
=
pFromId
;
char
host
[
128
]
=
{
0
};
uint16_t
port
;
syncUtilU642Addr
(
u64
,
host
,
sizeof
(
host
),
&
port
);
...
...
@@ -637,14 +637,14 @@ cJSON *snapshotReceiver2Json(SSyncSnapshotReceiver *pReceiver) {
char
*
snapshotReceiver2Str
(
SSyncSnapshotReceiver
*
pReceiver
)
{
cJSON
*
pJson
=
snapshotReceiver2Json
(
pReceiver
);
char
*
serialized
=
cJSON_Print
(
pJson
);
char
*
serialized
=
cJSON_Print
(
pJson
);
cJSON_Delete
(
pJson
);
return
serialized
;
}
char
*
snapshotReceiver2SimpleStr
(
SSyncSnapshotReceiver
*
pReceiver
,
char
*
event
)
{
int32_t
len
=
256
;
char
*
s
=
taosMemoryMalloc
(
len
);
char
*
s
=
taosMemoryMalloc
(
len
);
SRaftId
fromId
=
pReceiver
->
fromId
;
char
host
[
128
];
...
...
source/libs/sync/test/CMakeLists.txt
浏览文件 @
4a988a7c
...
...
@@ -21,6 +21,7 @@ add_executable(syncEntryCacheTest "")
add_executable
(
syncRequestVoteTest
""
)
add_executable
(
syncRequestVoteReplyTest
""
)
add_executable
(
syncAppendEntriesTest
""
)
add_executable
(
syncAppendEntriesBatchTest
""
)
add_executable
(
syncAppendEntriesReplyTest
""
)
add_executable
(
syncClientRequestTest
""
)
add_executable
(
syncTimeoutTest
""
)
...
...
@@ -146,6 +147,10 @@ target_sources(syncAppendEntriesTest
PRIVATE
"syncAppendEntriesTest.cpp"
)
target_sources
(
syncAppendEntriesBatchTest
PRIVATE
"syncAppendEntriesBatchTest.cpp"
)
target_sources
(
syncAppendEntriesReplyTest
PRIVATE
"syncAppendEntriesReplyTest.cpp"
...
...
@@ -387,6 +392,11 @@ target_include_directories(syncAppendEntriesTest
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncAppendEntriesBatchTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
"
${
CMAKE_CURRENT_SOURCE_DIR
}
/../inc"
)
target_include_directories
(
syncAppendEntriesReplyTest
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/sync"
...
...
@@ -636,6 +646,10 @@ target_link_libraries(syncAppendEntriesTest
sync
gtest_main
)
target_link_libraries
(
syncAppendEntriesBatchTest
sync
gtest_main
)
target_link_libraries
(
syncAppendEntriesReplyTest
sync
gtest_main
...
...
source/libs/sync/test/syncAppendEntriesBatchTest.cpp
0 → 100644
浏览文件 @
4a988a7c
//#include <gtest/gtest.h>
#include <stdio.h>
#include "syncIO.h"
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "trpc.h"
void
logTest
()
{
sTrace
(
"--- sync log test: trace"
);
sDebug
(
"--- sync log test: debug"
);
sInfo
(
"--- sync log test: info"
);
sWarn
(
"--- sync log test: warn"
);
sError
(
"--- sync log test: error"
);
sFatal
(
"--- sync log test: fatal"
);
}
SRpcMsg
*
createRpcMsg
(
int32_t
i
,
int32_t
dataLen
)
{
SRpcMsg
*
pRpcMsg
=
(
SRpcMsg
*
)
taosMemoryMalloc
(
sizeof
(
SRpcMsg
));
memset
(
pRpcMsg
,
0
,
sizeof
(
SRpcMsg
));
pRpcMsg
->
msgType
=
TDMT_SYNC_PING
;
pRpcMsg
->
contLen
=
dataLen
;
pRpcMsg
->
pCont
=
rpcMallocCont
(
pRpcMsg
->
contLen
);
pRpcMsg
->
code
=
10
*
i
;
snprintf
((
char
*
)
pRpcMsg
->
pCont
,
pRpcMsg
->
contLen
,
"value_%d"
,
i
);
return
pRpcMsg
;
}
SyncAppendEntriesBatch
*
createMsg
()
{
SRpcMsg
rpcMsgArr
[
5
];
memset
(
rpcMsgArr
,
0
,
sizeof
(
rpcMsgArr
));
for
(
int32_t
i
=
0
;
i
<
5
;
++
i
)
{
SRpcMsg
*
pRpcMsg
=
createRpcMsg
(
i
,
20
);
rpcMsgArr
[
i
]
=
*
pRpcMsg
;
taosMemoryFree
(
pRpcMsg
);
}
SyncAppendEntriesBatch
*
pMsg
=
syncAppendEntriesBatchBuild
(
rpcMsgArr
,
5
,
1234
);
pMsg
->
srcId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
1234
);
pMsg
->
srcId
.
vgId
=
100
;
pMsg
->
destId
.
addr
=
syncUtilAddr2U64
(
"127.0.0.1"
,
5678
);
pMsg
->
destId
.
vgId
=
100
;
pMsg
->
prevLogIndex
=
11
;
pMsg
->
prevLogTerm
=
22
;
pMsg
->
commitIndex
=
33
;
pMsg
->
privateTerm
=
44
;
return
pMsg
;
}
void
test1
()
{
SyncAppendEntriesBatch
*
pMsg
=
createMsg
();
syncAppendEntriesBatchLog2
((
char
*
)
"test1:"
,
pMsg
);
SRpcMsg
rpcMsgArr
[
5
];
int32_t
retArrSize
;
syncAppendEntriesBatch2RpcMsgArray
(
pMsg
,
rpcMsgArr
,
5
,
&
retArrSize
);
for
(
int
i
=
0
;
i
<
retArrSize
;
++
i
)
{
char
logBuf
[
128
];
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==test1 decode rpc msg %d: msgType:%d, code:%d, contLen:%d, pCont:%s
\n
"
,
i
,
rpcMsgArr
[
i
].
msgType
,
rpcMsgArr
[
i
].
code
,
rpcMsgArr
[
i
].
contLen
,
(
char
*
)
rpcMsgArr
[
i
].
pCont
);
sTrace
(
"%s"
,
logBuf
);
}
syncAppendEntriesBatchDestroy
(
pMsg
);
}
/*
void test2() {
SyncAppendEntries *pMsg = createMsg();
uint32_t len = pMsg->bytes;
char * serialized = (char *)taosMemoryMalloc(len);
syncAppendEntriesSerialize(pMsg, serialized, len);
SyncAppendEntries *pMsg2 = syncAppendEntriesBuild(pMsg->dataLen, 1000);
syncAppendEntriesDeserialize(serialized, len, pMsg2);
syncAppendEntriesLog2((char *)"test2: syncAppendEntriesSerialize -> syncAppendEntriesDeserialize ", pMsg2);
taosMemoryFree(serialized);
syncAppendEntriesDestroy(pMsg);
syncAppendEntriesDestroy(pMsg2);
}
void test3() {
SyncAppendEntries *pMsg = createMsg();
uint32_t len;
char * serialized = syncAppendEntriesSerialize2(pMsg, &len);
SyncAppendEntries *pMsg2 = syncAppendEntriesDeserialize2(serialized, len);
syncAppendEntriesLog2((char *)"test3: syncAppendEntriesSerialize3 -> syncAppendEntriesDeserialize2 ", pMsg2);
taosMemoryFree(serialized);
syncAppendEntriesDestroy(pMsg);
syncAppendEntriesDestroy(pMsg2);
}
void test4() {
SyncAppendEntries *pMsg = createMsg();
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
SyncAppendEntries *pMsg2 = (SyncAppendEntries *)taosMemoryMalloc(rpcMsg.contLen);
syncAppendEntriesFromRpcMsg(&rpcMsg, pMsg2);
syncAppendEntriesLog2((char *)"test4: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncAppendEntriesDestroy(pMsg);
syncAppendEntriesDestroy(pMsg2);
}
void test5() {
SyncAppendEntries *pMsg = createMsg();
SRpcMsg rpcMsg;
syncAppendEntries2RpcMsg(pMsg, &rpcMsg);
SyncAppendEntries *pMsg2 = syncAppendEntriesFromRpcMsg2(&rpcMsg);
syncAppendEntriesLog2((char *)"test5: syncAppendEntries2RpcMsg -> syncAppendEntriesFromRpcMsg2 ", pMsg2);
rpcFreeCont(rpcMsg.pCont);
syncAppendEntriesDestroy(pMsg);
syncAppendEntriesDestroy(pMsg2);
}
*/
int
main
()
{
gRaftDetailLog
=
true
;
tsAsyncLog
=
0
;
sDebugFlag
=
DEBUG_DEBUG
+
DEBUG_TRACE
+
DEBUG_SCREEN
+
DEBUG_FILE
;
logTest
();
test1
();
/*
test2();
test3();
test4();
test5();
*/
return
0
;
}
source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
浏览文件 @
4a988a7c
...
...
@@ -77,7 +77,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return
0
;
}
int32_t
SnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
**
ppReader
)
{
int32_t
SnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pParam
,
void
**
ppReader
)
{
*
ppReader
=
(
void
*
)
0xABCD
;
char
logBuf
[
256
]
=
{
0
};
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p"
,
pFsm
,
*
ppReader
);
...
...
source/libs/sync/test/syncSnapshotSenderTest.cpp
浏览文件 @
4a988a7c
...
...
@@ -25,7 +25,7 @@ void ReConfigCb(struct SSyncFSM* pFsm, SSyncCfg newCfg, SReConfigCbMeta cbMeta)
int32_t
GetSnapshot
(
struct
SSyncFSM
*
pFsm
,
SSnapshot
*
pSnapshot
)
{
return
0
;
}
int32_t
SnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
**
ppReader
)
{
return
0
;
}
int32_t
SnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pParam
,
void
**
ppReader
)
{
return
0
;
}
int32_t
SnapshotStopRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
)
{
return
0
;
}
int32_t
SnapshotDoRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pReader
,
void
**
ppBuf
,
int32_t
*
len
)
{
return
0
;
}
...
...
source/libs/sync/test/syncTestTool.cpp
浏览文件 @
4a988a7c
...
...
@@ -74,7 +74,7 @@ int32_t GetSnapshotCb(struct SSyncFSM* pFsm, SSnapshot* pSnapshot) {
return
0
;
}
int32_t
SnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
**
ppReader
)
{
int32_t
SnapshotStartRead
(
struct
SSyncFSM
*
pFsm
,
void
*
pParam
,
void
**
ppReader
)
{
*
ppReader
=
(
void
*
)
0xABCD
;
char
logBuf
[
256
]
=
{
0
};
snprintf
(
logBuf
,
sizeof
(
logBuf
),
"==callback== ==SnapshotStartRead== pFsm:%p, *ppReader:%p"
,
pFsm
,
*
ppReader
);
...
...
source/util/src/tarray.c
浏览文件 @
4a988a7c
...
...
@@ -206,11 +206,11 @@ void* taosArrayGetP(const SArray* pArray, size_t index) {
void
*
taosArrayGetLast
(
const
SArray
*
pArray
)
{
return
TARRAY_GET_ELEM
(
pArray
,
pArray
->
size
-
1
);
}
int32
_t
taosArrayGetSize
(
const
SArray
*
pArray
)
{
size
_t
taosArrayGetSize
(
const
SArray
*
pArray
)
{
if
(
pArray
==
NULL
)
{
return
0
;
}
return
(
int32_t
)
pArray
->
size
;
return
pArray
->
size
;
}
void
taosArraySetSize
(
SArray
*
pArray
,
size_t
size
)
{
...
...
source/util/src/terror.c
浏览文件 @
4a988a7c
...
...
@@ -429,6 +429,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_SYN_NEW_CONFIG_ERROR, "Sync new config error
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_RECONFIG_NOT_READY
,
"Sync not ready for reconfig"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_PROPOSE_NOT_READY
,
"Sync not ready for propose"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_STANDBY_NOT_READY
,
"Sync not ready for standby"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_BATCH_ERROR
,
"Sync batch error"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_SYN_INTERNAL_ERROR
,
"Sync internal error"
)
// wal
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录