提交 f11d2403 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11463-3.0

......@@ -86,21 +86,21 @@ typedef struct {
typedef struct {
float uptime; // day
float cpu_engine;
float cpu_system;
double cpu_engine;
double cpu_system;
float cpu_cores;
int64_t mem_engine; // KB
int64_t mem_system; // KB
int64_t mem_total; // KB
float disk_engine; // GB
float disk_used; // GB
float disk_total; // GB
int64_t net_in;
int64_t net_out;
float io_read;
float io_write;
float io_read_disk;
float io_write_disk;
int64_t mem_engine; // KB
int64_t mem_system; // KB
int64_t mem_total; // KB
int64_t disk_engine; // Byte
int64_t disk_used; // Byte
int64_t disk_total; // Byte
double net_in; // bytes per second
double net_out; // bytes per second
double io_read;
double io_write;
double io_read_disk;
double io_write_disk;
int32_t req_select;
float req_select_rate;
int32_t req_insert;
......@@ -138,6 +138,8 @@ typedef struct SSyncInfo {
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
void* queue;
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
} SSyncInfo;
......@@ -147,13 +149,10 @@ typedef struct SSyncNode SSyncNode;
int32_t syncInit();
void syncCleanUp();
int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak);
int64_t syncStart(const SSyncInfo* pSyncInfo);
void syncStop(int64_t rid);
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg);
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak);
ESyncState syncGetMyRole(int64_t rid);
void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole);
......@@ -38,15 +38,15 @@ int32_t taosGetEmail(char *email, int32_t maxLen);
int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen);
int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores);
int32_t taosGetCpuCores(float *numOfCores);
int32_t taosGetCpuUsage(float *cpu_system, float *cpu_engine);
int32_t taosGetCpuUsage(double *cpu_system, double *cpu_engine);
int32_t taosGetTotalMemory(int64_t *totalKB);
int32_t taosGetProcMemory(int64_t *usedKB);
int32_t taosGetSysMemory(int64_t *usedKB);
int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize);
int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars);
int32_t taosGetProcIO(float *readKB, float *writeKB);
int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes);
int32_t taosGetBandSpeed(float *bandSpeedKb);
int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes);
int32_t taosGetIOSpeed(double *readKB, double *writeKB, double *readDiskKB, double *writeDiskKB);
int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes);
int32_t taosGetBandSpeed(double *receive_bytes_per_sec, double *transmit_bytes_per_sec);
int32_t taosSystem(const char *cmd);
void taosKillSystem();
......@@ -108,6 +108,7 @@ typedef struct {
SHashObj *hash;
int32_t openVnodes;
int32_t totalVnodes;
int32_t masterNum;
SRWLatch latch;
SQWorkerPool queryPool;
SFWorkerPool fetchPool;
......@@ -487,12 +487,10 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
pInfo->mem_total = tsTotalMemoryKB;
pInfo->disk_engine = 0;
pInfo->disk_used = tsDataSpace.size.used / (1024 * 1024 * 1024.0);
pInfo->disk_total = tsDataSpace.size.avail / (1024 * 1024 * 1024.0);
taosGetCardInfo(NULL, &pInfo->net_in, &pInfo->net_out);
taosGetProcIO(&pInfo->io_read, &pInfo->io_write);
pInfo->io_read_disk = 0;
pInfo->io_write_disk = 0;
pInfo->disk_used = tsDataSpace.size.used;
pInfo->disk_total = tsDataSpace.size.total;
taosGetBandSpeed(&pInfo->net_in, &pInfo->net_out);
taosGetIOSpeed(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk);
pInfo->req_select = 0;
pInfo->req_select_rate = 0;
pInfo->req_insert = 0;
......@@ -501,9 +499,9 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) {
pInfo->req_insert_batch = 0;
pInfo->req_insert_batch_success = 0;
pInfo->req_insert_batch_rate = 0;
pInfo->errors = 0;
pInfo->vnodes_num = 0;
pInfo->masters = 0;
pInfo->errors = tsNumOfErrorLogs;
pInfo->vnodes_num = pDnode->vmgmt.totalVnodes;
pInfo->masters = pDnode->vmgmt.masterNum;
pInfo->has_mnode = dndIsMnode(pDnode);
......@@ -17,6 +17,7 @@
#include "dndVnodes.h"
#include "dndMgmt.h"
#include "dndTransport.h"
#include "sync.h"
typedef struct {
int32_t vgId;
......@@ -979,6 +980,8 @@ void dndCleanupVnodes(SDnode *pDnode) {
void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {
SVnodesMgmt *pMgmt = &pDnode->vmgmt;
int32_t totalVnodes = 0;
int32_t masterNum = 0;
......@@ -993,8 +996,12 @@ void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) {
vnodeGetLoad(pVnode->pImpl, &vload);
taosArrayPush(pLoads, &vload);
if (vload.role == TAOS_SYNC_STATE_LEADER) masterNum++;
pIter = taosHashIterate(pMgmt->hash, pIter);
pMgmt->totalVnodes = totalVnodes;
pMgmt->masterNum = masterNum;
......@@ -16,6 +16,7 @@
#include "index.h"
#include "indexInt.h"
#include "index_cache.h"
#include "index_comm.h"
#include "index_tfile.h"
#include "index_util.h"
#include "tdef.h"
......@@ -30,8 +31,6 @@
void* indexQhandle = NULL;
static char JSON_COLUMN[] = "JSON";
#define INDEX_MERGE_ADD_DEL(src, dst, tgt) \
{ \
bool f = false; \
......@@ -64,13 +63,11 @@ typedef struct SIdxColInfo {
int cVersion;
} SIdxColInfo;
typedef struct SIdxMergeHelper {
char* colVal;
typedef struct SIdxTempResult {
SArray* total;
SArray* added;
SArray* deled;
bool reset;
} SIdxMergeHelper;
} SIdxTempResult;
static pthread_once_t isInit = PTHREAD_ONCE_INIT;
// static void indexInit();
......@@ -82,8 +79,7 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp
static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch);
// merge cache and tfile by opera type
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxMergeHelper* helper);
static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper);
static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTempResult* helper);
// static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf);
// int32_t indexSerialKey(ICacheKey* key, char* buf);
......@@ -399,7 +395,6 @@ static void indexInterResultsDestroy(SArray* results) {
static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) {
// refactor, merge interResults into fResults by oType
for (int i = 0; i < taosArrayGetSize(interResults); i--) {
SArray* t = taosArrayGetP(interResults, i);
taosArraySort(t, uidCompare);
......@@ -418,98 +413,82 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType
return 0;
SIdxMergeHelper* sIdxMergeHelperCreate() {
SIdxMergeHelper* hp = calloc(1, sizeof(SIdxMergeHelper));
hp->total = taosArrayInit(4, sizeof(uint64_t));
hp->added = taosArrayInit(4, sizeof(uint64_t));
hp->deled = taosArrayInit(4, sizeof(uint64_t));
hp->reset = false;
return hp;
SIdxTempResult* sIdxTempResultCreate() {
SIdxTempResult* tr = calloc(1, sizeof(SIdxTempResult));
tr->total = taosArrayInit(4, sizeof(uint64_t));
tr->added = taosArrayInit(4, sizeof(uint64_t));
tr->deled = taosArrayInit(4, sizeof(uint64_t));
return tr;
void sIdxMergeHelperClear(SIdxMergeHelper* hp) {
if (hp == NULL) {
void sIdxTempResultClear(SIdxTempResult* tr) {
if (tr == NULL) {
hp->reset = false;
void sIdxMergeHelperDestroy(SIdxMergeHelper* hp) {
if (hp == NULL) {
void sIdxTempResultDestroy(SIdxTempResult* tr) {
if (tr == NULL) {
static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper) {
int32_t sz = result ? taosArrayGetSize(result) : 0;
if (sz > 0) {
// TODO(yihao): remove duplicate tableid
TFileValue* lv = taosArrayGetP(result, sz - 1);
// indexError("merge colVal: %s", lv->colVal);
if (strcmp(lv->colVal, tv->colVal) == 0) {
taosArrayAddAll(lv->tableId, tv->tableId);
} else {
taosArrayPush(result, &tv);
} else {
taosArrayPush(result, &tv);
static void sIdxMergeResult(SArray* result, SIdxMergeHelper* mh) {
taosArraySort(mh->total, uidCompare);
taosArraySort(mh->added, uidCompare);
taosArraySort(mh->deled, uidCompare);
static void sIdxTempResultMergeTo(SArray* result, SIdxTempResult* tr) {
taosArraySort(tr->total, uidCompare);
taosArraySort(tr->added, uidCompare);
taosArraySort(tr->deled, uidCompare);
SArray* arrs = taosArrayInit(2, sizeof(void*));
taosArrayPush(arrs, &mh->total);
taosArrayPush(arrs, &mh->added);
taosArrayPush(arrs, &tr->total);
taosArrayPush(arrs, &tr->added);
iUnion(arrs, result);
iExcept(result, mh->deled);
iExcept(result, tr->deled);
static void indexMayMergeToFinalResult(SArray* result, TFileValue* tfv, SIdxMergeHelper* help) {
static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) {
int32_t sz = taosArrayGetSize(result);
if (sz > 0) {
TFileValue* lv = taosArrayGetP(result, sz - 1);
if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) {
sIdxMergeResult(lv->tableId, help);
sIdxTempResultMergeTo(lv->tableId, tr);
taosArrayPush(result, &tfv);
} else if (tfv == NULL) {
sIdxMergeResult(lv->tableId, help);
// handle last iterator
sIdxTempResultMergeTo(lv->tableId, tr);
} else {
// temp result saved in help
} else {
taosArrayPush(result, &tfv);
static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxMergeHelper* mh) {
static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTempResult* tr) {
char* colVal = (cv != NULL) ? cv->colVal : tv->colVal;
TFileValue* tfv = tfileValueCreate(colVal);
indexMayMergeToFinalResult(result, tfv, mh);
indexMayMergeTempToFinalResult(result, tfv, tr);
if (cv != NULL) {
uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0);
if (cv->type == ADD_VALUE) {
INDEX_MERGE_ADD_DEL(mh->deled, mh->added, id)
INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id)
} else if (cv->type == DEL_VALUE) {
INDEX_MERGE_ADD_DEL(mh->added, mh->deled, id)
INDEX_MERGE_ADD_DEL(tr->added, tr->deled, id)
if (tv != NULL) {
taosArrayAddAll(mh->total, tv->val);
taosArrayAddAll(tr->total, tv->val);
static void indexDestroyTempResult(SArray* result) {
static void indexDestroyFinalResult(SArray* result) {
int32_t sz = result ? taosArrayGetSize(result) : 0;
for (size_t i = 0; i < sz; i++) {
TFileValue* tv = taosArrayGetP(result, i);
......@@ -543,7 +522,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
bool cn = cacheIter ? cacheIter->next(cacheIter) : false;
bool tn = tfileIter ? tfileIter->next(tfileIter) : false;
SIdxMergeHelper* help = sIdxMergeHelperCreate();
SIdxTempResult* tr = sIdxTempResultCreate();
while (cn == true || tn == true) {
IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL;
IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL;
......@@ -557,21 +536,22 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
comp = 1;
if (comp == 0) {
indexMergeCacheAndTFile(result, cv, tv, help);
indexMergeCacheAndTFile(result, cv, tv, tr);
cn = cacheIter->next(cacheIter);
tn = tfileIter->next(tfileIter);
} else if (comp < 0) {
indexMergeCacheAndTFile(result, cv, NULL, help);
indexMergeCacheAndTFile(result, cv, NULL, tr);
cn = cacheIter->next(cacheIter);
} else {
indexMergeCacheAndTFile(result, NULL, tv, help);
indexMergeCacheAndTFile(result, NULL, tv, tr);
tn = tfileIter->next(tfileIter);
indexMayMergeToFinalResult(result, NULL, help);
indexMayMergeTempToFinalResult(result, NULL, tr);
int ret = indexGenTFile(sIdx, pCache, result);
......@@ -581,8 +561,6 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) {
int64_t cost = taosGetTimestampUs() - st;
if (ret != 0) {
indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000);
......@@ -102,7 +102,6 @@ void tfileCacheDestroy(TFileCache* tcache) {
if (tcache == NULL) {
// free table cache
TFileReader** reader = taosHashIterate(tcache->tableCache, NULL);
while (reader) {
......@@ -105,6 +105,22 @@ TEST_F(JsonEnv, testWriteMillonData) {
std::string colName("voltagefdadfa");
std::string colVal("abxxxxxxxxxxxx");
for (uint i = 0; i < 1000; i++) {
colVal[i % colVal.size()] = '0' + i % 128;
SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(),
colVal.c_str(), colVal.size());
SIndexMultiTerm* terms = indexMultiTermCreate();
indexMultiTermAdd(terms, term);
for (size_t i = 0; i < 1000; i++) {
tIndexJsonPut(index, terms, i);
std::string colName("voltagefdadfa");
std::string colVal("abxxxxxxxxxxxx");
......@@ -26,6 +26,9 @@ extern "C" {
#include "syncInt.h"
#include "taosdef.h"
void syncNodeElect(SSyncNode* pSyncNode);
void syncNodeRequestVotePeers(SSyncNode* pSyncNode);
#ifdef __cplusplus
......@@ -49,6 +49,7 @@ typedef struct SSyncIO {
int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg);
int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg);
int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg);
int32_t (*FpOnSyncTimeout)(SSyncNode *pSyncNode, SyncTimeout *pMsg);
int8_t isStart;
......@@ -58,9 +59,10 @@ extern SSyncIO *gSyncIO;
int32_t syncIOStart(char *host, uint16_t port);
int32_t syncIOStop();
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t syncIOTickQ();
int32_t syncIOTickPing();
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg);
int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg);
#ifdef __cplusplus
......@@ -69,6 +69,9 @@ extern "C" {
struct SRaft;
typedef struct SRaft SRaft;
struct SyncTimeout;
typedef struct SyncTimeout SyncTimeout;
struct SyncPing;
typedef struct SyncPing SyncPing;
......@@ -111,17 +114,22 @@ typedef struct SSyncNode {
void* rpcClient;
int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg);
void* queue;
int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg);
// init internal
SNodeInfo me;
SRaftId raftId;
int32_t peersNum;
SNodeInfo peers[TSDB_MAX_REPLICA];
int32_t replicaNum;
SRaftId replicasId[TSDB_MAX_REPLICA];
// raft algorithm
SSyncFSM* pFsm;
SRaftId raftId;
int32_t replicaNum;
int32_t quorum;
// life cycle
......@@ -147,19 +155,19 @@ typedef struct SSyncNode {
// timer
tmr_h pPingTimer;
int32_t pingTimerMS;
uint8_t pingTimerStart;
uint8_t pingTimerEnable;
TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp
uint64_t pingTimerCounter;
tmr_h pElectTimer;
int32_t electTimerMS;
uint8_t electTimerStart;
uint8_t electTimerEnable;
TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp
uint64_t electTimerCounter;
tmr_h pHeartbeatTimer;
int32_t heartbeatTimerMS;
uint8_t heartbeatTimerStart;
uint8_t heartbeatTimerEnable;
TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp
uint64_t heartbeatTimerCounter;
......@@ -170,6 +178,7 @@ typedef struct SSyncNode {
int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg);
int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg);
int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
int32_t (*FpOnTimeout)(SSyncNode* pSyncNode, SyncTimeout* pMsg);
} SSyncNode;
......@@ -178,8 +187,24 @@ void syncNodeClose(SSyncNode* pSyncNode);
void syncNodePingAll(SSyncNode* pSyncNode);
void syncNodePingPeers(SSyncNode* pSyncNode);
void syncNodePingSelf(SSyncNode* pSyncNode);
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode);
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode);
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms);
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode);
int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms);
void syncNodeBecomeFollower(SSyncNode* pSyncNode);
void syncNodeBecomeLeader(SSyncNode* pSyncNode);
void syncNodeFollower2Candidate(SSyncNode* pSyncNode);
void syncNodeCandidate2Leader(SSyncNode* pSyncNode);
void syncNodeLeader2Follower(SSyncNode* pSyncNode);
void syncNodeCandidate2Follower(SSyncNode* pSyncNode);
#ifdef __cplusplus
......@@ -30,6 +30,8 @@ extern "C" {
// encode as uint32
typedef enum ESyncMessageType {
SYNC_PING = 101,
......@@ -38,8 +40,37 @@ typedef enum ESyncMessageType {
} ESyncMessageType;
// ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg);
cJSON* syncRpcUnknownMsg2Json();
// ---------------------------------------------
typedef enum ESyncTimeoutType {
} ESyncTimeoutType;
typedef struct SyncTimeout {
uint32_t bytes;
uint32_t msgType;
ESyncTimeoutType timeoutType;
void* data;
} SyncTimeout;
SyncTimeout* syncTimeoutBuild();
void syncTimeoutDestroy(SyncTimeout* pMsg);
void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen);
void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg);
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg);
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg);
cJSON* syncTimeout2Json(const SyncTimeout* pMsg);
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data);
// ---------------------------------------------
typedef struct SyncPing {
uint32_t bytes;
......@@ -26,6 +26,8 @@ extern "C" {
#include "syncInt.h"
#include "taosdef.h"
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode);
#ifdef __cplusplus
......@@ -39,8 +39,9 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet);
void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId);
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2);
// ---- SSyncBuffer ----
#if 0
void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len);
void syncUtilbufDestroy(SSyncBuffer* syncBuf);
......@@ -48,7 +49,6 @@ void syncUtilbufDestroy(SSyncBuffer* syncBuf);
void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest);
void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest);
#ifdef __cplusplus
......@@ -24,13 +24,36 @@ extern "C" {
#include <stdio.h>
#include <stdlib.h>
#include "syncInt.h"
#include "syncMessage.h"
#include "syncUtil.h"
#include "taosdef.h"
typedef struct SVotesGranted {
SyncTerm term;
int32_t quorum;
int32_t votes;
bool toLeader;
SSyncNode *pSyncNode;
} SVotesGranted;
typedef struct SVotesResponded {
} SVotesResponded;
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode);
void voteGrantedDestroy(SVotesGranted *pVotesGranted);
bool voteGrantedMajority(SVotesGranted *pVotesGranted);
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg);
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term);
typedef struct SVotesRespond {
SRaftId (*replicas)[TSDB_MAX_REPLICA];
bool isRespond[TSDB_MAX_REPLICA];
int32_t replicaNum;
SyncTerm term;
SSyncNode *pSyncNode;
} SVotesRespond;
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode);
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId);
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg);
void Reset(SVotesRespond *pVotesRespond, SyncTerm term);
#ifdef __cplusplus
......@@ -14,3 +14,7 @@
#include "syncElection.h"
void syncNodeElect(SSyncNode* pSyncNode) {}
void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {}
\ No newline at end of file
......@@ -40,17 +40,6 @@ static void syncIOTickPingFunc(void *param, void *tmrId);
// ----------------------------
// public function ------------
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
"<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, "
"pMsg->msgType:%d, pMsg->contLen:%d",
clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle,
pMsg->msgType, pMsg->contLen);
pMsg->handle = NULL;
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
return 0;
int32_t syncIOStart(char *host, uint16_t port) {
gSyncIO = syncIOCreate(host, port);
assert(gSyncIO != NULL);
......@@ -83,6 +72,35 @@ int32_t syncIOTickPing() {
return ret;
int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) {
"<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, "
"pMsg->msgType:%d, pMsg->contLen:%d",
clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle,
pMsg->msgType, pMsg->contLen);
cJSON *pJson = syncRpcMsg2Json(pMsg);
char *serialized = cJSON_Print(pJson);
sTrace("process syncMessage send: pMsg:%s ", serialized);
pMsg->handle = NULL;
rpcSendRequest(clientRpc, pEpSet, pMsg, NULL);
return 0;
int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) {
SRpcMsg *pTemp;
pTemp = taosAllocateQitem(sizeof(SRpcMsg));
memcpy(pTemp, pMsg, sizeof(SRpcMsg));
STaosQueue *pMsgQ = queue;
taosWriteQitem(pMsgQ, pTemp);
return 0;
// local function ------------
static int32_t syncIOStartInternal(SSyncIO *io) {
......@@ -193,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) {
SSyncIO *io = param;
STaosQall *qall;
SRpcMsg * pRpcMsg, rpcMsg;
SRpcMsg *pRpcMsg, rpcMsg;
int type;
qall = taosAllocateQall();
......@@ -215,6 +233,7 @@ static void *syncIOConsumerFunc(void *param) {
syncPingFromRpcMsg(pRpcMsg, pSyncMsg);
// memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen);
io->FpOnSyncPing(io->pSyncNode, pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
......@@ -223,6 +242,16 @@ static void *syncIOConsumerFunc(void *param) {
pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen);
syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_TIMEOUT) {
if (io->FpOnSyncTimeout != NULL) {
SyncTimeout *pSyncMsg;
pSyncMsg = syncTimeoutBuild();
syncTimeoutFromRpcMsg(pRpcMsg, pSyncMsg);
io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg);
} else {
......@@ -25,7 +25,10 @@ static int32_t tsNodeRefId = -1;
// ------ local funciton ---------
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg);
static void syncNodePingTimerCb(void* param, void* tmrId);
static void syncNodeEqPingTimer(void* param, void* tmrId);
static void syncNodeEqElectTimer(void* param, void* tmrId);
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId);
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg);
static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg);
......@@ -37,6 +40,7 @@ static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg);
static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg);
static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg);
static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg);
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg);
// ---------------------------------
int32_t syncInit() {
......@@ -56,8 +60,6 @@ void syncStop(int64_t rid) {}
int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; }
// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; }
int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; }
ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; }
......@@ -76,6 +78,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->rpcClient = pSyncInfo->rpcClient;
pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg;
pSyncNode->queue = pSyncInfo->queue;
pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg;
pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex];
pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1;
......@@ -93,8 +97,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->pPingTimer = NULL;
pSyncNode->pingTimerMS = 1000;
atomic_store_8(&pSyncNode->pingTimerStart, 0);
pSyncNode->FpPingTimer = syncNodePingTimerCb;
atomic_store_8(&pSyncNode->pingTimerEnable, 0);
pSyncNode->FpPingTimer = syncNodeEqPingTimer;
pSyncNode->pingTimerCounter = 0;
pSyncNode->FpOnPing = syncNodeOnPingCb;
......@@ -103,6 +107,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) {
pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb;
pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb;
pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb;
pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb;
return pSyncNode;
......@@ -148,22 +153,76 @@ void syncNodePingSelf(SSyncNode* pSyncNode) {
int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) {
if (pSyncNode->pPingTimer == NULL) {
pSyncNode->pPingTimer =
taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager);
taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager);
} else {
taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager,
taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager,
atomic_store_8(&pSyncNode->pingTimerStart, 1);
atomic_store_8(&pSyncNode->pingTimerEnable, 1);
return 0;
int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->pingTimerStart, 0);
pSyncNode->pingTimerCounter = TIMER_MAX_MS;
atomic_store_8(&pSyncNode->pingTimerEnable, 0);
pSyncNode->pingTimerMS = TIMER_MAX_MS;
return 0;
int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) {
if (pSyncNode->pElectTimer == NULL) {
pSyncNode->pElectTimer =
taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager);
} else {
taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager,
atomic_store_8(&pSyncNode->electTimerEnable, 1);
return 0;
int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->electTimerEnable, 0);
pSyncNode->electTimerMS = TIMER_MAX_MS;
return 0;
int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; }
int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) {
if (pSyncNode->pHeartbeatTimer == NULL) {
pSyncNode->pHeartbeatTimer =
taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager);
} else {
taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager,
atomic_store_8(&pSyncNode->heartbeatTimerEnable, 1);
return 0;
int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) {
atomic_store_8(&pSyncNode->heartbeatTimerEnable, 0);
pSyncNode->heartbeatTimerMS = TIMER_MAX_MS;
return 0;
int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; }
void syncNodeBecomeFollower(SSyncNode* pSyncNode) {}
void syncNodeBecomeLeader(SSyncNode* pSyncNode) {}
void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {}
void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {}
void syncNodeLeader2Follower(SSyncNode* pSyncNode) {}
void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {}
// ------ local funciton ---------
static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) {
sTrace("syncNodePing pSyncNode:%p ", pSyncNode);
......@@ -204,7 +263,6 @@ static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pM
static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) {
sTrace("syncNodeSendMsgById pSyncNode:%p ", pSyncNode);
SEpSet epSet;
syncUtilraftId2EpSet(destRaftId, &epSet);
pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg);
......@@ -225,7 +283,7 @@ static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) {
cJSON* pJson = syncPing2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodeOnPingCb syncNodePing pMsg:%s ", serialized);
sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized);
......@@ -245,7 +303,7 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) {
cJSON* pJson = syncPingReply2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("syncNodeOnPingReplyCb syncNodePing pMsg:%s ", serialized);
sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", serialized);
......@@ -273,22 +331,50 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR
return ret;
static void syncNodePingTimerCb(void* param, void* tmrId) {
static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) {
int32_t ret = 0;
sTrace("<-- syncNodeOnTimeoutCb -->");
cJSON* pJson = syncTimeout2Json(pMsg);
char* serialized = cJSON_Print(pJson);
sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized);
if (pMsg->timeoutType == SYNC_TIMEOUT_PING) {
if (atomic_load_8(&ths->pingTimerEnable)) {
} else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) {
} else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) {
} else {
return ret;
static void syncNodeEqPingTimer(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
if (atomic_load_8(&pSyncNode->pingTimerStart)) {
if (atomic_load_8(&pSyncNode->pingTimerEnable)) {
// pSyncNode->pingTimerMS += 100;
"syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, "
"tmrId:%p ",
pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId);
SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, pSyncNode);
SRpcMsg rpcMsg;
syncTimeout2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager,
} else {
sTrace("syncNodePingTimerCb: pingTimerStart:%u ", pSyncNode->pingTimerStart);
sTrace("syncNodeEqPingTimer: pingTimerEnable:%u ", pSyncNode->pingTimerEnable);
\ No newline at end of file
static void syncNodeEqElectTimer(void* param, void* tmrId) {}
static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {}
\ No newline at end of file
......@@ -20,6 +20,124 @@
void onMessage(SRaft* pRaft, void* pMsg) {}
// ---------------------------------------------
cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) {
cJSON* pRoot;
// in compiler optimization, switch case = if else constants
if (pRpcMsg->msgType == SYNC_TIMEOUT) {
SyncTimeout* pSyncMsg = (SyncTimeout*)pRpcMsg->pCont;
pRoot = syncTimeout2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_PING) {
SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont;
pRoot = syncPing2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_PING_REPLY) {
SyncPingReply* pSyncMsg = (SyncPingReply*)pRpcMsg->pCont;
pRoot = syncPingReply2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) {
pRoot = syncRpcUnknownMsg2Json();
} else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) {
pRoot = syncRpcUnknownMsg2Json();
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) {
SyncRequestVote* pSyncMsg = (SyncRequestVote*)pRpcMsg->pCont;
pRoot = syncRequestVote2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) {
SyncRequestVoteReply* pSyncMsg = (SyncRequestVoteReply*)pRpcMsg->pCont;
pRoot = syncRequestVoteReply2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) {
SyncAppendEntries* pSyncMsg = (SyncAppendEntries*)pRpcMsg->pCont;
pRoot = syncAppendEntries2Json(pSyncMsg);
} else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) {
SyncAppendEntriesReply* pSyncMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont;
pRoot = syncAppendEntriesReply2Json(pSyncMsg);
} else {
pRoot = syncRpcUnknownMsg2Json();
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "RpcMsg", pRoot);
return pJson;
cJSON* syncRpcUnknownMsg2Json() {
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "msgType", SYNC_UNKNOWN);
cJSON_AddStringToObject(pRoot, "data", "known message");
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncPing", pRoot);
return pJson;
// ---- message process SyncTimeout----
SyncTimeout* syncTimeoutBuild() {
uint32_t bytes = sizeof(SyncTimeout);
SyncTimeout* pMsg = malloc(bytes);
memset(pMsg, 0, bytes);
pMsg->bytes = bytes;
pMsg->msgType = SYNC_TIMEOUT;
return pMsg;
void syncTimeoutDestroy(SyncTimeout* pMsg) {
if (pMsg != NULL) {
void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen) {
assert(pMsg->bytes <= bufLen);
memcpy(buf, pMsg, pMsg->bytes);
void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg) {
memcpy(pMsg, buf, len);
assert(len == pMsg->bytes);
void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg) {
memset(pRpcMsg, 0, sizeof(*pRpcMsg));
pRpcMsg->msgType = pMsg->msgType;
pRpcMsg->contLen = pMsg->bytes;
pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen);
syncTimeoutSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen);
void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) {
syncTimeoutDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg);
cJSON* syncTimeout2Json(const SyncTimeout* pMsg) {
char u64buf[128];
cJSON* pRoot = cJSON_CreateObject();
cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes);
cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType);
cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType);
snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data);
cJSON_AddStringToObject(pRoot, "data", u64buf);
cJSON* pJson = cJSON_CreateObject();
cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot);
return pJson;
SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data) {
SyncTimeout* pMsg = syncTimeoutBuild();
pMsg->timeoutType = timeoutType;
pMsg->data = data;
return pMsg;
// ---- message process SyncPing----
SyncPing* syncPingBuild(uint32_t dataLen) {
uint32_t bytes = SYNC_PING_FIX_LEN + dataLen;
......@@ -14,3 +14,5 @@
#include "syncReplication.h"
void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {}
\ No newline at end of file
......@@ -68,8 +68,12 @@ void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaft
raftId->vgId = vgId;
bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) {
bool ret = pId1->addr == pId2->addr && pId1->vgId == pId2->vgId;
return ret;
// ---- SSyncBuffer -----
#if 0
void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) {
syncBuf->len = len;
syncBuf->data = malloc(syncBuf->len);
......@@ -87,4 +91,3 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) {
dest->data = malloc(dest->len);
memcpy(dest->data, src->data, dest->len);
\ No newline at end of file
......@@ -14,3 +14,83 @@
#include "syncVoteMgr.h"
SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) {
SVotesGranted *pVotesGranted = malloc(sizeof(SVotesGranted));
assert(pVotesGranted != NULL);
memset(pVotesGranted, 0, sizeof(SVotesGranted));
pVotesGranted->quorum = pSyncNode->quorum;
pVotesGranted->term = 0;
pVotesGranted->votes = 0;
pVotesGranted->toLeader = false;
pVotesGranted->pSyncNode = pSyncNode;
return pVotesGranted;
void voteGrantedDestroy(SVotesGranted *pVotesGranted) {
if (pVotesGranted != NULL) {
bool voteGrantedMajority(SVotesGranted *pVotesGranted) {
bool ret = pVotesGranted->votes >= pVotesGranted->quorum;
return ret;
void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) {
assert(pMsg->voteGranted == true);
assert(pMsg->term == pVotesGranted->term);
void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) {
pVotesGranted->term = term;
pVotesGranted->votes = 0;
pVotesGranted->toLeader = false;
SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) {
SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond));
assert(pVotesRespond != NULL);
memset(pVotesRespond, 0, sizeof(SVotesRespond));
pVotesRespond->replicas = &(pSyncNode->replicasId);
pVotesRespond->replicaNum = pSyncNode->replicaNum;
pVotesRespond->term = 0;
pVotesRespond->pSyncNode = pSyncNode;
return pVotesRespond;
bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) {
bool ret = false;
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
if (syncUtilSameId(&(*pVotesRespond->replicas)[i], pRaftId) && pVotesRespond->isRespond[i]) {
ret = true;
return ret;
void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) {
assert(pVotesRespond->term == pMsg->term);
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
if (syncUtilSameId(&(*pVotesRespond->replicas)[i], &pMsg->srcId)) {
assert(pVotesRespond->isRespond[i] == false);
pVotesRespond->isRespond[i] = true;
void Reset(SVotesRespond *pVotesRespond, SyncTerm term) {
pVotesRespond->term = term;
for (int i = 0; i < pVotesRespond->replicaNum; ++i) {
pVotesRespond->isRespond[i] = false;
\ No newline at end of file
......@@ -8,6 +8,7 @@ add_executable(syncIOSendMsgTest "")
add_executable(syncIOSendMsgClientTest "")
add_executable(syncIOSendMsgServerTest "")
add_executable(syncRaftStoreTest "")
add_executable(syncEnqTest "")
......@@ -50,6 +51,10 @@ target_sources(syncRaftStoreTest
......@@ -102,6 +107,11 @@ target_include_directories(syncRaftStoreTest
......@@ -144,6 +154,10 @@ target_link_libraries(syncRaftStoreTest
#include <stdio.h>
#include "syncEnv.h"
#include "syncIO.h"
#include "syncInt.h"
#include "syncRaftStore.h"
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
uint16_t ports[3] = {7010, 7110, 7210};
SSyncNode* doSync(int myIndex) {
SSyncFSM* pFsm;
SSyncInfo syncInfo;
syncInfo.vgId = 1;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping");
SSyncCfg* pCfg = &syncInfo.syncCfg;
pCfg->myIndex = myIndex;
pCfg->replicaNum = 3;
pCfg->nodeInfo[0].nodePort = ports[0];
snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "");
// taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
pCfg->nodeInfo[1].nodePort = ports[1];
snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "");
// taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn);
pCfg->nodeInfo[2].nodePort = ports[2];
snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "");
// taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn);
SSyncNode* pSyncNode = syncNodeOpen(&syncInfo);
assert(pSyncNode != NULL);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->pSyncNode = pSyncNode;
return pSyncNode;
void timerPingAll(void* param, void* tmrId) {
SSyncNode* pSyncNode = (SSyncNode*)param;
int main(int argc, char** argv) {
// taosInitLog((char*)"syncPingTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
int myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
if (myIndex > 2 || myIndex < 0) {
fprintf(stderr, "myIndex:%d error. should be 0 - 2", myIndex);
return 1;
int32_t ret = syncIOStart((char*)"", ports[myIndex]);
assert(ret == 0);
ret = syncEnvStart();
assert(ret == 0);
SSyncNode* pSyncNode = doSync(myIndex);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
for (int i = 0; i < 10; ++i) {
SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->raftId, &pSyncNode->raftId);
SRpcMsg rpcMsg;
syncPingReply2RpcMsg(pSyncMsg, &rpcMsg);
pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg);
while (1) {
return 0;
......@@ -22,6 +22,8 @@ SSyncNode* doSync(int myIndex) {
syncInfo.vgId = 1;
syncInfo.rpcClient = gSyncIO->clientRpc;
syncInfo.FpSendMsg = syncIOSendMsg;
syncInfo.queue = gSyncIO->pMsgQ;
syncInfo.FpEqMsg = syncIOEqMsg;
syncInfo.pFsm = pFsm;
snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping");
......@@ -76,15 +78,14 @@ int main(int argc, char** argv) {
SSyncNode* pSyncNode = doSync(myIndex);
gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;
ret = syncNodeStartPingTimer(pSyncNode);
assert(ret == 0);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
ret = syncNodeStopPingTimer(pSyncNode);
assert(ret == 0);
while (1) {
......@@ -87,7 +87,7 @@ int32_t taosGetCpuCores(float *numOfCores) {
return 0;
int32_t taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) {
int32_t taosGetCpuUsage(double *sysCpuUsage, double *procCpuUsage) {
*sysCpuUsage = 0;
*procCpuUsage = 0;
return 0;
......@@ -112,64 +112,32 @@ int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
if (bytes) *bytes = 0;
if (rbytes) *rbytes = 0;
if (tbytes) *tbytes = 0;
int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) {
*receive_bytes = 0;
*transmit_bytes = 0;
return 0;
int32_t taosGetBandSpeed(float *bandSpeedKb) {
*bandSpeedKb = 0;
return 0;
int32_t taosReadProcIO(int64_t *readbyte, int64_t *writebyte) {
int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) {
IO_COUNTERS io_counter;
if (GetProcessIoCounters(GetCurrentProcess(), &io_counter)) {
if (readbyte) *readbyte = io_counter.ReadTransferCount;
if (writebyte) *writebyte = io_counter.WriteTransferCount;
if (rchars) *rchars = io_counter.ReadTransferCount;
if (wchars) *wchars = io_counter.WriteTransferCount;
if (read_bytes) *read_bytes = 0;
if (write_bytes) *write_bytes = 0;
return 0;
return -1;
int32_t taosGetProcIO(float *readKB, float *writeKB) {
static int64_t lastReadbyte = -1;
static int64_t lastWritebyte = -1;
int64_t curReadbyte = 0;
int64_t curWritebyte = 0;
if (taosReadProcIO(&curReadbyte, &curWritebyte) != 0) {
return -1;
if (lastReadbyte == -1 || lastWritebyte == -1) {
lastReadbyte = curReadbyte;
lastWritebyte = curWritebyte;
return -1;
*readKB = (float)((double)(curReadbyte - lastReadbyte) / 1024);
*writeKB = (float)((double)(curWritebyte - lastWritebyte) / 1024);
if (*readKB < 0) *readKB = 0;
if (*writeKB < 0) *writeKB = 0;
lastReadbyte = curReadbyte;
lastWritebyte = curWritebyte;
return 0;
void taosGetSystemInfo() {
float tmp1, tmp2;
double tmp1, tmp2, tmp3, tmp4;
taosGetBandSpeed(&tmp1, &tmp2);
taosGetCpuUsage(&tmp1, &tmp2);
taosGetProcIO(&tmp1, &tmp2);
taosGetIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4);
void taosKillSystem() {
......@@ -259,31 +227,21 @@ void taosGetSystemInfo() {
tsNumOfCores = sysconf(_SC_NPROCESSORS_ONLN);
int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars) {
int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) {
if (rchars) *rchars = 0;
if (wchars) *wchars = 0;
if (read_bytes) *read_bytes = 0;
if (write_bytes) *write_bytes = 0;
return 0;
int32_t taosGetProcIO(float *readKB, float *writeKB) {
*readKB = 0;
*writeKB = 0;
int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) {
*receive_bytes = 0;
*transmit_bytes = 0;
return 0;
int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
if (bytes) *bytes = 0;
if (rbytes) *rbytes = 0;
if (tbytes) *tbytes = 0;
return 0;
int32_t taosGetBandSpeed(float *bandSpeedKb) {
*bandSpeedKb = 0;
return 0;
int32_t taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) {
int32_t taosGetCpuUsage(double *sysCpuUsage, double *procCpuUsage) {
*sysCpuUsage = 0;
*procCpuUsage = 0;
return 0;
......@@ -400,7 +358,6 @@ int32_t taosGetSysMemory(int64_t *usedKB) {
int32_t taosGetProcMemory(int64_t *usedKB) {
// FILE *fp = fopen(tsProcMemFile, "r");
TdFilePtr pFile = taosOpenFile(tsProcMemFile, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
// printf("open file:%s failed", tsProcMemFile);
......@@ -434,7 +391,6 @@ int32_t taosGetProcMemory(int64_t *usedKB) {
static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) {
// FILE *fp = fopen(tsSysCpuFile, "r");
TdFilePtr pFile = taosOpenFile(tsSysCpuFile, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
// printf("open file:%s failed", tsSysCpuFile);
......@@ -459,7 +415,6 @@ static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) {
static int32_t taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) {
// FILE *fp = fopen(tsProcCpuFile, "r");
TdFilePtr pFile = taosOpenFile(tsProcCpuFile, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
// printf("open file:%s failed", tsProcCpuFile);
......@@ -493,7 +448,7 @@ int32_t taosGetCpuCores(float *numOfCores) {
return 0;
int32_t taosGetCpuUsage(float *cpu_system, float *cpu_engine) {
int32_t taosGetCpuUsage(double *cpu_system, double *cpu_engine) {
static uint64_t lastSysUsed = 0;
static uint64_t lastSysTotal = 0;
static uint64_t lastProcTotal = 0;
......@@ -522,8 +477,8 @@ int32_t taosGetCpuUsage(float *cpu_system, float *cpu_engine) {
return -1;
*cpu_engine = (float)((double)(curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100);
*cpu_system = (float)((double)(curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100);
*cpu_engine = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100;
*cpu_system = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100;
lastSysUsed = curSysUsed;
lastSysTotal = curSysTotal;
......@@ -544,14 +499,9 @@ int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) {
int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
if (bytes) *bytes = 0;
// FILE *fp = fopen(tsSysNetFile, "r");
int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) {
TdFilePtr pFile = taosOpenFile(tsSysNetFile, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
// printf("open file:%s failed", tsSysNetFile);
return -1;
if (pFile == NULL) return -1;
ssize_t _bytes = 0;
char *line = NULL;
......@@ -584,9 +534,8 @@ int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
"%s %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64
" %" PRId64,
nouse0, &o_rbytes, &rpackts, &nouse1, &nouse2, &nouse3, &nouse4, &nouse5, &nouse6, &o_tbytes, &tpackets);
if (rbytes) *rbytes = o_rbytes;
if (tbytes) *tbytes = o_tbytes;
if (bytes) *bytes += (o_rbytes + o_tbytes);
*receive_bytes = o_rbytes;
*transmit_bytes = o_tbytes;
if (line != NULL) tfree(line);
......@@ -595,58 +544,52 @@ int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) {
return 0;
int32_t taosGetBandSpeed(float *bandSpeedKb) {
static int64_t lastBytes = 0;
static time_t lastTime = 0;
int64_t curBytes = 0;
time_t curTime = time(NULL);
int32_t taosGetBandSpeed(double *receive_bytes_per_sec, double *transmit_bytes_per_sec) {
static int64_t last_receive_bytes = 0;
static int64_t last_transmit_bytes = 0;
static int64_t last_time = 0;
int64_t cur_receive_bytes = 0;
int64_t cur_transmit_bytes = 0;
int64_t cur_time = taosGetTimestampMs();
if (taosGetCardInfo(&curBytes, NULL, NULL) != 0) {
if (taosGetCardInfo(&cur_receive_bytes, &cur_transmit_bytes) != 0) {
return -1;
if (lastTime == 0 || lastBytes == 0) {
lastTime = curTime;
lastBytes = curBytes;
*bandSpeedKb = 0;
if (last_time == 0 || last_time >= cur_time) {
last_time = cur_time;
last_receive_bytes = cur_receive_bytes;
last_transmit_bytes = cur_transmit_bytes;
*receive_bytes_per_sec = 0;
*transmit_bytes_per_sec = 0;
return 0;
if (lastTime >= curTime || lastBytes > curBytes) {
lastTime = curTime;
lastBytes = curBytes;
*bandSpeedKb = 0;
return 0;
*receive_bytes_per_sec = (cur_receive_bytes - last_receive_bytes) / (double)(cur_time - last_time) * 1000;
*transmit_bytes_per_sec = (cur_transmit_bytes - last_transmit_bytes) / (double)(cur_time - last_time) * 1000;
double totalBytes = (double)(curBytes - lastBytes) / 1024 * 8; // Kb
*bandSpeedKb = (float)(totalBytes / (double)(curTime - lastTime));
last_time = cur_time;
last_transmit_bytes = cur_transmit_bytes;
last_receive_bytes = cur_receive_bytes;
// //printf("bandwidth lastBytes:%ld, lastTime:%ld, curBytes:%ld, curTime:%ld,
// speed:%f", lastBytes, lastTime, curBytes, curTime, *bandSpeed);
lastTime = curTime;
lastBytes = curBytes;
if (*receive_bytes_per_sec < 0) *receive_bytes_per_sec = 0;
if (*transmit_bytes_per_sec < 0) *transmit_bytes_per_sec = 0;
return 0;
int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars) {
// FILE *fp = fopen(tsProcIOFile, "r");
int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) {
TdFilePtr pFile = taosOpenFile(tsProcIOFile, TD_FILE_READ | TD_FILE_STREAM);
if (pFile == NULL) {
// printf("open file:%s failed", tsProcIOFile);
return -1;
if (pFile == NULL) return -1;
ssize_t _bytes = 0;
char *line = NULL;
char tmp[10];
char tmp[24];
int readIndex = 0;
while (!taosEOFFile(pFile)) {
_bytes = taosGetLineFile(pFile, &line);
if ((_bytes < 0) || (line == NULL)) {
if (_bytes < 10 || line == NULL) {
if (strstr(line, "rchar:") != NULL) {
......@@ -655,47 +598,70 @@ int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars) {
} else if (strstr(line, "wchar:") != NULL) {
sscanf(line, "%s %" PRId64, tmp, wchars);
} else if (strstr(line, "read_bytes:") != NULL) { // read_bytes
sscanf(line, "%s %" PRId64, tmp, read_bytes);
} else if (strstr(line, "write_bytes:") != NULL) { // write_bytes
sscanf(line, "%s %" PRId64, tmp, write_bytes);
} else {
if (readIndex >= 2) break;
if (readIndex >= 4) break;
if (line != NULL) tfree(line);
if (readIndex < 2) {
// printf("read file:%s failed", tsProcIOFile);
if (readIndex < 4) {
return -1;
return 0;
int32_t taosGetProcIO(float *readKB, float *writeKB) {
static int64_t lastReadbyte = -1;
static int64_t lastWritebyte = -1;
int32_t taosGetIOSpeed(double *rchar_per_sec, double *wchar_per_sec, double *read_bytes_per_sec,
double *write_bytes_per_sec) {
static int64_t last_rchar = -1;
static int64_t last_wchar = -1;
static int64_t last_read_bytes = -1;
static int64_t last_write_bytes = -1;
static int64_t last_time = 0;
int64_t curReadbyte = 0;
int64_t curWritebyte = 0;
int64_t cur_rchar = 0;
int64_t cur_wchar = 0;
int64_t cur_read_bytes = 0;
int64_t cur_write_bytes = 0;
int64_t cur_time = taosGetTimestampMs();
if (taosReadProcIO(&curReadbyte, &curWritebyte) != 0) {
if (taosReadProcIO(&cur_rchar, &cur_wchar, &cur_read_bytes, &cur_write_bytes) != 0) {
return -1;
if (lastReadbyte == -1 || lastWritebyte == -1) {
lastReadbyte = curReadbyte;
lastWritebyte = curWritebyte;
if (last_time == 0 || last_time >= cur_time) {
last_time = cur_time;
last_rchar = cur_rchar;
last_wchar = cur_wchar;
last_read_bytes = cur_read_bytes;
last_write_bytes = cur_write_bytes;
return -1;
*readKB = (float)((double)(curReadbyte - lastReadbyte) / 1024);
*writeKB = (float)((double)(curWritebyte - lastWritebyte) / 1024);
if (*readKB < 0) *readKB = 0;
if (*writeKB < 0) *writeKB = 0;
*rchar_per_sec = (cur_rchar - last_rchar) / (double)(cur_time - last_time) * 1000;
*wchar_per_sec = (cur_wchar - last_wchar) / (double)(cur_time - last_time) * 1000;
*read_bytes_per_sec = (cur_read_bytes - last_read_bytes) / (double)(cur_time - last_time) * 1000;
*write_bytes_per_sec = (cur_write_bytes - last_write_bytes) / (double)(cur_time - last_time) * 1000;
last_time = cur_time;
last_rchar = cur_rchar;
last_wchar = cur_wchar;
last_read_bytes = cur_read_bytes;
last_write_bytes = cur_write_bytes;
lastReadbyte = curReadbyte;
lastWritebyte = curWritebyte;
if (*rchar_per_sec < 0) *rchar_per_sec = 0;
if (*wchar_per_sec < 0) *wchar_per_sec = 0;
if (*read_bytes_per_sec < 0) *read_bytes_per_sec = 0;
if (*write_bytes_per_sec < 0) *write_bytes_per_sec = 0;
return 0;
......@@ -705,10 +671,10 @@ void taosGetSystemInfo() {
float tmp1, tmp2;
double tmp1, tmp2, tmp3, tmp4;
taosGetBandSpeed(&tmp1, &tmp2);
taosGetCpuUsage(&tmp1, &tmp2);
taosGetProcIO(&tmp1, &tmp2);
taosGetIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4);
void taosKillSystem() {
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
想要评论请 注册