diff --git a/docs/zh/05-get-started/index.md b/docs/zh/05-get-started/index.md
index 33661580ee4977171620496f69b3db955d5c35dd..dec4d800bc322f0ca42224b845006a62921e3f45 100644
--- a/docs/zh/05-get-started/index.md
+++ b/docs/zh/05-get-started/index.md
@@ -3,6 +3,8 @@ title: 立即开始
description: '快速设置 TDengine 环境并体验其高效写入和查询'
---
+import xiaot from './tdengine.webp'
+
TDengine 完整的软件包包括服务端(taosd)、用于与第三方系统对接并提供 RESTful 接口的 taosAdapter、应用驱动(taosc)、命令行程序 (CLI,taos) 和一些工具软件。TDengine 除了提供多种语言的连接器之外,还通过 [taosAdapter](../reference/taosadapter) 提供 [RESTful 接口](../connector/rest-api)。
本章主要介绍如何利用 Docker 或者安装包快速设置 TDengine 环境并体验其高效写入和查询。
@@ -18,4 +20,4 @@ import {useCurrentSidebarCategory} from '@docusaurus/theme-common';
微信扫描下面二维码,加“小 T”为好友,即可加入“物联网大数据技术前沿群”,与大家共同交流物联网大数据技术应用、TDengine 使用问题和技巧等话题。
-
+
diff --git a/docs/zh/08-connector/10-cpp.mdx b/docs/zh/08-connector/10-cpp.mdx
index cc7991da74729f3a96c475d41df4f5437a0f0605..8a4f4946a71aa27e4bbfc6d27fc3469b260ce550 100644
--- a/docs/zh/08-connector/10-cpp.mdx
+++ b/docs/zh/08-connector/10-cpp.mdx
@@ -115,6 +115,7 @@ TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤)
订阅和消费
```c
+ {{#include examples/c/tmq.c}}
```
diff --git a/docs/zh/28-releases/01-tdengine.md b/docs/zh/28-releases/01-tdengine.md
index 31093ce5577b804dcc66978b2f13baa8c207795d..f72735d903fc2fde684e56d76d6efc9d1dca643d 100644
--- a/docs/zh/28-releases/01-tdengine.md
+++ b/docs/zh/28-releases/01-tdengine.md
@@ -1,9 +1,11 @@
---
sidebar_label: TDengine 发布历史
-title: TDengine 发布历史
+title: TDengine 发布历史及下载链接
description: TDengine 发布历史、Release Notes 及下载链接
---
+各版本 TDengine 安装包下载链接如下:
+
import Release from "/components/ReleaseV3";
## 3.0.1.6
@@ -33,4 +35,3 @@ import Release from "/components/ReleaseV3";
## 3.0.1.0
-
diff --git a/docs/zh/28-releases/02-tools.md b/docs/zh/28-releases/02-tools.md
index 2623391fb90c1ac7b12c8017c93fa57324e1981b..ac4a884f8b95d50d2319b38ba8ad395f20cf4518 100644
--- a/docs/zh/28-releases/02-tools.md
+++ b/docs/zh/28-releases/02-tools.md
@@ -1,9 +1,11 @@
---
sidebar_label: taosTools 发布历史
-title: taosTools 发布历史
+title: taosTools 发布历史及下载链接
description: taosTools 的发布历史、Release Notes 和下载链接
---
+各版本 taosTools 安装包下载链接如下:
+
import Release from "/components/ReleaseV3";
## 2.2.7
diff --git a/include/common/tmsg.h b/include/common/tmsg.h
index b16b5a2d4bdf441b2d9a176ab04a6ccaee2fa596..99c5c72e2fa53048bf37700f2b6e7b940cadf010 100644
--- a/include/common/tmsg.h
+++ b/include/common/tmsg.h
@@ -418,13 +418,17 @@ static FORCE_INLINE int32_t taosEncodeSSchemaWrapper(void** buf, const SSchemaWr
static FORCE_INLINE void* taosDecodeSSchemaWrapper(const void* buf, SSchemaWrapper* pSW) {
buf = taosDecodeVariantI32(buf, &pSW->nCols);
buf = taosDecodeVariantI32(buf, &pSW->version);
- pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
- if (pSW->pSchema == NULL) {
- return NULL;
- }
+ if (pSW->nCols > 0) {
+ pSW->pSchema = (SSchema*)taosMemoryCalloc(pSW->nCols, sizeof(SSchema));
+ if (pSW->pSchema == NULL) {
+ return NULL;
+ }
- for (int32_t i = 0; i < pSW->nCols; i++) {
- buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
+ for (int32_t i = 0; i < pSW->nCols; i++) {
+ buf = taosDecodeSSchema(buf, &pSW->pSchema[i]);
+ }
+ } else {
+ pSW->pSchema = NULL;
}
return (void*)buf;
}
@@ -839,7 +843,7 @@ typedef struct {
int64_t dbId;
int32_t vgVersion;
int32_t numOfTable; // unit is TSDB_TABLE_NUM_UNIT
- int64_t stateTs; // ms
+ int64_t stateTs; // ms
} SUseDbReq;
int32_t tSerializeSUseDbReq(void* buf, int32_t bufLen, SUseDbReq* pReq);
@@ -2990,7 +2994,8 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
}
static FORCE_INLINE void tDeleteSMqSubTopicEp(SMqSubTopicEp* pSubTopicEp) {
- if (pSubTopicEp->schema.nCols) taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
+ taosMemoryFreeClear(pSubTopicEp->schema.pSchema);
+ pSubTopicEp->schema.nCols = 0;
taosArrayDestroy(pSubTopicEp->vgs);
}
diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h
index a27be95049d64b8ff68bc2f75fd62e49f32a47cd..0b997690a18e3d4befe7c0f50701eafdb6879843 100644
--- a/include/libs/sync/sync.h
+++ b/include/libs/sync/sync.h
@@ -58,7 +58,6 @@ typedef int64_t SyncIndex;
typedef uint64_t SyncTerm;
typedef struct SSyncNode SSyncNode;
-typedef struct SSyncBuffer SSyncBuffer;
typedef struct SWal SWal;
typedef struct SSyncRaftEntry SSyncRaftEntry;
diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c
index 34f462cb3d480874df3feaa311b3a4a1e4074c72..7fd288cd57df53d06d7fd9c888104e1b23d1a39f 100644
--- a/source/libs/executor/src/executorimpl.c
+++ b/source/libs/executor/src/executorimpl.c
@@ -3423,6 +3423,7 @@ int32_t buildSessionResultDataBlock(SOperatorInfo* pOperator, SStreamState* pSta
ASSERT(code == 0);
if (code == -1) {
// coverity scan
+ pGroupResInfo->index += 1;
continue;
}
SResultRow* pRow = (SResultRow*)pVal;
diff --git a/source/libs/executor/src/scanoperator.c b/source/libs/executor/src/scanoperator.c
index 55837c4a6aa32fe45fa1a3a7ee5105cc3f73cc28..d2e59ea8e114c4b6c313a27598c9dbde5a3c2bda 100644
--- a/source/libs/executor/src/scanoperator.c
+++ b/source/libs/executor/src/scanoperator.c
@@ -1773,6 +1773,7 @@ static SSDataBlock* doQueueScan(SOperatorInfo* pOperator) {
tqOffsetResetToLog(&pTaskInfo->streamInfo.prepareStatus, pTaskInfo->streamInfo.snapshotVer);
qDebug("queue scan tsdb over, switch to wal ver %" PRId64 "", pTaskInfo->streamInfo.snapshotVer + 1);
if (tqSeekVer(pInfo->tqReader, pTaskInfo->streamInfo.snapshotVer + 1) < 0) {
+ tqOffsetResetToLog(&pTaskInfo->streamInfo.lastStatus, pTaskInfo->streamInfo.snapshotVer);
return NULL;
}
ASSERT(pInfo->tqReader->pWalReader->curVersion == pTaskInfo->streamInfo.snapshotVer + 1);
diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c
index 4c369e880266017667db5b0563a5129799e57d9f..40742087eaee42037320266f7cc7d246f322611f 100644
--- a/source/libs/executor/src/timewindowoperator.c
+++ b/source/libs/executor/src/timewindowoperator.c
@@ -3580,6 +3580,11 @@ static void removeSessionResult(SSHashObj* pHashMap, SSHashObj* pResMap, SSessio
tSimpleHashRemove(pResMap, &key, sizeof(SSessionKey));
}
+static void getSessionHashKey(const SSessionKey* pKey, SSessionKey* pHashKey) {
+ *pHashKey = *pKey;
+ pHashKey->win.ekey = pKey->win.skey;
+}
+
static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
if (tSimpleHashGetSize(pHashMap) == 0) {
return;
@@ -3588,8 +3593,8 @@ static void removeSessionResults(SSHashObj* pHashMap, SArray* pWins) {
for (int32_t i = 0; i < size; i++) {
SSessionKey* pWin = taosArrayGet(pWins, i);
if (!pWin) continue;
- SSessionKey key = *pWin;
- key.win.ekey = key.win.skey;
+ SSessionKey key = {0};
+ getSessionHashKey(pWin, &key);
tSimpleHashRemove(pHashMap, &key, sizeof(SSessionKey));
}
}
@@ -3642,7 +3647,9 @@ static int32_t doOneWindowAggImpl(SColumnInfoData* pTimeWindowData, SResultWindo
static bool doDeleteSessionWindow(SStreamAggSupporter* pAggSup, SSessionKey* pKey) {
streamStateSessionDel(pAggSup->pState, pKey);
- tSimpleHashRemove(pAggSup->pResultRows, pKey, sizeof(SSessionKey));
+ SSessionKey hashKey = {0};
+ getSessionHashKey(pKey, &hashKey);
+ tSimpleHashRemove(pAggSup->pResultRows, &hashKey, sizeof(SSessionKey));
return true;
}
@@ -3753,8 +3760,8 @@ static void doStreamSessionAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSData
}
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
- SSessionKey key = winInfo.sessionWin;
- key.win.ekey = key.win.skey;
+ SSessionKey key = {0};
+ getSessionHashKey(&winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &winInfo, sizeof(SResultWindowInfo));
}
@@ -3896,8 +3903,8 @@ static void rebuildSessionWindow(SOperatorInfo* pOperator, SArray* pWinArray, SS
SOperatorInfo* pChild = taosArrayGetP(pInfo->pChildren, j);
SStreamSessionAggOperatorInfo* pChInfo = pChild->info;
SStreamAggSupporter* pChAggSup = &pChInfo->streamAggSup;
- SSessionKey chWinKey = *pWinKey;
- chWinKey.win.ekey = chWinKey.win.skey;
+ SSessionKey chWinKey = {0};
+ getSessionHashKey(pWinKey, &chWinKey);
SStreamStateCur* pCur = streamStateSessionSeekKeyCurrentNext(pChAggSup->pState, &chWinKey);
SResultRow* pResult = NULL;
SResultRow* pChResult = NULL;
@@ -3978,8 +3985,8 @@ static void copyDeleteWindowInfo(SArray* pResWins, SSHashObj* pStDeleted) {
for (int32_t i = 0; i < size; i++) {
SSessionKey* pWinKey = taosArrayGet(pResWins, i);
if (!pWinKey) continue;
- SSessionKey winInfo = *pWinKey;
- winInfo.win.ekey = winInfo.win.skey;
+ SSessionKey winInfo = {0};
+ getSessionHashKey(pWinKey, &winInfo);
tSimpleHashPut(pStDeleted, &winInfo, sizeof(SSessionKey), NULL, 0);
}
}
@@ -4561,8 +4568,8 @@ static void doStreamStateAggImpl(SOperatorInfo* pOperator, SSDataBlock* pSDataBl
}
if (pInfo->twAggSup.calTrigger == STREAM_TRIGGER_WINDOW_CLOSE) {
- SSessionKey key = curWin.winInfo.sessionWin;
- key.win.ekey = key.win.skey;
+ SSessionKey key = {0};
+ getSessionHashKey(&curWin.winInfo.sessionWin, &key);
tSimpleHashPut(pAggSup->pResultRows, &key, sizeof(SSessionKey), &curWin.winInfo, sizeof(SResultWindowInfo));
}
}
@@ -4645,6 +4652,12 @@ static SSDataBlock* doStreamStateAgg(SOperatorInfo* pOperator) {
initGroupResInfoFromArrayList(&pInfo->groupResInfo, pUpdated);
blockDataEnsureCapacity(pInfo->binfo.pRes, pOperator->resultInfo.capacity);
+#if 0
+ char* pBuf = streamStateSessionDump(pInfo->streamAggSup.pState);
+ qDebug("===stream===final session%s", pBuf);
+ taosMemoryFree(pBuf);
+#endif
+
doBuildDeleteDataBlock(pOperator, pInfo->pSeDeleted, pInfo->pDelRes, &pInfo->pDelIterator);
if (pInfo->pDelRes->info.rows > 0) {
printDataBlock(pInfo->pDelRes, "single state delete");
diff --git a/source/libs/parser/src/parUtil.c b/source/libs/parser/src/parUtil.c
index 00a72a1946e7039ef1a07b9b4efa6a12fd59a412..e8c3f2fa8da8717c826ced8e33a540ebaa4db829 100644
--- a/source/libs/parser/src/parUtil.c
+++ b/source/libs/parser/src/parUtil.c
@@ -138,7 +138,7 @@ static char* getSyntaxErrFormat(int32_t errCode) {
case TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY:
return "Primary timestamp column cannot be dropped";
case TSDB_CODE_PAR_INVALID_MODIFY_COL:
- return "Only binary/nchar column length could be modified";
+ return "Only binary/nchar column length could be modified, and the length can only be increased, not decreased";
case TSDB_CODE_PAR_INVALID_TBNAME:
return "Invalid tbname pseudo column";
case TSDB_CODE_PAR_INVALID_FUNCTION_NAME:
diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c
index ccb0dd4a92e12ee51bbdba6ffb4777ae663cae8e..88c39c1157c4ecae57829bd44b1d31ec66d5ab68 100644
--- a/source/libs/stream/src/streamState.c
+++ b/source/libs/stream/src/streamState.c
@@ -521,9 +521,13 @@ int32_t streamStateSessionGet(SStreamState* pState, SSessionKey* key, void** pVa
void* tmp = NULL;
int32_t code = streamStateSessionGetKVByCur(pCur, &resKey, &tmp, pVLen);
if (code == 0) {
- *key = resKey;
- *pVal = tdbRealloc(NULL, *pVLen);
- memcpy(*pVal, tmp, *pVLen);
+ if (key->win.skey != resKey.win.skey) {
+ code = -1;
+ } else {
+ *key = resKey;
+ *pVal = tdbRealloc(NULL, *pVLen);
+ memcpy(*pVal, tmp, *pVLen);
+ }
}
streamStateFreeCur(pCur);
return code;
diff --git a/source/libs/sync/inc/syncAppendEntries.h b/source/libs/sync/inc/syncAppendEntries.h
index 0a67939d356c80208caf431ca8fff5bbd7760328..a87a28baf524898937ad4838fd08d80d451f38e4 100644
--- a/source/libs/sync/inc/syncAppendEntries.h
+++ b/source/libs/sync/inc/syncAppendEntries.h
@@ -21,7 +21,6 @@ extern "C" {
#endif
#include "syncInt.h"
-#include "syncMessage.h"
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
diff --git a/source/libs/sync/inc/syncAppendEntriesReply.h b/source/libs/sync/inc/syncAppendEntriesReply.h
index d2dff92d43f31bc2c05c7e9d3a1a476ccd06ffbf..09750864d596932efa42dda30f92c58075f644ca 100644
--- a/source/libs/sync/inc/syncAppendEntriesReply.h
+++ b/source/libs/sync/inc/syncAppendEntriesReply.h
@@ -21,7 +21,6 @@ extern "C" {
#endif
#include "syncInt.h"
-#include "syncMessage.h"
// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
diff --git a/source/libs/sync/inc/syncIndexMgr.h b/source/libs/sync/inc/syncIndexMgr.h
index bd88f5cdce270ac138d12fb89b4af4d4eb94ce29..79b4fa0fbf06be6d1c16e783ebc59638fff1a5b5 100644
--- a/source/libs/sync/inc/syncIndexMgr.h
+++ b/source/libs/sync/inc/syncIndexMgr.h
@@ -41,22 +41,13 @@ void syncIndexMgrDestroy(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrClear(SSyncIndexMgr *pSyncIndexMgr);
void syncIndexMgrSetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncIndex index);
SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
-cJSON * syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr);
-char * syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr);
-void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime);
-int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
-void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime);
-int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
-
-// void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term);
-// SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
-
-// for debug -------------------
-void syncIndexMgrPrint(SSyncIndexMgr *pObj);
-void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj);
-void syncIndexMgrLog(SSyncIndexMgr *pObj);
-void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj);
+void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime);
+int64_t syncIndexMgrGetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
+void syncIndexMgrSetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t recvTime);
+int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
+void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term);
+SyncTerm syncIndexMgrGetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId);
#ifdef __cplusplus
}
diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h
index 8a951ba38d08d4fc5f7a117780ef6a1e218924a4..46bbb1442187d9bd5d08716d2f5a76792f1e48d7 100644
--- a/source/libs/sync/inc/syncInt.h
+++ b/source/libs/sync/inc/syncInt.h
@@ -21,7 +21,6 @@ extern "C" {
#endif
#include "sync.h"
-#include "syncTools.h"
#include "taosdef.h"
#include "tlog.h"
#include "trpc.h"
@@ -85,9 +84,33 @@ typedef struct SSyncSnapshotSender SSyncSnapshotSender;
typedef struct SSyncSnapshotReceiver SSyncSnapshotReceiver;
typedef struct SSyncTimer SSyncTimer;
typedef struct SSyncHbTimerData SSyncHbTimerData;
+typedef struct SyncSnapshotSend SyncSnapshotSend;
+typedef struct SyncSnapshotRsp SyncSnapshotRsp;
+typedef struct SyncLocalCmd SyncLocalCmd;
+typedef struct SyncAppendEntriesBatch SyncAppendEntriesBatch;
+typedef struct SyncPreSnapshotReply SyncPreSnapshotReply;
+typedef struct SyncHeartbeatReply SyncHeartbeatReply;
+typedef struct SyncHeartbeat SyncHeartbeat;
+typedef struct SyncPreSnapshot SyncPreSnapshot;
+
+typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
+typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
+typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
+typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg);
+typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
+typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg);
+typedef int32_t (*FpOnAppendEntriesReplyCb)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
+typedef int32_t (*FpOnTimeoutCb)(SSyncNode* pSyncNode, SyncTimeout* pMsg);
+typedef int32_t (*FpOnSnapshotCb)(SSyncNode* ths, SyncSnapshotSend* pMsg);
+typedef int32_t (*FpOnSnapshotReplyCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg);
extern bool gRaftDetailLog;
+typedef struct SRaftId {
+ SyncNodeId addr;
+ SyncGroupId vgId;
+} SRaftId;
+
typedef struct SSyncHbTimerData {
SSyncNode* pSyncNode;
SSyncTimer* pTimer;
diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h
index 936081c7b2d901881fdfc977ac9f628bf126cca2..7dc04d0b86c8908e471cef89d120a5761662ae65 100644
--- a/source/libs/sync/inc/syncMessage.h
+++ b/source/libs/sync/inc/syncMessage.h
@@ -23,15 +23,666 @@ extern "C" {
#include "syncInt.h"
// ---------------------------------------------
-cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg);
-cJSON* syncRpcUnknownMsg2Json();
-char* syncRpcMsg2Str(SRpcMsg* pRpcMsg);
+typedef struct SyncPing {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+ // private data
+ uint32_t dataLen;
+ char data[];
+} SyncPing;
+
+
+void syncPingDestroy(SyncPing* pMsg);
+void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen);
+void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg);
+SyncPing* syncPingDeserialize2(const char* buf, uint32_t len);
+SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg);
+
+// ---------------------------------------------
+typedef struct SyncPingReply {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+ // private data
+ uint32_t dataLen;
+ char data[];
+} SyncPingReply;
+
+SyncPingReply* syncPingReplyBuild(uint32_t dataLen);
+SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str);
+SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId);
+void syncPingReplyDestroy(SyncPingReply* pMsg);
+void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen);
+void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg);
+char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len);
+SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len);
+int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen);
+SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen);
+void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg);
+void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg);
+SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncPingReply2Json(const SyncPingReply* pMsg);
+char* syncPingReply2Str(const SyncPingReply* pMsg);
+
+// for debug ----------------------
+void syncPingReplyPrint(const SyncPingReply* pMsg);
+void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg);
+void syncPingReplyLog(const SyncPingReply* pMsg);
+void syncPingReplyLog2(char* s, const SyncPingReply* pMsg);
+
+// ---------------------------------------------
+typedef enum ESyncTimeoutType {
+ SYNC_TIMEOUT_PING = 100,
+ SYNC_TIMEOUT_ELECTION,
+ SYNC_TIMEOUT_HEARTBEAT,
+} ESyncTimeoutType;
+
+const char* syncTimerTypeStr(enum ESyncTimeoutType timerType);
+
+typedef struct SyncTimeout {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ ESyncTimeoutType timeoutType;
+ uint64_t logicClock;
+ int32_t timerMS;
+ void* data; // need optimized
+} SyncTimeout;
+
+SyncTimeout* syncTimeoutBuild();
+SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t vgId,
+ void* data);
+void syncTimeoutDestroy(SyncTimeout* pMsg);
+void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen);
+void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg);
+char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len);
+SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len);
+void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg);
+void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg);
+SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncTimeout2Json(const SyncTimeout* pMsg);
+char* syncTimeout2Str(const SyncTimeout* pMsg);
+
+// for debug ----------------------
+void syncTimeoutPrint(const SyncTimeout* pMsg);
+void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg);
+void syncTimeoutLog(const SyncTimeout* pMsg);
+void syncTimeoutLog2(char* s, const SyncTimeout* pMsg);
+
+// ---------------------------------------------
+typedef struct SyncClientRequest {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST
+ uint32_t originalRpcType; // origin RpcMsg msgType
+ uint64_t seqNum;
+ bool isWeak;
+ uint32_t dataLen; // origin RpcMsg.contLen
+ char data[]; // origin RpcMsg.pCont
+} SyncClientRequest;
+
+SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen);
+int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum,
+ bool isWeak, int32_t vgId);
+int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
+void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); // step 2
+void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg);
+cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg);
+char* syncClientRequest2Str(const SyncClientRequest* pMsg);
+
+// for debug ----------------------
+void syncClientRequestPrint(const SyncClientRequest* pMsg);
+void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg);
+void syncClientRequestLog(const SyncClientRequest* pMsg);
+void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
+
+// ---------------------------------------------
+typedef struct SRaftMeta {
+ uint64_t seqNum;
+ bool isWeak;
+} SRaftMeta;
+
+// block1:
+// block2: SRaftMeta array
+// block3: rpc msg array (with pCont pointer)
+
+typedef struct SyncClientRequestBatch {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST_BATCH
+ uint32_t dataCount;
+ uint32_t dataLen;
+ char data[]; // block2, block3
+} SyncClientRequestBatch;
+
+SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize,
+ int32_t vgId);
+void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg);
+void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg);
+void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg);
+SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg);
+SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg);
+SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg);
+cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg);
+char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg);
+
+// for debug ----------------------
+void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg);
+void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg);
+void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg);
+void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg);
+
+// ---------------------------------------------
+typedef struct SyncClientRequestReply {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ int32_t errCode;
+ SRaftId leaderHint;
+} SyncClientRequestReply;
+
+// ---------------------------------------------
+typedef struct SyncRequestVote {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+ // private data
+ SyncTerm term;
+ SyncIndex lastLogIndex;
+ SyncTerm lastLogTerm;
+} SyncRequestVote;
+
+SyncRequestVote* syncRequestVoteBuild(int32_t vgId);
+void syncRequestVoteDestroy(SyncRequestVote* pMsg);
+void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen);
+void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg);
+char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len);
+SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len);
+void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg);
+void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg);
+SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg);
+char* syncRequestVote2Str(const SyncRequestVote* pMsg);
+
+// for debug ----------------------
+void syncRequestVotePrint(const SyncRequestVote* pMsg);
+void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg);
+void syncRequestVoteLog(const SyncRequestVote* pMsg);
+void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg);
+
+// ---------------------------------------------
+typedef struct SyncRequestVoteReply {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+ // private data
+ SyncTerm term;
+ bool voteGranted;
+} SyncRequestVoteReply;
+
+SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId);
+void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg);
+void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen);
+void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg);
+char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len);
+SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len);
+void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg);
+void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg);
+SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg);
+char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg);
+
+// for debug ----------------------
+void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg);
+void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg);
+void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg);
+void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg);
+
+// ---------------------------------------------
+// data: entry
+
+typedef struct SyncAppendEntries {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+
+ // private data
+ SyncTerm term;
+ SyncIndex prevLogIndex;
+ SyncTerm prevLogTerm;
+ SyncIndex commitIndex;
+ SyncTerm privateTerm;
+ uint32_t dataLen;
+ char data[];
+} SyncAppendEntries;
+
+SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen, int32_t vgId);
+void syncAppendEntriesDestroy(SyncAppendEntries* pMsg);
+void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen);
+void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg);
+char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len);
+SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len);
+void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg);
+void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg);
+SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg);
+char* syncAppendEntries2Str(const SyncAppendEntries* pMsg);
+
+// for debug ----------------------
+void syncAppendEntriesPrint(const SyncAppendEntries* pMsg);
+void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg);
+void syncAppendEntriesLog(const SyncAppendEntries* pMsg);
+void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
+
+// ---------------------------------------------
+
+typedef struct SOffsetAndContLen {
+ int32_t offset;
+ int32_t contLen;
+} SOffsetAndContLen;
+
+// data:
+// block1: SOffsetAndContLen Array
+// block2: entry Array
+
+typedef struct SyncAppendEntriesBatch {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+
+ // private data
+ SyncTerm term;
+ SyncIndex prevLogIndex;
+ SyncTerm prevLogTerm;
+ SyncIndex commitIndex;
+ SyncTerm privateTerm;
+ int32_t dataCount;
+ uint32_t dataLen;
+ char data[]; // block1, block2
+} SyncAppendEntriesBatch;
+
+SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId);
+SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg);
+void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg);
+void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen);
+void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg);
+char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len);
+SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len);
+void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg);
+void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg);
+SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg);
+
+// ---------------------------------------------
+typedef struct SyncAppendEntriesReply {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+ // private data
+ SyncTerm term;
+ SyncTerm privateTerm;
+ bool success;
+ SyncIndex matchIndex;
+ SyncIndex lastSendIndex;
+ int64_t startTime;
+} SyncAppendEntriesReply;
+
+SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId);
+void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg);
+void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen);
+void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg);
+char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len);
+SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len);
+void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg);
+void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg);
+SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg);
+char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg);
+
+// for debug ----------------------
+void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg);
+void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg);
+void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg);
+void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg);
+
+// ---------------------------------------------
+typedef struct SyncHeartbeat {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+
+ // private data
+ SyncTerm term;
+ SyncIndex commitIndex;
+ SyncTerm privateTerm;
+ SyncTerm minMatchIndex;
+
+} SyncHeartbeat;
+
+SyncHeartbeat* syncHeartbeatBuild(int32_t vgId);
+void syncHeartbeatDestroy(SyncHeartbeat* pMsg);
+void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen);
+void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg);
+char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len);
+SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len);
+void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg);
+void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg);
+SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg);
+char* syncHeartbeat2Str(const SyncHeartbeat* pMsg);
+
+// for debug ----------------------
+void syncHeartbeatPrint(const SyncHeartbeat* pMsg);
+void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg);
+void syncHeartbeatLog(const SyncHeartbeat* pMsg);
+void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg);
+
+// ---------------------------------------------
+typedef struct SyncHeartbeatReply {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+
+ // private data
+ SyncTerm term;
+ SyncTerm privateTerm;
+ int64_t startTime;
+} SyncHeartbeatReply;
+
+SyncHeartbeatReply* syncHeartbeatReplyBuild(int32_t vgId);
+void syncHeartbeatReplyDestroy(SyncHeartbeatReply* pMsg);
+void syncHeartbeatReplySerialize(const SyncHeartbeatReply* pMsg, char* buf, uint32_t bufLen);
+void syncHeartbeatReplyDeserialize(const char* buf, uint32_t len, SyncHeartbeatReply* pMsg);
+char* syncHeartbeatReplySerialize2(const SyncHeartbeatReply* pMsg, uint32_t* len);
+SyncHeartbeatReply* syncHeartbeatReplyDeserialize2(const char* buf, uint32_t len);
+void syncHeartbeatReply2RpcMsg(const SyncHeartbeatReply* pMsg, SRpcMsg* pRpcMsg);
+void syncHeartbeatReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeatReply* pMsg);
+SyncHeartbeatReply* syncHeartbeatReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncHeartbeatReply2Json(const SyncHeartbeatReply* pMsg);
+char* syncHeartbeatReply2Str(const SyncHeartbeatReply* pMsg);
+
+// for debug ----------------------
+void syncHeartbeatReplyPrint(const SyncHeartbeatReply* pMsg);
+void syncHeartbeatReplyPrint2(char* s, const SyncHeartbeatReply* pMsg);
+void syncHeartbeatReplyLog(const SyncHeartbeatReply* pMsg);
+void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg);
+
+// ---------------------------------------------
+typedef struct SyncPreSnapshot {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+
+ // private data
+ SyncTerm term;
+
+} SyncPreSnapshot;
+
+SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId);
+void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg);
+void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen);
+void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg);
+char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len);
+SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len);
+void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg);
+void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg);
+SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg);
+char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg);
+
+// for debug ----------------------
+void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg);
+void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg);
+void syncPreSnapshotLog(const SyncPreSnapshot* pMsg);
+void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg);
+
+// ---------------------------------------------
+typedef struct SyncPreSnapshotReply {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+
+ // private data
+ SyncTerm term;
+ SyncIndex snapStart;
+
+} SyncPreSnapshotReply;
+
+SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId);
+void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg);
+void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen);
+void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg);
+char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len);
+SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len);
+void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg);
+void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg);
+SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg);
+char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg);
+
+// for debug ----------------------
+void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg);
+void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg);
+void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg);
+void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg);
+
+// ---------------------------------------------
+typedef struct SyncApplyMsg {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType; // user SyncApplyMsg msgType
+ uint32_t originalRpcType; // user RpcMsg msgType
+ SFsmCbMeta fsmMeta;
+ uint32_t dataLen; // user RpcMsg.contLen
+ char data[]; // user RpcMsg.pCont
+} SyncApplyMsg;
+
+SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen);
+SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta);
+void syncApplyMsgDestroy(SyncApplyMsg* pMsg);
+void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen);
+void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg);
+char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len);
+SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len);
+void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg); // SyncApplyMsg to SRpcMsg, put it into ApplyQ
+void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg); // get SRpcMsg from ApplyQ, to SyncApplyMsg
+SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg);
+void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg); // SyncApplyMsg to OriginalRpcMsg
+cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg);
+char* syncApplyMsg2Str(const SyncApplyMsg* pMsg);
+
+// for debug ----------------------
+void syncApplyMsgPrint(const SyncApplyMsg* pMsg);
+void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg);
+void syncApplyMsgLog(const SyncApplyMsg* pMsg);
+void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg);
+
+// ---------------------------------------------
+typedef struct SyncSnapshotSend {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+
+ SyncTerm term;
+ SyncIndex beginIndex; // snapshot.beginIndex
+ SyncIndex lastIndex; // snapshot.lastIndex
+ SyncTerm lastTerm; // snapshot.lastTerm
+ SyncIndex lastConfigIndex; // snapshot.lastConfigIndex
+ SSyncCfg lastConfig;
+ int64_t startTime;
+ int32_t seq;
+ uint32_t dataLen;
+ char data[];
+} SyncSnapshotSend;
+
+SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId);
+void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg);
+void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen);
+void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg);
+char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len);
+SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len);
+void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg);
+void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg);
+SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg);
+char* syncSnapshotSend2Str(const SyncSnapshotSend* pMsg);
+
+// for debug ----------------------
+void syncSnapshotSendPrint(const SyncSnapshotSend* pMsg);
+void syncSnapshotSendPrint2(char* s, const SyncSnapshotSend* pMsg);
+void syncSnapshotSendLog(const SyncSnapshotSend* pMsg);
+void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg);
+
+// ---------------------------------------------
+typedef struct SyncSnapshotRsp {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+
+ SyncTerm term;
+ SyncIndex lastIndex;
+ SyncTerm lastTerm;
+ int64_t startTime;
+ int32_t ack;
+ int32_t code;
+ SyncIndex snapBeginIndex; // when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid
+} SyncSnapshotRsp;
+
+SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId);
+void syncSnapshotRspDestroy(SyncSnapshotRsp* pMsg);
+void syncSnapshotRspSerialize(const SyncSnapshotRsp* pMsg, char* buf, uint32_t bufLen);
+void syncSnapshotRspDeserialize(const char* buf, uint32_t len, SyncSnapshotRsp* pMsg);
+char* syncSnapshotRspSerialize2(const SyncSnapshotRsp* pMsg, uint32_t* len);
+SyncSnapshotRsp* syncSnapshotRspDeserialize2(const char* buf, uint32_t len);
+void syncSnapshotRsp2RpcMsg(const SyncSnapshotRsp* pMsg, SRpcMsg* pRpcMsg);
+void syncSnapshotRspFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotRsp* pMsg);
+SyncSnapshotRsp* syncSnapshotRspFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg);
+char* syncSnapshotRsp2Str(const SyncSnapshotRsp* pMsg);
+
+// for debug ----------------------
+void syncSnapshotRspPrint(const SyncSnapshotRsp* pMsg);
+void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg);
+void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg);
+void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg);
+
+// ---------------------------------------------
+typedef struct SyncLeaderTransfer {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ /*
+ SRaftId srcId;
+ SRaftId destId;
+ */
+ SNodeInfo newNodeInfo;
+ SRaftId newLeaderId;
+} SyncLeaderTransfer;
+
+SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId);
+void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg);
+void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen);
+void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg);
+char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len);
+SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len);
+void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg);
+void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg);
+SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg);
+char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg);
+
+typedef enum {
+ SYNC_LOCAL_CMD_STEP_DOWN = 100,
+ SYNC_LOCAL_CMD_FOLLOWER_CMT,
+} ESyncLocalCmd;
+
+const char* syncLocalCmdGetStr(int32_t cmd);
+
+typedef struct SyncLocalCmd {
+ uint32_t bytes;
+ int32_t vgId;
+ uint32_t msgType;
+ SRaftId srcId;
+ SRaftId destId;
+
+ int32_t cmd;
+ SyncTerm sdNewTerm; // step down new term
+ SyncIndex fcIndex;// follower commit index
+
+} SyncLocalCmd;
+
+SyncLocalCmd* syncLocalCmdBuild(int32_t vgId);
+void syncLocalCmdDestroy(SyncLocalCmd* pMsg);
+void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen);
+void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg);
+char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len);
+SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len);
+void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg);
+void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg);
+SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg);
+cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg);
+char* syncLocalCmd2Str(const SyncLocalCmd* pMsg);
// for debug ----------------------
-void syncRpcMsgPrint(SRpcMsg* pMsg);
-void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
-void syncRpcMsgLog(SRpcMsg* pMsg);
-void syncRpcMsgLog2(char* s, SRpcMsg* pMsg);
+void syncLocalCmdPrint(const SyncLocalCmd* pMsg);
+void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg);
+void syncLocalCmdLog(const SyncLocalCmd* pMsg);
+void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
+
+// on message ----------------------
+int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg);
+int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg);
+
+int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg);
+int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg);
+
+int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg);
+int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
+
+int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg);
+int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg);
+
+int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg);
+int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg);
+
+int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
+int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
+
+int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
+int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg);
+
+// -----------------------------------------
+
+// option ----------------------------------
+bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
+ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
+
// ---------------------------------------------
#ifdef __cplusplus
diff --git a/source/libs/sync/inc/syncTools.h b/source/libs/sync/inc/syncTools.h
deleted file mode 100644
index 3fb4a5ba0c6f3283c1ee602e7a84f3a8bc4aa25b..0000000000000000000000000000000000000000
--- a/source/libs/sync/inc/syncTools.h
+++ /dev/null
@@ -1,755 +0,0 @@
-/*
- * Copyright (c) 2019 TAOS Data, Inc.
- *
- * This program is free software: you can use, redistribute, and/or modify
- * it under the terms of the GNU Affero General Public License, version 3
- * or later ("AGPL"), as published by the Free Software Foundation.
- *
- * This program is distributed in the hope that it will be useful, but WITHOUT
- * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
- * FITNESS FOR A PARTICULAR PURPOSE.
- *
- * You should have received a copy of the GNU Affero General Public License
- * along with this program. If not, see .
- */
-
-#ifndef _TD_LIBS_SYNC_TOOLS_H
-#define _TD_LIBS_SYNC_TOOLS_H
-
-#ifdef __cplusplus
-extern "C" {
-#endif
-
-// ------------------ ds -------------------
-typedef struct SRaftId {
- SyncNodeId addr;
- SyncGroupId vgId;
-} SRaftId;
-
-// ------------------ for debug -------------------
-void syncRpcMsgPrint(SRpcMsg* pMsg);
-void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
-void syncRpcMsgLog(SRpcMsg* pMsg);
-void syncRpcMsgLog2(char* s, SRpcMsg* pMsg);
-
-// ------------------ for compile -------------------
-typedef struct SSyncBuffer {
- void* data;
- size_t len;
-} SSyncBuffer;
-
-typedef struct SNodesRole {
- int32_t replicaNum;
- SNodeInfo nodeInfo[TSDB_MAX_REPLICA];
- ESyncState role[TSDB_MAX_REPLICA];
-} SNodesRole;
-
-typedef struct SStateMgr {
- void* data;
-
- int32_t (*getCurrentTerm)(struct SStateMgr* pMgr, SyncTerm* pCurrentTerm);
- int32_t (*persistCurrentTerm)(struct SStateMgr* pMgr, SyncTerm pCurrentTerm);
-
- int32_t (*getVoteFor)(struct SStateMgr* pMgr, SyncNodeId* pVoteFor);
- int32_t (*persistVoteFor)(struct SStateMgr* pMgr, SyncNodeId voteFor);
-
- int32_t (*getSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg);
- int32_t (*persistSyncCfg)(struct SStateMgr* pMgr, SSyncCfg* pSyncCfg);
-
-} SStateMgr;
-
-// ------------------ for message process -------------------
-
-// ---------------------------------------------
-typedef struct SyncPing {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
- // private data
- uint32_t dataLen;
- char data[];
-} SyncPing;
-
-SyncPing* syncPingBuild(uint32_t dataLen);
-SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str);
-SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId);
-void syncPingDestroy(SyncPing* pMsg);
-void syncPingSerialize(const SyncPing* pMsg, char* buf, uint32_t bufLen);
-void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg);
-char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len);
-SyncPing* syncPingDeserialize2(const char* buf, uint32_t len);
-int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen);
-SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen);
-void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg);
-void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg);
-SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncPing2Json(const SyncPing* pMsg);
-char* syncPing2Str(const SyncPing* pMsg);
-
-// for debug ----------------------
-void syncPingPrint(const SyncPing* pMsg);
-void syncPingPrint2(char* s, const SyncPing* pMsg);
-void syncPingLog(const SyncPing* pMsg);
-void syncPingLog2(char* s, const SyncPing* pMsg);
-
-// ---------------------------------------------
-typedef struct SyncPingReply {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
- // private data
- uint32_t dataLen;
- char data[];
-} SyncPingReply;
-
-SyncPingReply* syncPingReplyBuild(uint32_t dataLen);
-SyncPingReply* syncPingReplyBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str);
-SyncPingReply* syncPingReplyBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId);
-void syncPingReplyDestroy(SyncPingReply* pMsg);
-void syncPingReplySerialize(const SyncPingReply* pMsg, char* buf, uint32_t bufLen);
-void syncPingReplyDeserialize(const char* buf, uint32_t len, SyncPingReply* pMsg);
-char* syncPingReplySerialize2(const SyncPingReply* pMsg, uint32_t* len);
-SyncPingReply* syncPingReplyDeserialize2(const char* buf, uint32_t len);
-int32_t syncPingReplySerialize3(const SyncPingReply* pMsg, char* buf, int32_t bufLen);
-SyncPingReply* syncPingReplyDeserialize3(void* buf, int32_t bufLen);
-void syncPingReply2RpcMsg(const SyncPingReply* pMsg, SRpcMsg* pRpcMsg);
-void syncPingReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPingReply* pMsg);
-SyncPingReply* syncPingReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncPingReply2Json(const SyncPingReply* pMsg);
-char* syncPingReply2Str(const SyncPingReply* pMsg);
-
-// for debug ----------------------
-void syncPingReplyPrint(const SyncPingReply* pMsg);
-void syncPingReplyPrint2(char* s, const SyncPingReply* pMsg);
-void syncPingReplyLog(const SyncPingReply* pMsg);
-void syncPingReplyLog2(char* s, const SyncPingReply* pMsg);
-
-// ---------------------------------------------
-typedef enum ESyncTimeoutType {
- SYNC_TIMEOUT_PING = 100,
- SYNC_TIMEOUT_ELECTION,
- SYNC_TIMEOUT_HEARTBEAT,
-} ESyncTimeoutType;
-
-const char* syncTimerTypeStr(enum ESyncTimeoutType timerType);
-
-typedef struct SyncTimeout {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- ESyncTimeoutType timeoutType;
- uint64_t logicClock;
- int32_t timerMS;
- void* data; // need optimized
-} SyncTimeout;
-
-SyncTimeout* syncTimeoutBuild();
-SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, uint64_t logicClock, int32_t timerMS, int32_t vgId,
- void* data);
-void syncTimeoutDestroy(SyncTimeout* pMsg);
-void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen);
-void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg);
-char* syncTimeoutSerialize2(const SyncTimeout* pMsg, uint32_t* len);
-SyncTimeout* syncTimeoutDeserialize2(const char* buf, uint32_t len);
-void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg);
-void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg);
-SyncTimeout* syncTimeoutFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncTimeout2Json(const SyncTimeout* pMsg);
-char* syncTimeout2Str(const SyncTimeout* pMsg);
-
-// for debug ----------------------
-void syncTimeoutPrint(const SyncTimeout* pMsg);
-void syncTimeoutPrint2(char* s, const SyncTimeout* pMsg);
-void syncTimeoutLog(const SyncTimeout* pMsg);
-void syncTimeoutLog2(char* s, const SyncTimeout* pMsg);
-
-// ---------------------------------------------
-typedef struct SyncClientRequest {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST
- uint32_t originalRpcType; // origin RpcMsg msgType
- uint64_t seqNum;
- bool isWeak;
- uint32_t dataLen; // origin RpcMsg.contLen
- char data[]; // origin RpcMsg.pCont
-} SyncClientRequest;
-
-SyncClientRequest* syncClientRequestAlloc(uint32_t dataLen);
-int32_t syncClientRequestBuildFromRpcMsg(SRpcMsg* pClientRequestRpcMsg, const SRpcMsg* pOriginalRpcMsg, uint64_t seqNum,
- bool isWeak, int32_t vgId);
-int32_t syncClientRequestBuildFromNoopEntry(SRpcMsg* pClientRequestRpcMsg, const SSyncRaftEntry* pEntry, int32_t vgId);
-void syncClientRequest2RpcMsg(const SyncClientRequest* pMsg, SRpcMsg* pRpcMsg); // step 2
-void syncClientRequestFromRpcMsg(const SRpcMsg* pRpcMsg, SyncClientRequest* pMsg);
-cJSON* syncClientRequest2Json(const SyncClientRequest* pMsg);
-char* syncClientRequest2Str(const SyncClientRequest* pMsg);
-
-// for debug ----------------------
-void syncClientRequestPrint(const SyncClientRequest* pMsg);
-void syncClientRequestPrint2(char* s, const SyncClientRequest* pMsg);
-void syncClientRequestLog(const SyncClientRequest* pMsg);
-void syncClientRequestLog2(char* s, const SyncClientRequest* pMsg);
-
-// ---------------------------------------------
-typedef struct SRaftMeta {
- uint64_t seqNum;
- bool isWeak;
-} SRaftMeta;
-
-// block1:
-// block2: SRaftMeta array
-// block3: rpc msg array (with pCont pointer)
-
-typedef struct SyncClientRequestBatch {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType; // TDMT_SYNC_CLIENT_REQUEST_BATCH
- uint32_t dataCount;
- uint32_t dataLen;
- char data[]; // block2, block3
-} SyncClientRequestBatch;
-
-SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize,
- int32_t vgId);
-void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg);
-void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg);
-void syncClientRequestBatchDestroyDeep(SyncClientRequestBatch* pMsg);
-SRaftMeta* syncClientRequestBatchMetaArr(const SyncClientRequestBatch* pSyncMsg);
-SRpcMsg* syncClientRequestBatchRpcMsgArr(const SyncClientRequestBatch* pSyncMsg);
-SyncClientRequestBatch* syncClientRequestBatchFromRpcMsg(const SRpcMsg* pRpcMsg);
-cJSON* syncClientRequestBatch2Json(const SyncClientRequestBatch* pMsg);
-char* syncClientRequestBatch2Str(const SyncClientRequestBatch* pMsg);
-
-// for debug ----------------------
-void syncClientRequestBatchPrint(const SyncClientRequestBatch* pMsg);
-void syncClientRequestBatchPrint2(char* s, const SyncClientRequestBatch* pMsg);
-void syncClientRequestBatchLog(const SyncClientRequestBatch* pMsg);
-void syncClientRequestBatchLog2(char* s, const SyncClientRequestBatch* pMsg);
-
-// ---------------------------------------------
-typedef struct SyncClientRequestReply {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- int32_t errCode;
- SRaftId leaderHint;
-} SyncClientRequestReply;
-
-// ---------------------------------------------
-typedef struct SyncRequestVote {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
- // private data
- SyncTerm term;
- SyncIndex lastLogIndex;
- SyncTerm lastLogTerm;
-} SyncRequestVote;
-
-SyncRequestVote* syncRequestVoteBuild(int32_t vgId);
-void syncRequestVoteDestroy(SyncRequestVote* pMsg);
-void syncRequestVoteSerialize(const SyncRequestVote* pMsg, char* buf, uint32_t bufLen);
-void syncRequestVoteDeserialize(const char* buf, uint32_t len, SyncRequestVote* pMsg);
-char* syncRequestVoteSerialize2(const SyncRequestVote* pMsg, uint32_t* len);
-SyncRequestVote* syncRequestVoteDeserialize2(const char* buf, uint32_t len);
-void syncRequestVote2RpcMsg(const SyncRequestVote* pMsg, SRpcMsg* pRpcMsg);
-void syncRequestVoteFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVote* pMsg);
-SyncRequestVote* syncRequestVoteFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncRequestVote2Json(const SyncRequestVote* pMsg);
-char* syncRequestVote2Str(const SyncRequestVote* pMsg);
-
-// for debug ----------------------
-void syncRequestVotePrint(const SyncRequestVote* pMsg);
-void syncRequestVotePrint2(char* s, const SyncRequestVote* pMsg);
-void syncRequestVoteLog(const SyncRequestVote* pMsg);
-void syncRequestVoteLog2(char* s, const SyncRequestVote* pMsg);
-
-// ---------------------------------------------
-typedef struct SyncRequestVoteReply {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
- // private data
- SyncTerm term;
- bool voteGranted;
-} SyncRequestVoteReply;
-
-SyncRequestVoteReply* syncRequestVoteReplyBuild(int32_t vgId);
-void syncRequestVoteReplyDestroy(SyncRequestVoteReply* pMsg);
-void syncRequestVoteReplySerialize(const SyncRequestVoteReply* pMsg, char* buf, uint32_t bufLen);
-void syncRequestVoteReplyDeserialize(const char* buf, uint32_t len, SyncRequestVoteReply* pMsg);
-char* syncRequestVoteReplySerialize2(const SyncRequestVoteReply* pMsg, uint32_t* len);
-SyncRequestVoteReply* syncRequestVoteReplyDeserialize2(const char* buf, uint32_t len);
-void syncRequestVoteReply2RpcMsg(const SyncRequestVoteReply* pMsg, SRpcMsg* pRpcMsg);
-void syncRequestVoteReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncRequestVoteReply* pMsg);
-SyncRequestVoteReply* syncRequestVoteReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncRequestVoteReply2Json(const SyncRequestVoteReply* pMsg);
-char* syncRequestVoteReply2Str(const SyncRequestVoteReply* pMsg);
-
-// for debug ----------------------
-void syncRequestVoteReplyPrint(const SyncRequestVoteReply* pMsg);
-void syncRequestVoteReplyPrint2(char* s, const SyncRequestVoteReply* pMsg);
-void syncRequestVoteReplyLog(const SyncRequestVoteReply* pMsg);
-void syncRequestVoteReplyLog2(char* s, const SyncRequestVoteReply* pMsg);
-
-// ---------------------------------------------
-// data: entry
-
-typedef struct SyncAppendEntries {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
-
- // private data
- SyncTerm term;
- SyncIndex prevLogIndex;
- SyncTerm prevLogTerm;
- SyncIndex commitIndex;
- SyncTerm privateTerm;
- uint32_t dataLen;
- char data[];
-} SyncAppendEntries;
-
-SyncAppendEntries* syncAppendEntriesBuild(uint32_t dataLen, int32_t vgId);
-void syncAppendEntriesDestroy(SyncAppendEntries* pMsg);
-void syncAppendEntriesSerialize(const SyncAppendEntries* pMsg, char* buf, uint32_t bufLen);
-void syncAppendEntriesDeserialize(const char* buf, uint32_t len, SyncAppendEntries* pMsg);
-char* syncAppendEntriesSerialize2(const SyncAppendEntries* pMsg, uint32_t* len);
-SyncAppendEntries* syncAppendEntriesDeserialize2(const char* buf, uint32_t len);
-void syncAppendEntries2RpcMsg(const SyncAppendEntries* pMsg, SRpcMsg* pRpcMsg);
-void syncAppendEntriesFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntries* pMsg);
-SyncAppendEntries* syncAppendEntriesFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncAppendEntries2Json(const SyncAppendEntries* pMsg);
-char* syncAppendEntries2Str(const SyncAppendEntries* pMsg);
-
-// for debug ----------------------
-void syncAppendEntriesPrint(const SyncAppendEntries* pMsg);
-void syncAppendEntriesPrint2(char* s, const SyncAppendEntries* pMsg);
-void syncAppendEntriesLog(const SyncAppendEntries* pMsg);
-void syncAppendEntriesLog2(char* s, const SyncAppendEntries* pMsg);
-
-// ---------------------------------------------
-
-typedef struct SOffsetAndContLen {
- int32_t offset;
- int32_t contLen;
-} SOffsetAndContLen;
-
-// data:
-// block1: SOffsetAndContLen Array
-// block2: entry Array
-
-typedef struct SyncAppendEntriesBatch {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
-
- // private data
- SyncTerm term;
- SyncIndex prevLogIndex;
- SyncTerm prevLogTerm;
- SyncIndex commitIndex;
- SyncTerm privateTerm;
- int32_t dataCount;
- uint32_t dataLen;
- char data[]; // block1, block2
-} SyncAppendEntriesBatch;
-
-SyncAppendEntriesBatch* syncAppendEntriesBatchBuild(SSyncRaftEntry** entryPArr, int32_t arrSize, int32_t vgId);
-SOffsetAndContLen* syncAppendEntriesBatchMetaTableArray(SyncAppendEntriesBatch* pMsg);
-void syncAppendEntriesBatchDestroy(SyncAppendEntriesBatch* pMsg);
-void syncAppendEntriesBatchSerialize(const SyncAppendEntriesBatch* pMsg, char* buf, uint32_t bufLen);
-void syncAppendEntriesBatchDeserialize(const char* buf, uint32_t len, SyncAppendEntriesBatch* pMsg);
-char* syncAppendEntriesBatchSerialize2(const SyncAppendEntriesBatch* pMsg, uint32_t* len);
-SyncAppendEntriesBatch* syncAppendEntriesBatchDeserialize2(const char* buf, uint32_t len);
-void syncAppendEntriesBatch2RpcMsg(const SyncAppendEntriesBatch* pMsg, SRpcMsg* pRpcMsg);
-void syncAppendEntriesBatchFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesBatch* pMsg);
-SyncAppendEntriesBatch* syncAppendEntriesBatchFromRpcMsg2(const SRpcMsg* pRpcMsg);
-
-// ---------------------------------------------
-typedef struct SyncAppendEntriesReply {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
- // private data
- SyncTerm term;
- SyncTerm privateTerm;
- bool success;
- SyncIndex matchIndex;
- SyncIndex lastSendIndex;
- int64_t startTime;
-} SyncAppendEntriesReply;
-
-SyncAppendEntriesReply* syncAppendEntriesReplyBuild(int32_t vgId);
-void syncAppendEntriesReplyDestroy(SyncAppendEntriesReply* pMsg);
-void syncAppendEntriesReplySerialize(const SyncAppendEntriesReply* pMsg, char* buf, uint32_t bufLen);
-void syncAppendEntriesReplyDeserialize(const char* buf, uint32_t len, SyncAppendEntriesReply* pMsg);
-char* syncAppendEntriesReplySerialize2(const SyncAppendEntriesReply* pMsg, uint32_t* len);
-SyncAppendEntriesReply* syncAppendEntriesReplyDeserialize2(const char* buf, uint32_t len);
-void syncAppendEntriesReply2RpcMsg(const SyncAppendEntriesReply* pMsg, SRpcMsg* pRpcMsg);
-void syncAppendEntriesReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncAppendEntriesReply* pMsg);
-SyncAppendEntriesReply* syncAppendEntriesReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncAppendEntriesReply2Json(const SyncAppendEntriesReply* pMsg);
-char* syncAppendEntriesReply2Str(const SyncAppendEntriesReply* pMsg);
-
-// for debug ----------------------
-void syncAppendEntriesReplyPrint(const SyncAppendEntriesReply* pMsg);
-void syncAppendEntriesReplyPrint2(char* s, const SyncAppendEntriesReply* pMsg);
-void syncAppendEntriesReplyLog(const SyncAppendEntriesReply* pMsg);
-void syncAppendEntriesReplyLog2(char* s, const SyncAppendEntriesReply* pMsg);
-
-// ---------------------------------------------
-typedef struct SyncHeartbeat {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
-
- // private data
- SyncTerm term;
- SyncIndex commitIndex;
- SyncTerm privateTerm;
- SyncTerm minMatchIndex;
-
-} SyncHeartbeat;
-
-SyncHeartbeat* syncHeartbeatBuild(int32_t vgId);
-void syncHeartbeatDestroy(SyncHeartbeat* pMsg);
-void syncHeartbeatSerialize(const SyncHeartbeat* pMsg, char* buf, uint32_t bufLen);
-void syncHeartbeatDeserialize(const char* buf, uint32_t len, SyncHeartbeat* pMsg);
-char* syncHeartbeatSerialize2(const SyncHeartbeat* pMsg, uint32_t* len);
-SyncHeartbeat* syncHeartbeatDeserialize2(const char* buf, uint32_t len);
-void syncHeartbeat2RpcMsg(const SyncHeartbeat* pMsg, SRpcMsg* pRpcMsg);
-void syncHeartbeatFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeat* pMsg);
-SyncHeartbeat* syncHeartbeatFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncHeartbeat2Json(const SyncHeartbeat* pMsg);
-char* syncHeartbeat2Str(const SyncHeartbeat* pMsg);
-
-// for debug ----------------------
-void syncHeartbeatPrint(const SyncHeartbeat* pMsg);
-void syncHeartbeatPrint2(char* s, const SyncHeartbeat* pMsg);
-void syncHeartbeatLog(const SyncHeartbeat* pMsg);
-void syncHeartbeatLog2(char* s, const SyncHeartbeat* pMsg);
-
-// ---------------------------------------------
-typedef struct SyncHeartbeatReply {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
-
- // private data
- SyncTerm term;
- SyncTerm privateTerm;
- int64_t startTime;
-} SyncHeartbeatReply;
-
-SyncHeartbeatReply* syncHeartbeatReplyBuild(int32_t vgId);
-void syncHeartbeatReplyDestroy(SyncHeartbeatReply* pMsg);
-void syncHeartbeatReplySerialize(const SyncHeartbeatReply* pMsg, char* buf, uint32_t bufLen);
-void syncHeartbeatReplyDeserialize(const char* buf, uint32_t len, SyncHeartbeatReply* pMsg);
-char* syncHeartbeatReplySerialize2(const SyncHeartbeatReply* pMsg, uint32_t* len);
-SyncHeartbeatReply* syncHeartbeatReplyDeserialize2(const char* buf, uint32_t len);
-void syncHeartbeatReply2RpcMsg(const SyncHeartbeatReply* pMsg, SRpcMsg* pRpcMsg);
-void syncHeartbeatReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncHeartbeatReply* pMsg);
-SyncHeartbeatReply* syncHeartbeatReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncHeartbeatReply2Json(const SyncHeartbeatReply* pMsg);
-char* syncHeartbeatReply2Str(const SyncHeartbeatReply* pMsg);
-
-// for debug ----------------------
-void syncHeartbeatReplyPrint(const SyncHeartbeatReply* pMsg);
-void syncHeartbeatReplyPrint2(char* s, const SyncHeartbeatReply* pMsg);
-void syncHeartbeatReplyLog(const SyncHeartbeatReply* pMsg);
-void syncHeartbeatReplyLog2(char* s, const SyncHeartbeatReply* pMsg);
-
-// ---------------------------------------------
-typedef struct SyncPreSnapshot {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
-
- // private data
- SyncTerm term;
-
-} SyncPreSnapshot;
-
-SyncPreSnapshot* syncPreSnapshotBuild(int32_t vgId);
-void syncPreSnapshotDestroy(SyncPreSnapshot* pMsg);
-void syncPreSnapshotSerialize(const SyncPreSnapshot* pMsg, char* buf, uint32_t bufLen);
-void syncPreSnapshotDeserialize(const char* buf, uint32_t len, SyncPreSnapshot* pMsg);
-char* syncPreSnapshotSerialize2(const SyncPreSnapshot* pMsg, uint32_t* len);
-SyncPreSnapshot* syncPreSnapshotDeserialize2(const char* buf, uint32_t len);
-void syncPreSnapshot2RpcMsg(const SyncPreSnapshot* pMsg, SRpcMsg* pRpcMsg);
-void syncPreSnapshotFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshot* pMsg);
-SyncPreSnapshot* syncPreSnapshotFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncPreSnapshot2Json(const SyncPreSnapshot* pMsg);
-char* syncPreSnapshot2Str(const SyncPreSnapshot* pMsg);
-
-// for debug ----------------------
-void syncPreSnapshotPrint(const SyncPreSnapshot* pMsg);
-void syncPreSnapshotPrint2(char* s, const SyncPreSnapshot* pMsg);
-void syncPreSnapshotLog(const SyncPreSnapshot* pMsg);
-void syncPreSnapshotLog2(char* s, const SyncPreSnapshot* pMsg);
-
-// ---------------------------------------------
-typedef struct SyncPreSnapshotReply {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
-
- // private data
- SyncTerm term;
- SyncIndex snapStart;
-
-} SyncPreSnapshotReply;
-
-SyncPreSnapshotReply* syncPreSnapshotReplyBuild(int32_t vgId);
-void syncPreSnapshotReplyDestroy(SyncPreSnapshotReply* pMsg);
-void syncPreSnapshotReplySerialize(const SyncPreSnapshotReply* pMsg, char* buf, uint32_t bufLen);
-void syncPreSnapshotReplyDeserialize(const char* buf, uint32_t len, SyncPreSnapshotReply* pMsg);
-char* syncPreSnapshotReplySerialize2(const SyncPreSnapshotReply* pMsg, uint32_t* len);
-SyncPreSnapshotReply* syncPreSnapshotReplyDeserialize2(const char* buf, uint32_t len);
-void syncPreSnapshotReply2RpcMsg(const SyncPreSnapshotReply* pMsg, SRpcMsg* pRpcMsg);
-void syncPreSnapshotReplyFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPreSnapshotReply* pMsg);
-SyncPreSnapshotReply* syncPreSnapshotReplyFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncPreSnapshotReply2Json(const SyncPreSnapshotReply* pMsg);
-char* syncPreSnapshotReply2Str(const SyncPreSnapshotReply* pMsg);
-
-// for debug ----------------------
-void syncPreSnapshotReplyPrint(const SyncPreSnapshotReply* pMsg);
-void syncPreSnapshotReplyPrint2(char* s, const SyncPreSnapshotReply* pMsg);
-void syncPreSnapshotReplyLog(const SyncPreSnapshotReply* pMsg);
-void syncPreSnapshotReplyLog2(char* s, const SyncPreSnapshotReply* pMsg);
-
-// ---------------------------------------------
-typedef struct SyncApplyMsg {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType; // user SyncApplyMsg msgType
- uint32_t originalRpcType; // user RpcMsg msgType
- SFsmCbMeta fsmMeta;
- uint32_t dataLen; // user RpcMsg.contLen
- char data[]; // user RpcMsg.pCont
-} SyncApplyMsg;
-
-SyncApplyMsg* syncApplyMsgBuild(uint32_t dataLen);
-SyncApplyMsg* syncApplyMsgBuild2(const SRpcMsg* pOriginalRpcMsg, int32_t vgId, SFsmCbMeta* pMeta);
-void syncApplyMsgDestroy(SyncApplyMsg* pMsg);
-void syncApplyMsgSerialize(const SyncApplyMsg* pMsg, char* buf, uint32_t bufLen);
-void syncApplyMsgDeserialize(const char* buf, uint32_t len, SyncApplyMsg* pMsg);
-char* syncApplyMsgSerialize2(const SyncApplyMsg* pMsg, uint32_t* len);
-SyncApplyMsg* syncApplyMsgDeserialize2(const char* buf, uint32_t len);
-void syncApplyMsg2RpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pRpcMsg); // SyncApplyMsg to SRpcMsg, put it into ApplyQ
-void syncApplyMsgFromRpcMsg(const SRpcMsg* pRpcMsg, SyncApplyMsg* pMsg); // get SRpcMsg from ApplyQ, to SyncApplyMsg
-SyncApplyMsg* syncApplyMsgFromRpcMsg2(const SRpcMsg* pRpcMsg);
-void syncApplyMsg2OriginalRpcMsg(const SyncApplyMsg* pMsg, SRpcMsg* pOriginalRpcMsg); // SyncApplyMsg to OriginalRpcMsg
-cJSON* syncApplyMsg2Json(const SyncApplyMsg* pMsg);
-char* syncApplyMsg2Str(const SyncApplyMsg* pMsg);
-
-// for debug ----------------------
-void syncApplyMsgPrint(const SyncApplyMsg* pMsg);
-void syncApplyMsgPrint2(char* s, const SyncApplyMsg* pMsg);
-void syncApplyMsgLog(const SyncApplyMsg* pMsg);
-void syncApplyMsgLog2(char* s, const SyncApplyMsg* pMsg);
-
-// ---------------------------------------------
-typedef struct SyncSnapshotSend {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
-
- SyncTerm term;
- SyncIndex beginIndex; // snapshot.beginIndex
- SyncIndex lastIndex; // snapshot.lastIndex
- SyncTerm lastTerm; // snapshot.lastTerm
- SyncIndex lastConfigIndex; // snapshot.lastConfigIndex
- SSyncCfg lastConfig;
- int64_t startTime;
- int32_t seq;
- uint32_t dataLen;
- char data[];
-} SyncSnapshotSend;
-
-SyncSnapshotSend* syncSnapshotSendBuild(uint32_t dataLen, int32_t vgId);
-void syncSnapshotSendDestroy(SyncSnapshotSend* pMsg);
-void syncSnapshotSendSerialize(const SyncSnapshotSend* pMsg, char* buf, uint32_t bufLen);
-void syncSnapshotSendDeserialize(const char* buf, uint32_t len, SyncSnapshotSend* pMsg);
-char* syncSnapshotSendSerialize2(const SyncSnapshotSend* pMsg, uint32_t* len);
-SyncSnapshotSend* syncSnapshotSendDeserialize2(const char* buf, uint32_t len);
-void syncSnapshotSend2RpcMsg(const SyncSnapshotSend* pMsg, SRpcMsg* pRpcMsg);
-void syncSnapshotSendFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotSend* pMsg);
-SyncSnapshotSend* syncSnapshotSendFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncSnapshotSend2Json(const SyncSnapshotSend* pMsg);
-char* syncSnapshotSend2Str(const SyncSnapshotSend* pMsg);
-
-// for debug ----------------------
-void syncSnapshotSendPrint(const SyncSnapshotSend* pMsg);
-void syncSnapshotSendPrint2(char* s, const SyncSnapshotSend* pMsg);
-void syncSnapshotSendLog(const SyncSnapshotSend* pMsg);
-void syncSnapshotSendLog2(char* s, const SyncSnapshotSend* pMsg);
-
-// ---------------------------------------------
-typedef struct SyncSnapshotRsp {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
-
- SyncTerm term;
- SyncIndex lastIndex;
- SyncTerm lastTerm;
- int64_t startTime;
- int32_t ack;
- int32_t code;
- SyncIndex snapBeginIndex; // when ack = SYNC_SNAPSHOT_SEQ_BEGIN, it's valid
-} SyncSnapshotRsp;
-
-SyncSnapshotRsp* syncSnapshotRspBuild(int32_t vgId);
-void syncSnapshotRspDestroy(SyncSnapshotRsp* pMsg);
-void syncSnapshotRspSerialize(const SyncSnapshotRsp* pMsg, char* buf, uint32_t bufLen);
-void syncSnapshotRspDeserialize(const char* buf, uint32_t len, SyncSnapshotRsp* pMsg);
-char* syncSnapshotRspSerialize2(const SyncSnapshotRsp* pMsg, uint32_t* len);
-SyncSnapshotRsp* syncSnapshotRspDeserialize2(const char* buf, uint32_t len);
-void syncSnapshotRsp2RpcMsg(const SyncSnapshotRsp* pMsg, SRpcMsg* pRpcMsg);
-void syncSnapshotRspFromRpcMsg(const SRpcMsg* pRpcMsg, SyncSnapshotRsp* pMsg);
-SyncSnapshotRsp* syncSnapshotRspFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncSnapshotRsp2Json(const SyncSnapshotRsp* pMsg);
-char* syncSnapshotRsp2Str(const SyncSnapshotRsp* pMsg);
-
-// for debug ----------------------
-void syncSnapshotRspPrint(const SyncSnapshotRsp* pMsg);
-void syncSnapshotRspPrint2(char* s, const SyncSnapshotRsp* pMsg);
-void syncSnapshotRspLog(const SyncSnapshotRsp* pMsg);
-void syncSnapshotRspLog2(char* s, const SyncSnapshotRsp* pMsg);
-
-// ---------------------------------------------
-typedef struct SyncLeaderTransfer {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- /*
- SRaftId srcId;
- SRaftId destId;
- */
- SNodeInfo newNodeInfo;
- SRaftId newLeaderId;
-} SyncLeaderTransfer;
-
-SyncLeaderTransfer* syncLeaderTransferBuild(int32_t vgId);
-void syncLeaderTransferDestroy(SyncLeaderTransfer* pMsg);
-void syncLeaderTransferSerialize(const SyncLeaderTransfer* pMsg, char* buf, uint32_t bufLen);
-void syncLeaderTransferDeserialize(const char* buf, uint32_t len, SyncLeaderTransfer* pMsg);
-char* syncLeaderTransferSerialize2(const SyncLeaderTransfer* pMsg, uint32_t* len);
-SyncLeaderTransfer* syncLeaderTransferDeserialize2(const char* buf, uint32_t len);
-void syncLeaderTransfer2RpcMsg(const SyncLeaderTransfer* pMsg, SRpcMsg* pRpcMsg);
-void syncLeaderTransferFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLeaderTransfer* pMsg);
-SyncLeaderTransfer* syncLeaderTransferFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncLeaderTransfer2Json(const SyncLeaderTransfer* pMsg);
-char* syncLeaderTransfer2Str(const SyncLeaderTransfer* pMsg);
-
-typedef enum {
- SYNC_LOCAL_CMD_STEP_DOWN = 100,
- SYNC_LOCAL_CMD_FOLLOWER_CMT,
-} ESyncLocalCmd;
-
-const char* syncLocalCmdGetStr(int32_t cmd);
-
-typedef struct SyncLocalCmd {
- uint32_t bytes;
- int32_t vgId;
- uint32_t msgType;
- SRaftId srcId;
- SRaftId destId;
-
- int32_t cmd;
- SyncTerm sdNewTerm; // step down new term
- SyncIndex fcIndex;// follower commit index
-
-} SyncLocalCmd;
-
-SyncLocalCmd* syncLocalCmdBuild(int32_t vgId);
-void syncLocalCmdDestroy(SyncLocalCmd* pMsg);
-void syncLocalCmdSerialize(const SyncLocalCmd* pMsg, char* buf, uint32_t bufLen);
-void syncLocalCmdDeserialize(const char* buf, uint32_t len, SyncLocalCmd* pMsg);
-char* syncLocalCmdSerialize2(const SyncLocalCmd* pMsg, uint32_t* len);
-SyncLocalCmd* syncLocalCmdDeserialize2(const char* buf, uint32_t len);
-void syncLocalCmd2RpcMsg(const SyncLocalCmd* pMsg, SRpcMsg* pRpcMsg);
-void syncLocalCmdFromRpcMsg(const SRpcMsg* pRpcMsg, SyncLocalCmd* pMsg);
-SyncLocalCmd* syncLocalCmdFromRpcMsg2(const SRpcMsg* pRpcMsg);
-cJSON* syncLocalCmd2Json(const SyncLocalCmd* pMsg);
-char* syncLocalCmd2Str(const SyncLocalCmd* pMsg);
-
-// for debug ----------------------
-void syncLocalCmdPrint(const SyncLocalCmd* pMsg);
-void syncLocalCmdPrint2(char* s, const SyncLocalCmd* pMsg);
-void syncLocalCmdLog(const SyncLocalCmd* pMsg);
-void syncLocalCmdLog2(char* s, const SyncLocalCmd* pMsg);
-
-// on message ----------------------
-int32_t syncNodeOnPing(SSyncNode* ths, SyncPing* pMsg);
-int32_t syncNodeOnPingReply(SSyncNode* ths, SyncPingReply* pMsg);
-
-int32_t syncNodeOnRequestVote(SSyncNode* ths, SyncRequestVote* pMsg);
-int32_t syncNodeOnRequestVoteReply(SSyncNode* ths, SyncRequestVoteReply* pMsg);
-
-int32_t syncNodeOnAppendEntries(SSyncNode* ths, SyncAppendEntries* pMsg);
-int32_t syncNodeOnAppendEntriesReply(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
-
-int32_t syncNodeOnPreSnapshot(SSyncNode* ths, SyncPreSnapshot* pMsg);
-int32_t syncNodeOnPreSnapshotReply(SSyncNode* ths, SyncPreSnapshotReply* pMsg);
-
-int32_t syncNodeOnSnapshot(SSyncNode* ths, SyncSnapshotSend* pMsg);
-int32_t syncNodeOnSnapshotReply(SSyncNode* ths, SyncSnapshotRsp* pMsg);
-
-int32_t syncNodeOnHeartbeat(SSyncNode* ths, SyncHeartbeat* pMsg);
-int32_t syncNodeOnHeartbeatReply(SSyncNode* ths, SyncHeartbeatReply* pMsg);
-
-int32_t syncNodeOnClientRequest(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
-int32_t syncNodeOnLocalCmd(SSyncNode* ths, SyncLocalCmd* pMsg);
-
-// -----------------------------------------
-typedef int32_t (*FpOnPingCb)(SSyncNode* ths, SyncPing* pMsg);
-typedef int32_t (*FpOnPingReplyCb)(SSyncNode* ths, SyncPingReply* pMsg);
-typedef int32_t (*FpOnClientRequestCb)(SSyncNode* ths, SRpcMsg* pMsg, SyncIndex* pRetIndex);
-typedef int32_t (*FpOnRequestVoteCb)(SSyncNode* ths, SyncRequestVote* pMsg);
-typedef int32_t (*FpOnRequestVoteReplyCb)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
-typedef int32_t (*FpOnAppendEntriesCb)(SSyncNode* ths, SyncAppendEntries* pMsg);
-typedef int32_t (*FpOnAppendEntriesReplyCb)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
-typedef int32_t (*FpOnTimeoutCb)(SSyncNode* pSyncNode, SyncTimeout* pMsg);
-typedef int32_t (*FpOnSnapshotCb)(SSyncNode* ths, SyncSnapshotSend* pMsg);
-typedef int32_t (*FpOnSnapshotReplyCb)(SSyncNode* ths, SyncSnapshotRsp* pMsg);
-
-// option ----------------------------------
-bool syncNodeSnapshotEnable(SSyncNode* pSyncNode);
-ESyncStrategy syncNodeStrategy(SSyncNode* pSyncNode);
-
-// ---------------------------------------------
-
-#ifdef __cplusplus
-}
-#endif
-
-#endif /*_TD_LIBS_SYNC_TOOLS_H*/
diff --git a/source/libs/sync/src/syncAppendEntries.c b/source/libs/sync/src/syncAppendEntries.c
index 792ce67cd4c9be0eef09fe6b11ba0857ffb92f1d..c9c1baa4bc4fa37b69ec8d4cbb65c271494385ae 100644
--- a/source/libs/sync/src/syncAppendEntries.c
+++ b/source/libs/sync/src/syncAppendEntries.c
@@ -13,15 +13,11 @@
* along with this program. If not, see .
*/
+#define _DEFAULT_SOURCE
#include "syncAppendEntries.h"
-#include "syncInt.h"
-#include "syncRaftCfg.h"
+#include "syncMessage.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
-#include "syncSnapshot.h"
-#include "syncUtil.h"
-#include "syncVoteMgr.h"
-#include "wal.h"
// TLA+ Spec
// HandleAppendEntriesRequest(i, j, m) ==
diff --git a/source/libs/sync/src/syncAppendEntriesReply.c b/source/libs/sync/src/syncAppendEntriesReply.c
index cf7c391a1d6c69b358b193666c7edc2d7f434fab..53d6b5d92f672f2ea7a1dd653be440c140318005 100644
--- a/source/libs/sync/src/syncAppendEntriesReply.c
+++ b/source/libs/sync/src/syncAppendEntriesReply.c
@@ -13,17 +13,14 @@
* along with this program. If not, see .
*/
+#define _DEFAULT_SOURCE
#include "syncAppendEntriesReply.h"
+#include "syncMessage.h"
#include "syncCommit.h"
#include "syncIndexMgr.h"
-#include "syncInt.h"
-#include "syncRaftCfg.h"
-#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncReplication.h"
#include "syncSnapshot.h"
-#include "syncUtil.h"
-#include "syncVoteMgr.h"
// TLA+ Spec
// HandleAppendEntriesResponse(i, j, m) ==
diff --git a/source/libs/sync/src/syncCommit.c b/source/libs/sync/src/syncCommit.c
index a951b78e1ea44cbcad16e12739c6006e33d2c7e1..d2320fc6beec2a4fba966f06a0471e168f4cdf49 100644
--- a/source/libs/sync/src/syncCommit.c
+++ b/source/libs/sync/src/syncCommit.c
@@ -13,10 +13,9 @@
* along with this program. If not, see .
*/
+#define _DEFAULT_SOURCE
#include "syncCommit.h"
#include "syncIndexMgr.h"
-#include "syncInt.h"
-#include "syncRaftCfg.h"
#include "syncRaftLog.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c
index 85e45728775b57b674a2f11a6feb72580e9f279f..123ce5b581447c7fe05037ededbca8d212a29e5c 100644
--- a/source/libs/sync/src/syncElection.c
+++ b/source/libs/sync/src/syncElection.c
@@ -18,8 +18,8 @@
#include "syncMessage.h"
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
-#include "syncVoteMgr.h"
#include "syncUtil.h"
+#include "syncVoteMgr.h"
// TLA+ Spec
// RequestVote(i, j) ==
diff --git a/source/libs/sync/src/syncEnv.c b/source/libs/sync/src/syncEnv.c
index 3f3b794f46cb5c3b82b7d5476f400b71aa066c8e..a0e0a5a2c2df3ee4bbc593d11f3ec931fded140a 100644
--- a/source/libs/sync/src/syncEnv.c
+++ b/source/libs/sync/src/syncEnv.c
@@ -105,6 +105,7 @@ void syncEnvStopTimer() {
#endif
static void syncEnvTick(void *param, void *tmrId) {
+#if 0
SSyncEnv *pSyncEnv = param;
if (atomic_load_64(&gSyncEnv.envTickTimerLogicClockUser) <= atomic_load_64(&gSyncEnv.envTickTimerLogicClock)) {
gSyncEnv.envTickTimerCounter++;
@@ -121,4 +122,5 @@ static void syncEnvTick(void *param, void *tmrId) {
gSyncEnv.envTickTimerLogicClockUser, gSyncEnv.envTickTimerLogicClock, gSyncEnv.envTickTimerCounter,
gSyncEnv.envTickTimerMS, tmrId);
}
+#endif
}
diff --git a/source/libs/sync/src/syncIndexMgr.c b/source/libs/sync/src/syncIndexMgr.c
index 8e78aeedc335c1368dc62435cf21dbb205316e11..ca5e531528afd76fbbb68e61cbe3224e337b89a6 100644
--- a/source/libs/sync/src/syncIndexMgr.c
+++ b/source/libs/sync/src/syncIndexMgr.c
@@ -13,18 +13,16 @@
* along with this program. If not, see .
*/
+#define _DEFAULT_SOURCE
#include "syncIndexMgr.h"
#include "syncUtil.h"
-// SMatchIndex -----------------------------
-
SSyncIndexMgr *syncIndexMgrCreate(SSyncNode *pSyncNode) {
- SSyncIndexMgr *pSyncIndexMgr = taosMemoryMalloc(sizeof(SSyncIndexMgr));
+ SSyncIndexMgr *pSyncIndexMgr = taosMemoryCalloc(1, sizeof(SSyncIndexMgr));
if (pSyncIndexMgr == NULL) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
- memset(pSyncIndexMgr, 0, sizeof(SSyncIndexMgr));
pSyncIndexMgr->replicas = &(pSyncNode->replicasId);
pSyncIndexMgr->replicaNum = pSyncNode->replicaNum;
@@ -97,54 +95,6 @@ SyncIndex syncIndexMgrGetIndex(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaf
return SYNC_INDEX_INVALID;
}
-cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
- char u64buf[128] = {0};
- cJSON *pRoot = cJSON_CreateObject();
-
- if (pSyncIndexMgr != NULL) {
- cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum);
- cJSON *pReplicas = cJSON_CreateArray();
- cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
- for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
- cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i]));
- }
-
- {
- int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum);
- for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
- arr[i] = pSyncIndexMgr->index[i];
- }
- cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum);
- taosMemoryFree(arr);
- cJSON_AddItemToObject(pRoot, "index", pIndex);
- }
-
- {
- int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum);
- for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
- arr[i] = pSyncIndexMgr->privateTerm[i];
- }
- cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum);
- taosMemoryFree(arr);
- cJSON_AddItemToObject(pRoot, "privateTerm", pIndex);
- }
-
- snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode);
- cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
- }
-
- cJSON *pJson = cJSON_CreateObject();
- cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot);
- return pJson;
-}
-
-char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
- cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
- char *serialized = cJSON_Print(pJson);
- cJSON_Delete(pJson);
- return serialized;
-}
-
void syncIndexMgrSetStartTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, int64_t startTime) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
@@ -201,35 +151,6 @@ int64_t syncIndexMgrGetRecvTime(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRa
return -1;
}
-// for debug -------------------
-void syncIndexMgrPrint(SSyncIndexMgr *pObj) {
- char *serialized = syncIndexMgr2Str(pObj);
- printf("syncIndexMgrPrint | len:%" PRIu64 " | %s \n", (uint64_t)strlen(serialized), serialized);
- fflush(NULL);
- taosMemoryFree(serialized);
-}
-
-void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) {
- char *serialized = syncIndexMgr2Str(pObj);
- printf("syncIndexMgrPrint2 | len:%" PRIu64 " | %s | %s \n", (uint64_t)strlen(serialized), s, serialized);
- fflush(NULL);
- taosMemoryFree(serialized);
-}
-
-void syncIndexMgrLog(SSyncIndexMgr *pObj) {
- char *serialized = syncIndexMgr2Str(pObj);
- sTrace("syncIndexMgrLog | len:%" PRIu64 " | %s", (uint64_t)strlen(serialized), serialized);
- taosMemoryFree(serialized);
-}
-
-void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) {
- if (gRaftDetailLog) {
- char *serialized = syncIndexMgr2Str(pObj);
- sTrace("syncIndexMgrLog2 | len:%" PRIu64 " | %s | %s", (uint64_t)strlen(serialized), s, serialized);
- taosMemoryFree(serialized);
- }
-}
-
void syncIndexMgrSetTerm(SSyncIndexMgr *pSyncIndexMgr, const SRaftId *pRaftId, SyncTerm term) {
for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
if (syncUtilSameId(&((*(pSyncIndexMgr->replicas))[i]), pRaftId)) {
diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c
index 3fcb563f3bced428398525c189fc2ead28b57dec..565b21eb555876cbab0d3f8353695298a887e1be 100644
--- a/source/libs/sync/src/syncMessage.c
+++ b/source/libs/sync/src/syncMessage.c
@@ -13,6 +13,7 @@
* along with this program. If not, see .
*/
+#define _DEFAULT_SOURCE
#include "syncMessage.h"
#include "syncRaftCfg.h"
#include "syncRaftEntry.h"
@@ -150,31 +151,6 @@ void syncTimeoutLog2(char* s, const SyncTimeout* pMsg) {
}
}
-// ---- message process SyncPing----
-SyncPing* syncPingBuild(uint32_t dataLen) {
- uint32_t bytes = sizeof(SyncPing) + dataLen;
- SyncPing* pMsg = taosMemoryMalloc(bytes);
- memset(pMsg, 0, bytes);
- pMsg->bytes = bytes;
- pMsg->msgType = TDMT_SYNC_PING;
- pMsg->dataLen = dataLen;
- return pMsg;
-}
-
-SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) {
- uint32_t dataLen = strlen(str) + 1;
- SyncPing* pMsg = syncPingBuild(dataLen);
- pMsg->vgId = vgId;
- pMsg->srcId = *srcId;
- pMsg->destId = *destId;
- snprintf(pMsg->data, pMsg->dataLen, "%s", str);
- return pMsg;
-}
-
-SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) {
- SyncPing* pMsg = syncPingBuild2(srcId, destId, vgId, "ping");
- return pMsg;
-}
void syncPingDestroy(SyncPing* pMsg) {
if (pMsg != NULL) {
@@ -193,16 +169,6 @@ void syncPingDeserialize(const char* buf, uint32_t len, SyncPing* pMsg) {
ASSERT(pMsg->bytes == sizeof(SyncPing) + pMsg->dataLen);
}
-char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len) {
- char* buf = taosMemoryMalloc(pMsg->bytes);
- ASSERT(buf != NULL);
- syncPingSerialize(pMsg, buf, pMsg->bytes);
- if (len != NULL) {
- *len = pMsg->bytes;
- }
- return buf;
-}
-
SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) {
uint32_t bytes = *((uint32_t*)buf);
SyncPing* pMsg = taosMemoryMalloc(bytes);
@@ -212,117 +178,6 @@ SyncPing* syncPingDeserialize2(const char* buf, uint32_t len) {
return pMsg;
}
-int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) {
- SEncoder encoder = {0};
- tEncoderInit(&encoder, buf, bufLen);
- if (tStartEncode(&encoder) < 0) {
- return -1;
- }
-
- if (tEncodeU32(&encoder, pMsg->bytes) < 0) {
- return -1;
- }
- if (tEncodeI32(&encoder, pMsg->vgId) < 0) {
- return -1;
- }
- if (tEncodeU32(&encoder, pMsg->msgType) < 0) {
- return -1;
- }
- if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) {
- return -1;
- }
- if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) {
- return -1;
- }
- if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) {
- return -1;
- }
- if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) {
- return -1;
- }
- if (tEncodeU32(&encoder, pMsg->dataLen) < 0) {
- return -1;
- }
- if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) {
- return -1;
- }
-
- tEndEncode(&encoder);
- int32_t tlen = encoder.pos;
- tEncoderClear(&encoder);
- return tlen;
-}
-
-SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) {
- SDecoder decoder = {0};
- tDecoderInit(&decoder, buf, bufLen);
- if (tStartDecode(&decoder) < 0) {
- return NULL;
- }
-
- SyncPing* pMsg = NULL;
- uint32_t bytes;
- if (tDecodeU32(&decoder, &bytes) < 0) {
- return NULL;
- }
-
- pMsg = taosMemoryMalloc(bytes);
- ASSERT(pMsg != NULL);
- pMsg->bytes = bytes;
-
- if (tDecodeI32(&decoder, &pMsg->vgId) < 0) {
- taosMemoryFree(pMsg);
- return NULL;
- }
- if (tDecodeU32(&decoder, &pMsg->msgType) < 0) {
- taosMemoryFree(pMsg);
- return NULL;
- }
- if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) {
- taosMemoryFree(pMsg);
- return NULL;
- }
- if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) {
- taosMemoryFree(pMsg);
- return NULL;
- }
- if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) {
- taosMemoryFree(pMsg);
- return NULL;
- }
- if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) {
- taosMemoryFree(pMsg);
- return NULL;
- }
- if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) {
- taosMemoryFree(pMsg);
- return NULL;
- }
- uint32_t len;
- char* data = NULL;
- if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
- taosMemoryFree(pMsg);
- return NULL;
- }
- ASSERT(len == pMsg->dataLen);
- memcpy(pMsg->data, data, len);
-
- tEndDecode(&decoder);
- tDecoderClear(&decoder);
- return pMsg;
-}
-
-void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) {
- memset(pRpcMsg, 0, sizeof(*pRpcMsg));
- pRpcMsg->msgType = pMsg->msgType;
- pRpcMsg->contLen = pMsg->bytes;
- pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
- syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
-}
-
-void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) {
- syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
-}
SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) {
SyncPing* pMsg = syncPingDeserialize2(pRpcMsg->pCont, pRpcMsg->contLen);
@@ -330,96 +185,6 @@ SyncPing* syncPingFromRpcMsg2(const SRpcMsg* pRpcMsg) {
return pMsg;
}
-cJSON* syncPing2Json(const SyncPing* pMsg) {
- char u64buf[128] = {0};
- cJSON* pRoot = cJSON_CreateObject();
-
- if (pMsg != NULL) {
- cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
- cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
- cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
-
- cJSON* pSrcId = cJSON_CreateObject();
- snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
- cJSON_AddStringToObject(pSrcId, "addr", u64buf);
- {
- uint64_t u64 = pMsg->srcId.addr;
- cJSON* pTmp = pSrcId;
- char host[128] = {0};
- uint16_t port;
- syncUtilU642Addr(u64, host, sizeof(host), &port);
- cJSON_AddStringToObject(pTmp, "addr_host", host);
- cJSON_AddNumberToObject(pTmp, "addr_port", port);
- }
- cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
- cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
-
- cJSON* pDestId = cJSON_CreateObject();
- snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
- cJSON_AddStringToObject(pDestId, "addr", u64buf);
- {
- uint64_t u64 = pMsg->destId.addr;
- cJSON* pTmp = pDestId;
- char host[128] = {0};
- uint16_t port;
- syncUtilU642Addr(u64, host, sizeof(host), &port);
- cJSON_AddStringToObject(pTmp, "addr_host", host);
- cJSON_AddNumberToObject(pTmp, "addr_port", port);
- }
- cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
- cJSON_AddItemToObject(pRoot, "destId", pDestId);
-
- cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
- char* s;
- s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
- cJSON_AddStringToObject(pRoot, "data", s);
- taosMemoryFree(s);
- s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
- cJSON_AddStringToObject(pRoot, "data2", s);
- taosMemoryFree(s);
- }
-
- cJSON* pJson = cJSON_CreateObject();
- cJSON_AddItemToObject(pJson, "SyncPing", pRoot);
- return pJson;
-}
-
-char* syncPing2Str(const SyncPing* pMsg) {
- cJSON* pJson = syncPing2Json(pMsg);
- char* serialized = cJSON_Print(pJson);
- cJSON_Delete(pJson);
- return serialized;
-}
-
-// for debug ----------------------
-void syncPingPrint(const SyncPing* pMsg) {
- char* serialized = syncPing2Str(pMsg);
- printf("syncPingPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
- fflush(NULL);
- taosMemoryFree(serialized);
-}
-
-void syncPingPrint2(char* s, const SyncPing* pMsg) {
- char* serialized = syncPing2Str(pMsg);
- printf("syncPingPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
- fflush(NULL);
- taosMemoryFree(serialized);
-}
-
-void syncPingLog(const SyncPing* pMsg) {
- char* serialized = syncPing2Str(pMsg);
- sTrace("syncPingLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
- taosMemoryFree(serialized);
-}
-
-void syncPingLog2(char* s, const SyncPing* pMsg) {
- if (gRaftDetailLog) {
- char* serialized = syncPing2Str(pMsg);
- sTrace("syncPingLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
- taosMemoryFree(serialized);
- }
-}
-
// ---- message process SyncPingReply----
SyncPingReply* syncPingReplyBuild(uint32_t dataLen) {
uint32_t bytes = sizeof(SyncPingReply) + dataLen;
diff --git a/source/libs/sync/src/syncRequestVote.c b/source/libs/sync/src/syncRequestVote.c
index f2b75def6b58425e68d78a4d1f02cf0b69a887c3..bf44341acd16bfbef42351d911fe1ee44aad9a44 100644
--- a/source/libs/sync/src/syncRequestVote.c
+++ b/source/libs/sync/src/syncRequestVote.c
@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "syncRequestVote.h"
+#include "syncMessage.h"
#include "syncRaftCfg.h"
#include "syncRaftStore.h"
#include "syncUtil.h"
diff --git a/source/libs/sync/src/syncRequestVoteReply.c b/source/libs/sync/src/syncRequestVoteReply.c
index 02b9bb40acbe2ab8c98a677a1c80c00ab2940b20..1acf16507aad7b8b8bafd438b288dd56ad7f0f91 100644
--- a/source/libs/sync/src/syncRequestVoteReply.c
+++ b/source/libs/sync/src/syncRequestVoteReply.c
@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "syncRequestVoteReply.h"
+#include "syncMessage.h"
#include "syncRaftStore.h"
#include "syncVoteMgr.h"
diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c
index 4ca4e26becfad520702711636846fa353b4ab55e..8a0a35ce336bfa9d1ed1d01fb96b79a98da24edb 100644
--- a/source/libs/sync/src/syncVoteMgr.c
+++ b/source/libs/sync/src/syncVoteMgr.c
@@ -15,6 +15,7 @@
#define _DEFAULT_SOURCE
#include "syncVoteMgr.h"
+#include "syncMessage.h"
#include "syncUtil.h"
static void voteGrantedClearVotes(SVotesGranted *pVotesGranted) {
diff --git a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
index fb227dfad34d7bfcdcea58806ce7ab00317d3bcd..c29def9ca33849a2aee27f859fb2e2d257cdb32a 100644
--- a/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
+++ b/source/libs/sync/test/syncConfigChangeSnapshotTest.cpp
@@ -143,7 +143,7 @@ int32_t SnapshotDoWrite(struct SSyncFSM* pFsm, void* pWriter, void* pBuf, int32_
void RestoreFinishCb(struct SSyncFSM* pFsm) { sTrace("==callback== ==RestoreFinishCb=="); }
void ReConfigCb(struct SSyncFSM* pFsm, const SRpcMsg* pMsg, SReConfigCbMeta* cbMeta) {
- sTrace("==callback== ==ReConfigCb== flag:0x%" PRIx64 ", index:%" PRId64 ", code:%d, currentTerm:%" PRIu64
+ sTrace("==callback== ==ReConfigCb== flag:%" PRIx64 ", index:%" PRId64 ", code:%d, currentTerm:%" PRIu64
", term:%" PRIu64,
cbMeta->flag, cbMeta->index, cbMeta->code, cbMeta->currentTerm, cbMeta->term);
}
diff --git a/source/libs/sync/test/sync_test_lib/inc/syncTest.h b/source/libs/sync/test/sync_test_lib/inc/syncTest.h
index 50d9d12eeed947cf2fefda1966e5b4abc188c003..93da2df5a20b8cda6c8eab60cb3566bcaef596af 100644
--- a/source/libs/sync/test/sync_test_lib/inc/syncTest.h
+++ b/source/libs/sync/test/sync_test_lib/inc/syncTest.h
@@ -110,7 +110,37 @@ char* snapshotSender2Str(SSyncSnapshotSender* pSender);
cJSON* snapshotReceiver2Json(SSyncSnapshotReceiver* pReceiver);
char* snapshotReceiver2Str(SSyncSnapshotReceiver* pReceiver);
-
+cJSON* syncIndexMgr2Json(SSyncIndexMgr* pSyncIndexMgr);
+char* syncIndexMgr2Str(SSyncIndexMgr* pSyncIndexMgr);
+void syncIndexMgrPrint(SSyncIndexMgr* pObj);
+void syncIndexMgrPrint2(char* s, SSyncIndexMgr* pObj);
+void syncIndexMgrLog(SSyncIndexMgr* pObj);
+void syncIndexMgrLog2(char* s, SSyncIndexMgr* pObj);
+
+cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg);
+cJSON* syncRpcUnknownMsg2Json();
+char* syncRpcMsg2Str(SRpcMsg* pRpcMsg);
+void syncRpcMsgPrint(SRpcMsg* pMsg);
+void syncRpcMsgPrint2(char* s, SRpcMsg* pMsg);
+void syncRpcMsgLog(SRpcMsg* pMsg);
+void syncRpcMsgLog2(char* s, SRpcMsg* pMsg);
+
+
+// origin syncMessage
+SyncPing* syncPingBuild(uint32_t dataLen);
+SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str);
+SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId);
+char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len);
+int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen);
+SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen);
+void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg);
+void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg);
+cJSON* syncPing2Json(const SyncPing* pMsg);
+char* syncPing2Str(const SyncPing* pMsg);
+void syncPingPrint(const SyncPing* pMsg);
+void syncPingPrint2(char* s, const SyncPing* pMsg);
+void syncPingLog(const SyncPing* pMsg);
+void syncPingLog2(char* s, const SyncPing* pMsg);
#ifdef __cplusplus
}
diff --git a/source/libs/sync/test/sync_test_lib/src/syncIndexMgrDebug.c b/source/libs/sync/test/sync_test_lib/src/syncIndexMgrDebug.c
new file mode 100644
index 0000000000000000000000000000000000000000..1d3198c51d285f7a3de97e1c59b0676e1b15142c
--- /dev/null
+++ b/source/libs/sync/test/sync_test_lib/src/syncIndexMgrDebug.c
@@ -0,0 +1,93 @@
+/*
+ * Copyright (c) 2019 TAOS Data, Inc.
+ *
+ * This program is free software: you can use, redistribute, and/or modify
+ * it under the terms of the GNU Affero General Public License, version 3
+ * or later ("AGPL"), as published by the Free Software Foundation.
+ *
+ * This program is distributed in the hope that it will be useful, but WITHOUT
+ * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
+ * FITNESS FOR A PARTICULAR PURPOSE.
+ *
+ * You should have received a copy of the GNU Affero General Public License
+ * along with this program. If not, see .
+ */
+
+#define _DEFAULT_SOURCE
+#include "syncTest.h"
+
+void syncIndexMgrPrint(SSyncIndexMgr *pObj) {
+ char *serialized = syncIndexMgr2Str(pObj);
+ printf("syncIndexMgrPrint | len:%" PRIu64 " | %s \n", (uint64_t)strlen(serialized), serialized);
+ fflush(NULL);
+ taosMemoryFree(serialized);
+}
+
+void syncIndexMgrPrint2(char *s, SSyncIndexMgr *pObj) {
+ char *serialized = syncIndexMgr2Str(pObj);
+ printf("syncIndexMgrPrint2 | len:%" PRIu64 " | %s | %s \n", (uint64_t)strlen(serialized), s, serialized);
+ fflush(NULL);
+ taosMemoryFree(serialized);
+}
+
+void syncIndexMgrLog(SSyncIndexMgr *pObj) {
+ char *serialized = syncIndexMgr2Str(pObj);
+ sTrace("syncIndexMgrLog | len:%" PRIu64 " | %s", (uint64_t)strlen(serialized), serialized);
+ taosMemoryFree(serialized);
+}
+
+void syncIndexMgrLog2(char *s, SSyncIndexMgr *pObj) {
+ if (gRaftDetailLog) {
+ char *serialized = syncIndexMgr2Str(pObj);
+ sTrace("syncIndexMgrLog2 | len:%" PRIu64 " | %s | %s", (uint64_t)strlen(serialized), s, serialized);
+ taosMemoryFree(serialized);
+ }
+}
+
+cJSON *syncIndexMgr2Json(SSyncIndexMgr *pSyncIndexMgr) {
+ char u64buf[128] = {0};
+ cJSON *pRoot = cJSON_CreateObject();
+
+ if (pSyncIndexMgr != NULL) {
+ cJSON_AddNumberToObject(pRoot, "replicaNum", pSyncIndexMgr->replicaNum);
+ cJSON *pReplicas = cJSON_CreateArray();
+ cJSON_AddItemToObject(pRoot, "replicas", pReplicas);
+ for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
+ cJSON_AddItemToArray(pReplicas, syncUtilRaftId2Json(&(*(pSyncIndexMgr->replicas))[i]));
+ }
+
+ {
+ int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum);
+ for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
+ arr[i] = pSyncIndexMgr->index[i];
+ }
+ cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum);
+ taosMemoryFree(arr);
+ cJSON_AddItemToObject(pRoot, "index", pIndex);
+ }
+
+ {
+ int *arr = (int *)taosMemoryMalloc(sizeof(int) * pSyncIndexMgr->replicaNum);
+ for (int i = 0; i < pSyncIndexMgr->replicaNum; ++i) {
+ arr[i] = pSyncIndexMgr->privateTerm[i];
+ }
+ cJSON *pIndex = cJSON_CreateIntArray(arr, pSyncIndexMgr->replicaNum);
+ taosMemoryFree(arr);
+ cJSON_AddItemToObject(pRoot, "privateTerm", pIndex);
+ }
+
+ snprintf(u64buf, sizeof(u64buf), "%p", pSyncIndexMgr->pSyncNode);
+ cJSON_AddStringToObject(pRoot, "pSyncNode", u64buf);
+ }
+
+ cJSON *pJson = cJSON_CreateObject();
+ cJSON_AddItemToObject(pJson, "pSyncIndexMgr", pRoot);
+ return pJson;
+}
+
+char *syncIndexMgr2Str(SSyncIndexMgr *pSyncIndexMgr) {
+ cJSON *pJson = syncIndexMgr2Json(pSyncIndexMgr);
+ char *serialized = cJSON_Print(pJson);
+ cJSON_Delete(pJson);
+ return serialized;
+}
diff --git a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c
index 012382d69d3bcda0a4f6cd2c2daae6f9b7830a99..cdd2ae045d4b60c2480ba16101ad94861015e0a8 100644
--- a/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c
+++ b/source/libs/sync/test/sync_test_lib/src/syncMessageDebug.c
@@ -16,6 +16,244 @@
#define _DEFAULT_SOURCE
#include "syncTest.h"
+// ---- message process SyncPing----
+SyncPing* syncPingBuild(uint32_t dataLen) {
+ uint32_t bytes = sizeof(SyncPing) + dataLen;
+ SyncPing* pMsg = taosMemoryMalloc(bytes);
+ memset(pMsg, 0, bytes);
+ pMsg->bytes = bytes;
+ pMsg->msgType = TDMT_SYNC_PING;
+ pMsg->dataLen = dataLen;
+ return pMsg;
+}
+
+SyncPing* syncPingBuild2(const SRaftId* srcId, const SRaftId* destId, int32_t vgId, const char* str) {
+ uint32_t dataLen = strlen(str) + 1;
+ SyncPing* pMsg = syncPingBuild(dataLen);
+ pMsg->vgId = vgId;
+ pMsg->srcId = *srcId;
+ pMsg->destId = *destId;
+ snprintf(pMsg->data, pMsg->dataLen, "%s", str);
+ return pMsg;
+}
+
+SyncPing* syncPingBuild3(const SRaftId* srcId, const SRaftId* destId, int32_t vgId) {
+ SyncPing* pMsg = syncPingBuild2(srcId, destId, vgId, "ping");
+ return pMsg;
+}
+
+char* syncPingSerialize2(const SyncPing* pMsg, uint32_t* len) {
+ char* buf = taosMemoryMalloc(pMsg->bytes);
+ ASSERT(buf != NULL);
+ syncPingSerialize(pMsg, buf, pMsg->bytes);
+ if (len != NULL) {
+ *len = pMsg->bytes;
+ }
+ return buf;
+}
+
+void syncPing2RpcMsg(const SyncPing* pMsg, SRpcMsg* pRpcMsg) {
+ memset(pRpcMsg, 0, sizeof(*pRpcMsg));
+ pRpcMsg->msgType = pMsg->msgType;
+ pRpcMsg->contLen = pMsg->bytes;
+ pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
+ syncPingSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
+}
+
+void syncPingFromRpcMsg(const SRpcMsg* pRpcMsg, SyncPing* pMsg) {
+ syncPingDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
+}
+
+SyncPing* syncPingDeserialize3(void* buf, int32_t bufLen) {
+ SDecoder decoder = {0};
+ tDecoderInit(&decoder, buf, bufLen);
+ if (tStartDecode(&decoder) < 0) {
+ return NULL;
+ }
+
+ SyncPing* pMsg = NULL;
+ uint32_t bytes;
+ if (tDecodeU32(&decoder, &bytes) < 0) {
+ return NULL;
+ }
+
+ pMsg = taosMemoryMalloc(bytes);
+ ASSERT(pMsg != NULL);
+ pMsg->bytes = bytes;
+
+ if (tDecodeI32(&decoder, &pMsg->vgId) < 0) {
+ taosMemoryFree(pMsg);
+ return NULL;
+ }
+ if (tDecodeU32(&decoder, &pMsg->msgType) < 0) {
+ taosMemoryFree(pMsg);
+ return NULL;
+ }
+ if (tDecodeU64(&decoder, &pMsg->srcId.addr) < 0) {
+ taosMemoryFree(pMsg);
+ return NULL;
+ }
+ if (tDecodeI32(&decoder, &pMsg->srcId.vgId) < 0) {
+ taosMemoryFree(pMsg);
+ return NULL;
+ }
+ if (tDecodeU64(&decoder, &pMsg->destId.addr) < 0) {
+ taosMemoryFree(pMsg);
+ return NULL;
+ }
+ if (tDecodeI32(&decoder, &pMsg->destId.vgId) < 0) {
+ taosMemoryFree(pMsg);
+ return NULL;
+ }
+ if (tDecodeU32(&decoder, &pMsg->dataLen) < 0) {
+ taosMemoryFree(pMsg);
+ return NULL;
+ }
+ uint32_t len;
+ char* data = NULL;
+ if (tDecodeBinary(&decoder, (uint8_t**)(&data), &len) < 0) {
+ taosMemoryFree(pMsg);
+ return NULL;
+ }
+ ASSERT(len == pMsg->dataLen);
+ memcpy(pMsg->data, data, len);
+
+ tEndDecode(&decoder);
+ tDecoderClear(&decoder);
+ return pMsg;
+}
+
+int32_t syncPingSerialize3(const SyncPing* pMsg, char* buf, int32_t bufLen) {
+ SEncoder encoder = {0};
+ tEncoderInit(&encoder, buf, bufLen);
+ if (tStartEncode(&encoder) < 0) {
+ return -1;
+ }
+
+ if (tEncodeU32(&encoder, pMsg->bytes) < 0) {
+ return -1;
+ }
+ if (tEncodeI32(&encoder, pMsg->vgId) < 0) {
+ return -1;
+ }
+ if (tEncodeU32(&encoder, pMsg->msgType) < 0) {
+ return -1;
+ }
+ if (tEncodeU64(&encoder, pMsg->srcId.addr) < 0) {
+ return -1;
+ }
+ if (tEncodeI32(&encoder, pMsg->srcId.vgId) < 0) {
+ return -1;
+ }
+ if (tEncodeU64(&encoder, pMsg->destId.addr) < 0) {
+ return -1;
+ }
+ if (tEncodeI32(&encoder, pMsg->destId.vgId) < 0) {
+ return -1;
+ }
+ if (tEncodeU32(&encoder, pMsg->dataLen) < 0) {
+ return -1;
+ }
+ if (tEncodeBinary(&encoder, pMsg->data, pMsg->dataLen)) {
+ return -1;
+ }
+
+ tEndEncode(&encoder);
+ int32_t tlen = encoder.pos;
+ tEncoderClear(&encoder);
+ return tlen;
+}
+
+cJSON* syncPing2Json(const SyncPing* pMsg) {
+ char u64buf[128] = {0};
+ cJSON* pRoot = cJSON_CreateObject();
+
+ if (pMsg != NULL) {
+ cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
+ cJSON_AddNumberToObject(pRoot, "vgId", pMsg->vgId);
+ cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
+
+ cJSON* pSrcId = cJSON_CreateObject();
+ snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->srcId.addr);
+ cJSON_AddStringToObject(pSrcId, "addr", u64buf);
+ {
+ uint64_t u64 = pMsg->srcId.addr;
+ cJSON* pTmp = pSrcId;
+ char host[128] = {0};
+ uint16_t port;
+ syncUtilU642Addr(u64, host, sizeof(host), &port);
+ cJSON_AddStringToObject(pTmp, "addr_host", host);
+ cJSON_AddNumberToObject(pTmp, "addr_port", port);
+ }
+ cJSON_AddNumberToObject(pSrcId, "vgId", pMsg->srcId.vgId);
+ cJSON_AddItemToObject(pRoot, "srcId", pSrcId);
+
+ cJSON* pDestId = cJSON_CreateObject();
+ snprintf(u64buf, sizeof(u64buf), "%" PRIu64, pMsg->destId.addr);
+ cJSON_AddStringToObject(pDestId, "addr", u64buf);
+ {
+ uint64_t u64 = pMsg->destId.addr;
+ cJSON* pTmp = pDestId;
+ char host[128] = {0};
+ uint16_t port;
+ syncUtilU642Addr(u64, host, sizeof(host), &port);
+ cJSON_AddStringToObject(pTmp, "addr_host", host);
+ cJSON_AddNumberToObject(pTmp, "addr_port", port);
+ }
+ cJSON_AddNumberToObject(pDestId, "vgId", pMsg->destId.vgId);
+ cJSON_AddItemToObject(pRoot, "destId", pDestId);
+
+ cJSON_AddNumberToObject(pRoot, "dataLen", pMsg->dataLen);
+ char* s;
+ s = syncUtilPrintBin((char*)(pMsg->data), pMsg->dataLen);
+ cJSON_AddStringToObject(pRoot, "data", s);
+ taosMemoryFree(s);
+ s = syncUtilPrintBin2((char*)(pMsg->data), pMsg->dataLen);
+ cJSON_AddStringToObject(pRoot, "data2", s);
+ taosMemoryFree(s);
+ }
+
+ cJSON* pJson = cJSON_CreateObject();
+ cJSON_AddItemToObject(pJson, "SyncPing", pRoot);
+ return pJson;
+}
+
+char* syncPing2Str(const SyncPing* pMsg) {
+ cJSON* pJson = syncPing2Json(pMsg);
+ char* serialized = cJSON_Print(pJson);
+ cJSON_Delete(pJson);
+ return serialized;
+}
+
+// for debug ----------------------
+void syncPingPrint(const SyncPing* pMsg) {
+ char* serialized = syncPing2Str(pMsg);
+ printf("syncPingPrint | len:%d | %s \n", (int32_t)strlen(serialized), serialized);
+ fflush(NULL);
+ taosMemoryFree(serialized);
+}
+
+void syncPingPrint2(char* s, const SyncPing* pMsg) {
+ char* serialized = syncPing2Str(pMsg);
+ printf("syncPingPrint2 | len:%d | %s | %s \n", (int32_t)strlen(serialized), s, serialized);
+ fflush(NULL);
+ taosMemoryFree(serialized);
+}
+
+void syncPingLog(const SyncPing* pMsg) {
+ char* serialized = syncPing2Str(pMsg);
+ sTrace("syncPingLog | len:%d | %s", (int32_t)strlen(serialized), serialized);
+ taosMemoryFree(serialized);
+}
+
+void syncPingLog2(char* s, const SyncPing* pMsg) {
+ if (gRaftDetailLog) {
+ char* serialized = syncPing2Str(pMsg);
+ sTrace("syncPingLog2 | len:%d | %s | %s", (int32_t)strlen(serialized), s, serialized);
+ taosMemoryFree(serialized);
+ }
+}
+
// ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
cJSON* pRoot;
diff --git a/source/util/src/terror.c b/source/util/src/terror.c
index 0e6568d692996b84d0996c315ab8934a73aaf64d..a1162d2e94b089a66f199172bccc9641df5474cc 100644
--- a/source/util/src/terror.c
+++ b/source/util/src/terror.c
@@ -521,7 +521,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TIMELINE_FUNC, "Invalid timeline fu
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_PASSWD, "Invalid password")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_ALTER_TABLE, "Invalid alter table statement")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_CANNOT_DROP_PRIMARY_KEY, "Primary timestamp column cannot be dropped")
-TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_MODIFY_COL, "Only binary/nchar column length could be modified")
+TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_MODIFY_COL, "Only binary/nchar column length could be modified, and the length can only be increased, not decreased")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_TBNAME, "Invalid tbname pseudo column")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_INVALID_FUNCTION_NAME, "Invalid function name")
TAOS_DEFINE_ERROR(TSDB_CODE_PAR_COMMENT_TOO_LONG, "Comment too long")
diff --git a/tests/script/tsim/stream/state0.sim b/tests/script/tsim/stream/state0.sim
index dc7d9bc40746b76ad3b34430981c86c2b75c9010..87d7d4b7fcd49261ed602886057f4291da840219 100644
--- a/tests/script/tsim/stream/state0.sim
+++ b/tests/script/tsim/stream/state0.sim
@@ -544,4 +544,192 @@ if $rows != 10 then
endi
+sql drop stream if exists streams4;
+sql drop database if exists test4;
+sql drop stable if exists streamt4;
+sql create database if not exists test4 vgroups 10 precision "ms" ;
+sql use test4;
+sql create table st (ts timestamp, c1 tinyint, c2 smallint) tags (t1 tinyint) ;
+sql create table t1 using st tags (-81) ;
+sql create table t2 using st tags (-81) ;
+sql create stream if not exists streams4 trigger window_close into streamt4 as select _wstart AS start, min(c1),count(c1) from t1 state_window(c1);
+
+sql insert into t1 (ts, c1) values (1668073288209, 11);
+sql insert into t1 (ts, c1) values (1668073288210, 11);
+sql insert into t1 (ts, c1) values (1668073288211, 11);
+sql insert into t1 (ts, c1) values (1668073288212, 11);
+sql insert into t1 (ts, c1) values (1668073288213, 11);
+sql insert into t1 (ts, c1) values (1668073288214, 11);
+sql insert into t1 (ts, c1) values (1668073288215, 29);
+
+$loop_count = 0
+loop7:
+
+sleep 200
+
+sql select * from streamt4 order by start;
+
+$loop_count = $loop_count + 1
+if $loop_count == 20 then
+ return -1
+endi
+
+if $rows != 1 then
+ print =====rows=$rows
+ goto loop7
+endi
+
+if $data01 != 11 then
+ print =====data01=$data01
+ goto loop7
+endi
+
+if $data02 != 6 then
+ print =====data02=$data02
+ goto loop7
+endi
+
+sql delete from t1 where ts = cast(1668073288214 as timestamp);
+sql insert into t1 (ts, c1) values (1668073288216, 29);
+sql delete from t1 where ts = cast(1668073288215 as timestamp);
+sql insert into t1 (ts, c1) values (1668073288217, 29);
+sql delete from t1 where ts = cast(1668073288216 as timestamp);
+sql insert into t1 (ts, c1) values (1668073288218, 29);
+sql delete from t1 where ts = cast(1668073288217 as timestamp);
+sql insert into t1 (ts, c1) values (1668073288219, 29);
+sql delete from t1 where ts = cast(1668073288218 as timestamp);
+sql insert into t1 (ts, c1) values (1668073288220, 29);
+sql delete from t1 where ts = cast(1668073288219 as timestamp);
+
+$loop_count = 0
+loop8:
+
+sleep 200
+
+sql select * from streamt4 order by start;
+
+$loop_count = $loop_count + 1
+if $loop_count == 20 then
+ return -1
+endi
+
+if $rows != 1 then
+ print =====rows=$rows
+ goto loop8
+endi
+
+if $data01 != 11 then
+ print =====data01=$data01
+ goto loop8
+endi
+
+if $data02 != 5 then
+ print =====data02=$data02
+ goto loop8
+endi
+
+sql insert into t1 (ts, c1) values (1668073288221, 65);
+sql insert into t1 (ts, c1) values (1668073288222, 65);
+sql insert into t1 (ts, c1) values (1668073288223, 65);
+sql insert into t1 (ts, c1) values (1668073288224, 65);
+sql insert into t1 (ts, c1) values (1668073288225, 65);
+sql insert into t1 (ts, c1) values (1668073288226, 65);
+
+$loop_count = 0
+loop8:
+
+sleep 200
+
+sql select * from streamt4 order by start;
+
+$loop_count = $loop_count + 1
+if $loop_count == 20 then
+ return -1
+endi
+
+if $rows != 2 then
+ print =====rows=$rows
+ goto loop8
+endi
+
+if $data01 != 11 then
+ print =====data01=$data01
+ goto loop8
+endi
+
+if $data02 != 5 then
+ print =====data02=$data02
+ goto loop8
+endi
+
+if $data11 != 29 then
+ print =====data11=$data11
+ goto loop8
+endi
+
+if $data12 != 1 then
+ print =====data12=$data12
+ goto loop8
+endi
+
+sql insert into t1 (ts, c1) values (1668073288224, 64);
+
+$loop_count = 0
+loop9:
+
+sleep 200
+
+sql select * from streamt4 order by start;
+
+$loop_count = $loop_count + 1
+if $loop_count == 20 then
+ return -1
+endi
+
+if $rows != 4 then
+ print =====rows=$rows
+ goto loop9
+endi
+
+if $data01 != 11 then
+ print =====data01=$data01
+ goto loop9
+endi
+
+if $data02 != 5 then
+ print =====data02=$data02
+ goto loop9
+endi
+
+if $data11 != 29 then
+ print =====data11=$data11
+ goto loop9
+endi
+
+if $data12 != 1 then
+ print =====data12=$data12
+ goto loop9
+endi
+
+if $data21 != 65 then
+ print =====data21=$data21
+ goto loop9
+endi
+
+if $data22 != 3 then
+ print =====data22=$data22
+ goto loop9
+endi
+
+if $data31 != 64 then
+ print =====data31=$data31
+ goto loop9
+endi
+
+if $data32 != 1 then
+ print =====data32=$data32
+ goto loop9
+endi
+
+
system sh/exec.sh -n dnode1 -s stop -x SIGINT