diff --git a/include/libs/monitor/monitor.h b/include/libs/monitor/monitor.h index 0041f3ac5fdb42ad7ac9337e32be265fa8a541ac..1695edd98375685ad735a85cca9947c65561ea33 100644 --- a/include/libs/monitor/monitor.h +++ b/include/libs/monitor/monitor.h @@ -86,21 +86,21 @@ typedef struct { typedef struct { float uptime; // day - float cpu_engine; - float cpu_system; + double cpu_engine; + double cpu_system; float cpu_cores; - int64_t mem_engine; // KB - int64_t mem_system; // KB - int64_t mem_total; // KB - float disk_engine; // GB - float disk_used; // GB - float disk_total; // GB - int64_t net_in; - int64_t net_out; - float io_read; - float io_write; - float io_read_disk; - float io_write_disk; + int64_t mem_engine; // KB + int64_t mem_system; // KB + int64_t mem_total; // KB + int64_t disk_engine; // Byte + int64_t disk_used; // Byte + int64_t disk_total; // Byte + double net_in; // bytes per second + double net_out; // bytes per second + double io_read; + double io_write; + double io_read_disk; + double io_write_disk; int32_t req_select; float req_select_rate; int32_t req_insert; diff --git a/include/libs/sync/sync.h b/include/libs/sync/sync.h index 23e3ef752569c5beca0b603c7b13cabf924bbd7b..fddf18c571f2c54f0cb9c79b98a4a173a4c55066 100644 --- a/include/libs/sync/sync.h +++ b/include/libs/sync/sync.h @@ -138,6 +138,8 @@ typedef struct SSyncInfo { void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); + void* queue; + int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); } SSyncInfo; @@ -147,13 +149,10 @@ typedef struct SSyncNode SSyncNode; int32_t syncInit(); void syncCleanUp(); -int64_t syncStart(const SSyncInfo* pSyncInfo); -void syncStop(int64_t rid); -int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); - -int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); -// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak); - +int64_t syncStart(const SSyncInfo* pSyncInfo); +void syncStop(int64_t rid); +int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg); +int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak); ESyncState syncGetMyRole(int64_t rid); void syncGetNodesRole(int64_t rid, SNodesRole* pNodeRole); diff --git a/include/os/osSysinfo.h b/include/os/osSysinfo.h index e14dba8269c7ddd8573f5a43e4dad29da23f1665..9081fa9715390530fed9463f7df98b15446be798 100644 --- a/include/os/osSysinfo.h +++ b/include/os/osSysinfo.h @@ -38,15 +38,15 @@ int32_t taosGetEmail(char *email, int32_t maxLen); int32_t taosGetOsReleaseName(char *releaseName, int32_t maxLen); int32_t taosGetCpuInfo(char *cpuModel, int32_t maxLen, float *numOfCores); int32_t taosGetCpuCores(float *numOfCores); -int32_t taosGetCpuUsage(float *cpu_system, float *cpu_engine); +int32_t taosGetCpuUsage(double *cpu_system, double *cpu_engine); int32_t taosGetTotalMemory(int64_t *totalKB); int32_t taosGetProcMemory(int64_t *usedKB); int32_t taosGetSysMemory(int64_t *usedKB); int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize); -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars); -int32_t taosGetProcIO(float *readKB, float *writeKB); -int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes); -int32_t taosGetBandSpeed(float *bandSpeedKb); +int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes); +int32_t taosGetIOSpeed(double *readKB, double *writeKB, double *readDiskKB, double *writeDiskKB); +int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes); +int32_t taosGetBandSpeed(double *receive_bytes_per_sec, double *transmit_bytes_per_sec); int32_t taosSystem(const char *cmd); void taosKillSystem(); diff --git a/source/dnode/mgmt/impl/inc/dndEnv.h b/source/dnode/mgmt/impl/inc/dndEnv.h index efe54ee1e592a0d4f7d582088b4b4a08086ff96d..b9acbea02f14515d93ecb753821d03b9928401da 100644 --- a/source/dnode/mgmt/impl/inc/dndEnv.h +++ b/source/dnode/mgmt/impl/inc/dndEnv.h @@ -108,6 +108,7 @@ typedef struct { SHashObj *hash; int32_t openVnodes; int32_t totalVnodes; + int32_t masterNum; SRWLatch latch; SQWorkerPool queryPool; SFWorkerPool fetchPool; diff --git a/source/dnode/mgmt/impl/src/dndMgmt.c b/source/dnode/mgmt/impl/src/dndMgmt.c index 60cfdc299c21ce25f574162b06730bc61e2207a9..ff9eb5d884eefe439c5b3a2a3d90ab6f5c630e6e 100644 --- a/source/dnode/mgmt/impl/src/dndMgmt.c +++ b/source/dnode/mgmt/impl/src/dndMgmt.c @@ -487,12 +487,10 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { taosGetSysMemory(&pInfo->mem_system); pInfo->mem_total = tsTotalMemoryKB; pInfo->disk_engine = 0; - pInfo->disk_used = tsDataSpace.size.used / (1024 * 1024 * 1024.0); - pInfo->disk_total = tsDataSpace.size.avail / (1024 * 1024 * 1024.0); - taosGetCardInfo(NULL, &pInfo->net_in, &pInfo->net_out); - taosGetProcIO(&pInfo->io_read, &pInfo->io_write); - pInfo->io_read_disk = 0; - pInfo->io_write_disk = 0; + pInfo->disk_used = tsDataSpace.size.used; + pInfo->disk_total = tsDataSpace.size.total; + taosGetBandSpeed(&pInfo->net_in, &pInfo->net_out); + taosGetIOSpeed(&pInfo->io_read, &pInfo->io_write, &pInfo->io_read_disk, &pInfo->io_write_disk); pInfo->req_select = 0; pInfo->req_select_rate = 0; pInfo->req_insert = 0; @@ -501,9 +499,9 @@ static void dndGetMonitorDnodeInfo(SDnode *pDnode, SMonDnodeInfo *pInfo) { pInfo->req_insert_batch = 0; pInfo->req_insert_batch_success = 0; pInfo->req_insert_batch_rate = 0; - pInfo->errors = 0; - pInfo->vnodes_num = 0; - pInfo->masters = 0; + pInfo->errors = tsNumOfErrorLogs; + pInfo->vnodes_num = pDnode->vmgmt.totalVnodes; + pInfo->masters = pDnode->vmgmt.masterNum; pInfo->has_mnode = dndIsMnode(pDnode); } diff --git a/source/dnode/mgmt/impl/src/dndVnodes.c b/source/dnode/mgmt/impl/src/dndVnodes.c index 1c7bac4b6b4754821f2ecb11fd238dc99d97e772..28bc615abab0562dab86301977b6dc5c55f2601f 100644 --- a/source/dnode/mgmt/impl/src/dndVnodes.c +++ b/source/dnode/mgmt/impl/src/dndVnodes.c @@ -17,6 +17,7 @@ #include "dndVnodes.h" #include "dndMgmt.h" #include "dndTransport.h" +#include "sync.h" typedef struct { int32_t vgId; @@ -979,6 +980,8 @@ void dndCleanupVnodes(SDnode *pDnode) { void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { SVnodesMgmt *pMgmt = &pDnode->vmgmt; + int32_t totalVnodes = 0; + int32_t masterNum = 0; taosRLockLatch(&pMgmt->latch); @@ -993,8 +996,12 @@ void dndGetVnodeLoads(SDnode *pDnode, SArray *pLoads) { vnodeGetLoad(pVnode->pImpl, &vload); taosArrayPush(pLoads, &vload); + totalVnodes++; + if (vload.role == TAOS_SYNC_STATE_LEADER) masterNum++; pIter = taosHashIterate(pMgmt->hash, pIter); } taosRUnLockLatch(&pMgmt->latch); + pMgmt->totalVnodes = totalVnodes; + pMgmt->masterNum = masterNum; } diff --git a/source/libs/index/src/index.c b/source/libs/index/src/index.c index 57e965a753c08ca323a364d2ac016209ddf4045d..75dc05fc5b8cfdf254eebf9383723f82a3507bed 100644 --- a/source/libs/index/src/index.c +++ b/source/libs/index/src/index.c @@ -16,6 +16,7 @@ #include "index.h" #include "indexInt.h" #include "index_cache.h" +#include "index_comm.h" #include "index_tfile.h" #include "index_util.h" #include "tdef.h" @@ -30,8 +31,6 @@ void* indexQhandle = NULL; -static char JSON_COLUMN[] = "JSON"; - #define INDEX_MERGE_ADD_DEL(src, dst, tgt) \ { \ bool f = false; \ @@ -64,13 +63,11 @@ typedef struct SIdxColInfo { int cVersion; } SIdxColInfo; -typedef struct SIdxMergeHelper { - char* colVal; +typedef struct SIdxTempResult { SArray* total; SArray* added; SArray* deled; - bool reset; -} SIdxMergeHelper; +} SIdxTempResult; static pthread_once_t isInit = PTHREAD_ONCE_INIT; // static void indexInit(); @@ -82,8 +79,7 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oTyp static int indexGenTFile(SIndex* index, IndexCache* cache, SArray* batch); // merge cache and tfile by opera type -static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxMergeHelper* helper); -static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper); +static void indexMergeCacheAndTFile(SArray* result, IterateValue* icache, IterateValue* iTfv, SIdxTempResult* helper); // static int32_t indexSerialTermKey(SIndexTerm* itm, char* buf); // int32_t indexSerialKey(ICacheKey* key, char* buf); @@ -399,7 +395,6 @@ static void indexInterResultsDestroy(SArray* results) { static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType, SArray* fResults) { // refactor, merge interResults into fResults by oType - for (int i = 0; i < taosArrayGetSize(interResults); i--) { SArray* t = taosArrayGetP(interResults, i); taosArraySort(t, uidCompare); @@ -418,98 +413,82 @@ static int indexMergeFinalResults(SArray* interResults, EIndexOperatorType oType return 0; } -SIdxMergeHelper* sIdxMergeHelperCreate() { - SIdxMergeHelper* hp = calloc(1, sizeof(SIdxMergeHelper)); - hp->total = taosArrayInit(4, sizeof(uint64_t)); - hp->added = taosArrayInit(4, sizeof(uint64_t)); - hp->deled = taosArrayInit(4, sizeof(uint64_t)); - hp->reset = false; - return hp; +SIdxTempResult* sIdxTempResultCreate() { + SIdxTempResult* tr = calloc(1, sizeof(SIdxTempResult)); + tr->total = taosArrayInit(4, sizeof(uint64_t)); + tr->added = taosArrayInit(4, sizeof(uint64_t)); + tr->deled = taosArrayInit(4, sizeof(uint64_t)); + return tr; } -void sIdxMergeHelperClear(SIdxMergeHelper* hp) { - if (hp == NULL) { +void sIdxTempResultClear(SIdxTempResult* tr) { + if (tr == NULL) { return; } - hp->reset = false; - taosArrayClear(hp->total); - taosArrayClear(hp->added); - taosArrayClear(hp->deled); + taosArrayClear(tr->total); + taosArrayClear(tr->added); + taosArrayClear(tr->deled); } -void sIdxMergeHelperDestroy(SIdxMergeHelper* hp) { - if (hp == NULL) { +void sIdxTempResultDestroy(SIdxTempResult* tr) { + if (tr == NULL) { return; } - taosArrayDestroy(hp->total); - taosArrayDestroy(hp->added); - taosArrayDestroy(hp->deled); + taosArrayDestroy(tr->total); + taosArrayDestroy(tr->added); + taosArrayDestroy(tr->deled); } -static void indexMergeSameKey(SArray* result, TFileValue* tv, SIdxMergeHelper* helper) { - int32_t sz = result ? taosArrayGetSize(result) : 0; - if (sz > 0) { - // TODO(yihao): remove duplicate tableid - TFileValue* lv = taosArrayGetP(result, sz - 1); - // indexError("merge colVal: %s", lv->colVal); - if (strcmp(lv->colVal, tv->colVal) == 0) { - taosArrayAddAll(lv->tableId, tv->tableId); - tfileValueDestroy(tv); - } else { - taosArrayPush(result, &tv); - } - } else { - taosArrayPush(result, &tv); - } -} -static void sIdxMergeResult(SArray* result, SIdxMergeHelper* mh) { - taosArraySort(mh->total, uidCompare); - taosArraySort(mh->added, uidCompare); - taosArraySort(mh->deled, uidCompare); +static void sIdxTempResultMergeTo(SArray* result, SIdxTempResult* tr) { + taosArraySort(tr->total, uidCompare); + taosArraySort(tr->added, uidCompare); + taosArraySort(tr->deled, uidCompare); SArray* arrs = taosArrayInit(2, sizeof(void*)); - taosArrayPush(arrs, &mh->total); - taosArrayPush(arrs, &mh->added); + taosArrayPush(arrs, &tr->total); + taosArrayPush(arrs, &tr->added); iUnion(arrs, result); taosArrayDestroy(arrs); - iExcept(result, mh->deled); + iExcept(result, tr->deled); } -static void indexMayMergeToFinalResult(SArray* result, TFileValue* tfv, SIdxMergeHelper* help) { +static void indexMayMergeTempToFinalResult(SArray* result, TFileValue* tfv, SIdxTempResult* tr) { int32_t sz = taosArrayGetSize(result); if (sz > 0) { TFileValue* lv = taosArrayGetP(result, sz - 1); if (tfv != NULL && strcmp(lv->colVal, tfv->colVal) != 0) { - sIdxMergeResult(lv->tableId, help); - sIdxMergeHelperClear(help); + sIdxTempResultMergeTo(lv->tableId, tr); + sIdxTempResultClear(tr); taosArrayPush(result, &tfv); } else if (tfv == NULL) { - sIdxMergeResult(lv->tableId, help); + // handle last iterator + sIdxTempResultMergeTo(lv->tableId, tr); } else { + // temp result saved in help tfileValueDestroy(tfv); } } else { taosArrayPush(result, &tfv); } } -static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxMergeHelper* mh) { +static void indexMergeCacheAndTFile(SArray* result, IterateValue* cv, IterateValue* tv, SIdxTempResult* tr) { char* colVal = (cv != NULL) ? cv->colVal : tv->colVal; TFileValue* tfv = tfileValueCreate(colVal); - indexMayMergeToFinalResult(result, tfv, mh); + indexMayMergeTempToFinalResult(result, tfv, tr); if (cv != NULL) { uint64_t id = *(uint64_t*)taosArrayGet(cv->val, 0); if (cv->type == ADD_VALUE) { - INDEX_MERGE_ADD_DEL(mh->deled, mh->added, id) + INDEX_MERGE_ADD_DEL(tr->deled, tr->added, id) } else if (cv->type == DEL_VALUE) { - INDEX_MERGE_ADD_DEL(mh->added, mh->deled, id) + INDEX_MERGE_ADD_DEL(tr->added, tr->deled, id) } } if (tv != NULL) { - taosArrayAddAll(mh->total, tv->val); + taosArrayAddAll(tr->total, tv->val); } } -static void indexDestroyTempResult(SArray* result) { +static void indexDestroyFinalResult(SArray* result) { int32_t sz = result ? taosArrayGetSize(result) : 0; for (size_t i = 0; i < sz; i++) { TFileValue* tv = taosArrayGetP(result, i); @@ -543,7 +522,7 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { bool cn = cacheIter ? cacheIter->next(cacheIter) : false; bool tn = tfileIter ? tfileIter->next(tfileIter) : false; - SIdxMergeHelper* help = sIdxMergeHelperCreate(); + SIdxTempResult* tr = sIdxTempResultCreate(); while (cn == true || tn == true) { IterateValue* cv = (cn == true) ? cacheIter->getValue(cacheIter) : NULL; IterateValue* tv = (tn == true) ? tfileIter->getValue(tfileIter) : NULL; @@ -557,21 +536,22 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { comp = 1; } if (comp == 0) { - indexMergeCacheAndTFile(result, cv, tv, help); + indexMergeCacheAndTFile(result, cv, tv, tr); cn = cacheIter->next(cacheIter); tn = tfileIter->next(tfileIter); } else if (comp < 0) { - indexMergeCacheAndTFile(result, cv, NULL, help); + indexMergeCacheAndTFile(result, cv, NULL, tr); cn = cacheIter->next(cacheIter); } else { - indexMergeCacheAndTFile(result, NULL, tv, help); + indexMergeCacheAndTFile(result, NULL, tv, tr); tn = tfileIter->next(tfileIter); } } - indexMayMergeToFinalResult(result, NULL, help); + indexMayMergeTempToFinalResult(result, NULL, tr); + sIdxTempResultDestroy(tr); int ret = indexGenTFile(sIdx, pCache, result); - indexDestroyTempResult(result); + indexDestroyFinalResult(result); indexCacheDestroyImm(pCache); @@ -581,8 +561,6 @@ int indexFlushCacheToTFile(SIndex* sIdx, void* cache) { tfileReaderUnRef(pReader); indexCacheUnRef(pCache); - sIdxMergeHelperDestroy(help); - int64_t cost = taosGetTimestampUs() - st; if (ret != 0) { indexError("failed to merge, time cost: %" PRId64 "ms", cost / 1000); diff --git a/source/libs/index/src/index_tfile.c b/source/libs/index/src/index_tfile.c index 6cfc199afd7fefcef3267806989402bb62da324c..f5f46b061714fa372a2aa52607598cf69214990e 100644 --- a/source/libs/index/src/index_tfile.c +++ b/source/libs/index/src/index_tfile.c @@ -102,7 +102,6 @@ void tfileCacheDestroy(TFileCache* tcache) { if (tcache == NULL) { return; } - // free table cache TFileReader** reader = taosHashIterate(tcache->tableCache, NULL); while (reader) { diff --git a/source/libs/index/test/jsonUT.cc b/source/libs/index/test/jsonUT.cc index e5c79d137f43cb7350396211d81c71ec734e3957..df9f8b8439db379e99cfc96b69ea150b7a51577a 100644 --- a/source/libs/index/test/jsonUT.cc +++ b/source/libs/index/test/jsonUT.cc @@ -105,6 +105,22 @@ TEST_F(JsonEnv, testWriteMillonData) { } indexMultiTermDestroy(terms); } + { + std::string colName("voltagefdadfa"); + std::string colVal("abxxxxxxxxxxxx"); + for (uint i = 0; i < 1000; i++) { + colVal[i % colVal.size()] = '0' + i % 128; + SIndexTerm* term = indexTermCreate(1, ADD_VALUE, TSDB_DATA_TYPE_BINARY, colName.c_str(), colName.size(), + colVal.c_str(), colVal.size()); + + SIndexMultiTerm* terms = indexMultiTermCreate(); + indexMultiTermAdd(terms, term); + for (size_t i = 0; i < 1000; i++) { + tIndexJsonPut(index, terms, i); + } + indexMultiTermDestroy(terms); + } + } { std::string colName("voltagefdadfa"); std::string colVal("abxxxxxxxxxxxx"); diff --git a/source/libs/sync/inc/syncElection.h b/source/libs/sync/inc/syncElection.h index 7e9e637854f29f35650761e4411357099396d9d4..ed5b86fa98a78b38997ee45388ce2126cac908b8 100644 --- a/source/libs/sync/inc/syncElection.h +++ b/source/libs/sync/inc/syncElection.h @@ -26,6 +26,9 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" +void syncNodeElect(SSyncNode* pSyncNode); +void syncNodeRequestVotePeers(SSyncNode* pSyncNode); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncIO.h b/source/libs/sync/inc/syncIO.h index 238948b403b58cf8142752507872b7b2fb856423..160fefd086cc7e4a0334e2156883da1ba5034046 100644 --- a/source/libs/sync/inc/syncIO.h +++ b/source/libs/sync/inc/syncIO.h @@ -49,6 +49,7 @@ typedef struct SSyncIO { int32_t (*FpOnSyncRequestVoteReply)(SSyncNode *pSyncNode, SyncRequestVoteReply *pMsg); int32_t (*FpOnSyncAppendEntries)(SSyncNode *pSyncNode, SyncAppendEntries *pMsg); int32_t (*FpOnSyncAppendEntriesReply)(SSyncNode *pSyncNode, SyncAppendEntriesReply *pMsg); + int32_t (*FpOnSyncTimeout)(SSyncNode *pSyncNode, SyncTimeout *pMsg); int8_t isStart; @@ -58,9 +59,10 @@ extern SSyncIO *gSyncIO; int32_t syncIOStart(char *host, uint16_t port); int32_t syncIOStop(); -int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); int32_t syncIOTickQ(); int32_t syncIOTickPing(); +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg); +int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncInt.h b/source/libs/sync/inc/syncInt.h index e276bbd23495c7db8c94dad194dc0468b66e6721..15ed5503ebe7fe2e927826711a384deec611fb3c 100644 --- a/source/libs/sync/inc/syncInt.h +++ b/source/libs/sync/inc/syncInt.h @@ -69,6 +69,9 @@ extern "C" { struct SRaft; typedef struct SRaft SRaft; +struct SyncTimeout; +typedef struct SyncTimeout SyncTimeout; + struct SyncPing; typedef struct SyncPing SyncPing; @@ -111,17 +114,22 @@ typedef struct SSyncNode { char path[TSDB_FILENAME_LEN]; void* rpcClient; int32_t (*FpSendMsg)(void* rpcClient, const SEpSet* pEpSet, SRpcMsg* pMsg); + void* queue; + int32_t (*FpEqMsg)(void* queue, SRpcMsg* pMsg); // init internal SNodeInfo me; + SRaftId raftId; + int32_t peersNum; SNodeInfo peers[TSDB_MAX_REPLICA]; + SRaftId peersId[TSDB_MAX_REPLICA]; + + int32_t replicaNum; + SRaftId replicasId[TSDB_MAX_REPLICA]; // raft algorithm SSyncFSM* pFsm; - SRaftId raftId; - SRaftId peersId[TSDB_MAX_REPLICA]; - int32_t replicaNum; int32_t quorum; // life cycle @@ -147,19 +155,19 @@ typedef struct SSyncNode { // timer tmr_h pPingTimer; int32_t pingTimerMS; - uint8_t pingTimerStart; + uint8_t pingTimerEnable; TAOS_TMR_CALLBACK FpPingTimer; // Timer Fp uint64_t pingTimerCounter; tmr_h pElectTimer; int32_t electTimerMS; - uint8_t electTimerStart; + uint8_t electTimerEnable; TAOS_TMR_CALLBACK FpElectTimer; // Timer Fp uint64_t electTimerCounter; tmr_h pHeartbeatTimer; int32_t heartbeatTimerMS; - uint8_t heartbeatTimerStart; + uint8_t heartbeatTimerEnable; TAOS_TMR_CALLBACK FpHeartbeatTimer; // Timer Fp uint64_t heartbeatTimerCounter; @@ -170,6 +178,7 @@ typedef struct SSyncNode { int32_t (*FpOnRequestVoteReply)(SSyncNode* ths, SyncRequestVoteReply* pMsg); int32_t (*FpOnAppendEntries)(SSyncNode* ths, SyncAppendEntries* pMsg); int32_t (*FpOnAppendEntriesReply)(SSyncNode* ths, SyncAppendEntriesReply* pMsg); + int32_t (*FpOnTimeout)(SSyncNode* pSyncNode, SyncTimeout* pMsg); } SSyncNode; @@ -178,8 +187,24 @@ void syncNodeClose(SSyncNode* pSyncNode); void syncNodePingAll(SSyncNode* pSyncNode); void syncNodePingPeers(SSyncNode* pSyncNode); void syncNodePingSelf(SSyncNode* pSyncNode); -int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); -int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); + +int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode); +int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode); + +int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode); +int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode); +int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms); + +int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode); +int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode); +int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms); + +void syncNodeBecomeFollower(SSyncNode* pSyncNode); +void syncNodeBecomeLeader(SSyncNode* pSyncNode); +void syncNodeFollower2Candidate(SSyncNode* pSyncNode); +void syncNodeCandidate2Leader(SSyncNode* pSyncNode); +void syncNodeLeader2Follower(SSyncNode* pSyncNode); +void syncNodeCandidate2Follower(SSyncNode* pSyncNode); #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncMessage.h b/source/libs/sync/inc/syncMessage.h index 3057e23bc23d5780161da5afcf8816a218ab9948..b022044528ccd241b53b6e1d37c629b843ce412e 100644 --- a/source/libs/sync/inc/syncMessage.h +++ b/source/libs/sync/inc/syncMessage.h @@ -30,6 +30,8 @@ extern "C" { // encode as uint32 typedef enum ESyncMessageType { + SYNC_UNKNOWN = 77, + SYNC_TIMEOUT = 99, SYNC_PING = 101, SYNC_PING_REPLY = 103, SYNC_CLIENT_REQUEST = 105, @@ -38,8 +40,37 @@ typedef enum ESyncMessageType { SYNC_REQUEST_VOTE_REPLY = 111, SYNC_APPEND_ENTRIES = 113, SYNC_APPEND_ENTRIES_REPLY = 115, + } ESyncMessageType; +// --------------------------------------------- +cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg); +cJSON* syncRpcUnknownMsg2Json(); + +// --------------------------------------------- +typedef enum ESyncTimeoutType { + SYNC_TIMEOUT_PING = 100, + SYNC_TIMEOUT_ELECTION, + SYNC_TIMEOUT_HEARTBEAT, + +} ESyncTimeoutType; + +typedef struct SyncTimeout { + uint32_t bytes; + uint32_t msgType; + ESyncTimeoutType timeoutType; + void* data; +} SyncTimeout; + +SyncTimeout* syncTimeoutBuild(); +void syncTimeoutDestroy(SyncTimeout* pMsg); +void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen); +void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg); +void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg); +void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg); +cJSON* syncTimeout2Json(const SyncTimeout* pMsg); +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data); + // --------------------------------------------- typedef struct SyncPing { uint32_t bytes; diff --git a/source/libs/sync/inc/syncReplication.h b/source/libs/sync/inc/syncReplication.h index 7f97ae9e49cb516dbbc04f082a4068165208bd6b..a9875d5caeb6fcfac97de9b18e06d5df1c5fa2fd 100644 --- a/source/libs/sync/inc/syncReplication.h +++ b/source/libs/sync/inc/syncReplication.h @@ -26,6 +26,8 @@ extern "C" { #include "syncInt.h" #include "taosdef.h" +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/inc/syncUtil.h b/source/libs/sync/inc/syncUtil.h index 93d2c125254766ccfdd881a8afafd303b45f91fb..e1078d57387234e9be84cb9e30707dea4f6f7423 100644 --- a/source/libs/sync/inc/syncUtil.h +++ b/source/libs/sync/inc/syncUtil.h @@ -39,8 +39,9 @@ void syncUtilraftId2EpSet(const SRaftId* raftId, SEpSet* pEpSet); void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaftId* raftId); +bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2); + // ---- SSyncBuffer ---- -#if 0 void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len); void syncUtilbufDestroy(SSyncBuffer* syncBuf); @@ -48,7 +49,6 @@ void syncUtilbufDestroy(SSyncBuffer* syncBuf); void syncUtilbufCopy(const SSyncBuffer* src, SSyncBuffer* dest); void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest); -#endif #ifdef __cplusplus } diff --git a/source/libs/sync/inc/syncVoteMgr.h b/source/libs/sync/inc/syncVoteMgr.h index b841f2e3163816759d10d2a41454232f326b0fbb..e2307e9e665a7525369d96027dc98a2d8014128f 100644 --- a/source/libs/sync/inc/syncVoteMgr.h +++ b/source/libs/sync/inc/syncVoteMgr.h @@ -24,13 +24,36 @@ extern "C" { #include #include #include "syncInt.h" +#include "syncMessage.h" +#include "syncUtil.h" #include "taosdef.h" typedef struct SVotesGranted { + SyncTerm term; + int32_t quorum; + int32_t votes; + bool toLeader; + SSyncNode *pSyncNode; } SVotesGranted; -typedef struct SVotesResponded { -} SVotesResponded; +SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode); +void voteGrantedDestroy(SVotesGranted *pVotesGranted); +bool voteGrantedMajority(SVotesGranted *pVotesGranted); +void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg); +void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term); + +typedef struct SVotesRespond { + SRaftId (*replicas)[TSDB_MAX_REPLICA]; + bool isRespond[TSDB_MAX_REPLICA]; + int32_t replicaNum; + SyncTerm term; + SSyncNode *pSyncNode; +} SVotesRespond; + +SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode); +bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId); +void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg); +void Reset(SVotesRespond *pVotesRespond, SyncTerm term); #ifdef __cplusplus } diff --git a/source/libs/sync/src/syncElection.c b/source/libs/sync/src/syncElection.c index 329105e2a1214aba3cfb1bce2850f292f85811b8..433201b849aae039b9e9557054b41e3d7b853fec 100644 --- a/source/libs/sync/src/syncElection.c +++ b/source/libs/sync/src/syncElection.c @@ -14,3 +14,7 @@ */ #include "syncElection.h" + +void syncNodeElect(SSyncNode* pSyncNode) {} + +void syncNodeRequestVotePeers(SSyncNode* pSyncNode) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncIO.c b/source/libs/sync/src/syncIO.c index 757718282a507e68c5772642ed6043c4188ae7a5..25775bec7634a4ec93317e3a6f038b6fc3a3c735 100644 --- a/source/libs/sync/src/syncIO.c +++ b/source/libs/sync/src/syncIO.c @@ -40,17 +40,6 @@ static void syncIOTickPingFunc(void *param, void *tmrId); // ---------------------------- // public function ------------ -int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { - sTrace( - "<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, " - "pMsg->msgType:%d, pMsg->contLen:%d", - clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle, - pMsg->msgType, pMsg->contLen); - pMsg->handle = NULL; - rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); - return 0; -} - int32_t syncIOStart(char *host, uint16_t port) { gSyncIO = syncIOCreate(host, port); assert(gSyncIO != NULL); @@ -83,6 +72,35 @@ int32_t syncIOTickPing() { return ret; } +int32_t syncIOSendMsg(void *clientRpc, const SEpSet *pEpSet, SRpcMsg *pMsg) { + sTrace( + "<--- syncIOSendMsg ---> clientRpc:%p, numOfEps:%d, inUse:%d, destAddr:%s-%u, pMsg->ahandle:%p, pMsg->handle:%p, " + "pMsg->msgType:%d, pMsg->contLen:%d", + clientRpc, pEpSet->numOfEps, pEpSet->inUse, pEpSet->eps[0].fqdn, pEpSet->eps[0].port, pMsg->ahandle, pMsg->handle, + pMsg->msgType, pMsg->contLen); + { + cJSON *pJson = syncRpcMsg2Json(pMsg); + char *serialized = cJSON_Print(pJson); + sTrace("process syncMessage send: pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + pMsg->handle = NULL; + rpcSendRequest(clientRpc, pEpSet, pMsg, NULL); + return 0; +} + +int32_t syncIOEqMsg(void *queue, SRpcMsg *pMsg) { + SRpcMsg *pTemp; + pTemp = taosAllocateQitem(sizeof(SRpcMsg)); + memcpy(pTemp, pMsg, sizeof(SRpcMsg)); + + STaosQueue *pMsgQ = queue; + taosWriteQitem(pMsgQ, pTemp); + + return 0; +} + // local function ------------ static int32_t syncIOStartInternal(SSyncIO *io) { taosBlockSIGPIPE(); @@ -193,7 +211,7 @@ static void *syncIOConsumerFunc(void *param) { SSyncIO *io = param; STaosQall *qall; - SRpcMsg * pRpcMsg, rpcMsg; + SRpcMsg *pRpcMsg, rpcMsg; int type; qall = taosAllocateQall(); @@ -215,6 +233,7 @@ static void *syncIOConsumerFunc(void *param) { syncPingFromRpcMsg(pRpcMsg, pSyncMsg); // memcpy(pSyncMsg, tmpRpcMsg.pCont, tmpRpcMsg.contLen); io->FpOnSyncPing(io->pSyncNode, pSyncMsg); + syncPingDestroy(pSyncMsg); } } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { @@ -223,6 +242,16 @@ static void *syncIOConsumerFunc(void *param) { pSyncMsg = syncPingReplyBuild(pRpcMsg->contLen); syncPingReplyFromRpcMsg(pRpcMsg, pSyncMsg); io->FpOnSyncPingReply(io->pSyncNode, pSyncMsg); + syncPingReplyDestroy(pSyncMsg); + } + + } else if (pRpcMsg->msgType == SYNC_TIMEOUT) { + if (io->FpOnSyncTimeout != NULL) { + SyncTimeout *pSyncMsg; + pSyncMsg = syncTimeoutBuild(); + syncTimeoutFromRpcMsg(pRpcMsg, pSyncMsg); + io->FpOnSyncTimeout(io->pSyncNode, pSyncMsg); + syncTimeoutDestroy(pSyncMsg); } } else { ; diff --git a/source/libs/sync/src/syncMain.c b/source/libs/sync/src/syncMain.c index 7e01e7e81c2251ef603d29c07734518a53d1467f..6a0663dd579cb14dbabf20fcaa1bdce89ddeda2f 100644 --- a/source/libs/sync/src/syncMain.c +++ b/source/libs/sync/src/syncMain.c @@ -25,7 +25,10 @@ static int32_t tsNodeRefId = -1; // ------ local funciton --------- static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg); static int32_t syncNodeSendMsgByInfo(const SNodeInfo* nodeInfo, SSyncNode* pSyncNode, SRpcMsg* pMsg); -static void syncNodePingTimerCb(void* param, void* tmrId); + +static void syncNodeEqPingTimer(void* param, void* tmrId); +static void syncNodeEqElectTimer(void* param, void* tmrId); +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId); static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg); static int32_t syncNodeRequestVote(SSyncNode* ths, const SyncRequestVote* pMsg); @@ -37,6 +40,7 @@ static int32_t syncNodeOnRequestVoteCb(SSyncNode* ths, SyncRequestVote* pMsg); static int32_t syncNodeOnRequestVoteReplyCb(SSyncNode* ths, SyncRequestVoteReply* pMsg); static int32_t syncNodeOnAppendEntriesCb(SSyncNode* ths, SyncAppendEntries* pMsg); static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesReply* pMsg); +static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg); // --------------------------------- int32_t syncInit() { @@ -56,8 +60,6 @@ void syncStop(int64_t rid) {} int32_t syncReconfig(int64_t rid, const SSyncCfg* pSyncCfg) { return 0; } -// int32_t syncForwardToPeer(int64_t rid, const SSyncBuffer* pBuf, bool isWeak) { return 0; } - int32_t syncForwardToPeer(int64_t rid, const SRpcMsg* pBuf, bool isWeak) { return 0; } ESyncState syncGetMyRole(int64_t rid) { return TAOS_SYNC_STATE_LEADER; } @@ -76,6 +78,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->rpcClient = pSyncInfo->rpcClient; pSyncNode->FpSendMsg = pSyncInfo->FpSendMsg; + pSyncNode->queue = pSyncInfo->queue; + pSyncNode->FpEqMsg = pSyncInfo->FpEqMsg; pSyncNode->me = pSyncInfo->syncCfg.nodeInfo[pSyncInfo->syncCfg.myIndex]; pSyncNode->peersNum = pSyncInfo->syncCfg.replicaNum - 1; @@ -93,8 +97,8 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->pPingTimer = NULL; pSyncNode->pingTimerMS = 1000; - atomic_store_8(&pSyncNode->pingTimerStart, 0); - pSyncNode->FpPingTimer = syncNodePingTimerCb; + atomic_store_8(&pSyncNode->pingTimerEnable, 0); + pSyncNode->FpPingTimer = syncNodeEqPingTimer; pSyncNode->pingTimerCounter = 0; pSyncNode->FpOnPing = syncNodeOnPingCb; @@ -103,6 +107,7 @@ SSyncNode* syncNodeOpen(const SSyncInfo* pSyncInfo) { pSyncNode->FpOnRequestVoteReply = syncNodeOnRequestVoteReplyCb; pSyncNode->FpOnAppendEntries = syncNodeOnAppendEntriesCb; pSyncNode->FpOnAppendEntriesReply = syncNodeOnAppendEntriesReplyCb; + pSyncNode->FpOnTimeout = syncNodeOnTimeoutCb; return pSyncNode; } @@ -148,22 +153,76 @@ void syncNodePingSelf(SSyncNode* pSyncNode) { int32_t syncNodeStartPingTimer(SSyncNode* pSyncNode) { if (pSyncNode->pPingTimer == NULL) { pSyncNode->pPingTimer = - taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager); + taosTmrStart(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager); } else { - taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerCounter, pSyncNode, gSyncEnv->pTimerManager, + taosTmrReset(pSyncNode->FpPingTimer, pSyncNode->pingTimerMS, pSyncNode, gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } - atomic_store_8(&pSyncNode->pingTimerStart, 1); + atomic_store_8(&pSyncNode->pingTimerEnable, 1); return 0; } int32_t syncNodeStopPingTimer(SSyncNode* pSyncNode) { - atomic_store_8(&pSyncNode->pingTimerStart, 0); - pSyncNode->pingTimerCounter = TIMER_MAX_MS; + atomic_store_8(&pSyncNode->pingTimerEnable, 0); + pSyncNode->pingTimerMS = TIMER_MAX_MS; + return 0; +} + +int32_t syncNodeStartElectTimer(SSyncNode* pSyncNode) { + if (pSyncNode->pElectTimer == NULL) { + pSyncNode->pElectTimer = + taosTmrStart(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager); + } else { + taosTmrReset(pSyncNode->FpElectTimer, pSyncNode->electTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pElectTimer); + } + + atomic_store_8(&pSyncNode->electTimerEnable, 1); + return 0; +} + +int32_t syncNodeStopElectTimer(SSyncNode* pSyncNode) { + atomic_store_8(&pSyncNode->electTimerEnable, 0); + pSyncNode->electTimerMS = TIMER_MAX_MS; + return 0; +} + +int32_t syncNodeResetElectTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } + +int32_t syncNodeStartHeartbeatTimer(SSyncNode* pSyncNode) { + if (pSyncNode->pHeartbeatTimer == NULL) { + pSyncNode->pHeartbeatTimer = + taosTmrStart(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager); + } else { + taosTmrReset(pSyncNode->FpHeartbeatTimer, pSyncNode->heartbeatTimerMS, pSyncNode, gSyncEnv->pTimerManager, + &pSyncNode->pHeartbeatTimer); + } + + atomic_store_8(&pSyncNode->heartbeatTimerEnable, 1); return 0; } +int32_t syncNodeStopHeartbeatTimer(SSyncNode* pSyncNode) { + atomic_store_8(&pSyncNode->heartbeatTimerEnable, 0); + pSyncNode->heartbeatTimerMS = TIMER_MAX_MS; + return 0; +} + +int32_t syncNodeResetHeartbeatTimer(SSyncNode* pSyncNode, int32_t ms) { return 0; } + +void syncNodeBecomeFollower(SSyncNode* pSyncNode) {} + +void syncNodeBecomeLeader(SSyncNode* pSyncNode) {} + +void syncNodeFollower2Candidate(SSyncNode* pSyncNode) {} + +void syncNodeCandidate2Leader(SSyncNode* pSyncNode) {} + +void syncNodeLeader2Follower(SSyncNode* pSyncNode) {} + +void syncNodeCandidate2Follower(SSyncNode* pSyncNode) {} + // ------ local funciton --------- static int32_t syncNodePing(SSyncNode* pSyncNode, const SRaftId* destRaftId, SyncPing* pMsg) { sTrace("syncNodePing pSyncNode:%p ", pSyncNode); @@ -204,7 +263,6 @@ static int32_t syncNodeAppendEntries(SSyncNode* ths, const SyncAppendEntries* pM } static int32_t syncNodeSendMsgById(const SRaftId* destRaftId, SSyncNode* pSyncNode, SRpcMsg* pMsg) { - sTrace("syncNodeSendMsgById pSyncNode:%p ", pSyncNode); SEpSet epSet; syncUtilraftId2EpSet(destRaftId, &epSet); pSyncNode->FpSendMsg(pSyncNode->rpcClient, &epSet, pMsg); @@ -225,7 +283,7 @@ static int32_t syncNodeOnPingCb(SSyncNode* ths, SyncPing* pMsg) { { cJSON* pJson = syncPing2Json(pMsg); char* serialized = cJSON_Print(pJson); - sTrace("syncNodeOnPingCb syncNodePing pMsg:%s ", serialized); + sTrace("process syncMessage recv: syncNodeOnPingCb pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); } @@ -245,7 +303,7 @@ static int32_t syncNodeOnPingReplyCb(SSyncNode* ths, SyncPingReply* pMsg) { { cJSON* pJson = syncPingReply2Json(pMsg); char* serialized = cJSON_Print(pJson); - sTrace("syncNodeOnPingReplyCb syncNodePing pMsg:%s ", serialized); + sTrace("process syncMessage recv: syncNodeOnPingReplyCb pMsg:%s ", serialized); free(serialized); cJSON_Delete(pJson); } @@ -273,22 +331,50 @@ static int32_t syncNodeOnAppendEntriesReplyCb(SSyncNode* ths, SyncAppendEntriesR return ret; } -static void syncNodePingTimerCb(void* param, void* tmrId) { +static int32_t syncNodeOnTimeoutCb(SSyncNode* ths, SyncTimeout* pMsg) { + int32_t ret = 0; + sTrace("<-- syncNodeOnTimeoutCb -->"); + + { + cJSON* pJson = syncTimeout2Json(pMsg); + char* serialized = cJSON_Print(pJson); + sTrace("process syncMessage recv: syncNodeOnTimeoutCb pMsg:%s ", serialized); + free(serialized); + cJSON_Delete(pJson); + } + + if (pMsg->timeoutType == SYNC_TIMEOUT_PING) { + if (atomic_load_8(&ths->pingTimerEnable)) { + ++(ths->pingTimerCounter); + syncNodePingAll(ths); + } + + } else if (pMsg->timeoutType == SYNC_TIMEOUT_ELECTION) { + } else if (pMsg->timeoutType == SYNC_TIMEOUT_HEARTBEAT) { + } else { + } + + return ret; +} + +static void syncNodeEqPingTimer(void* param, void* tmrId) { SSyncNode* pSyncNode = (SSyncNode*)param; - if (atomic_load_8(&pSyncNode->pingTimerStart)) { - ++(pSyncNode->pingTimerCounter); + if (atomic_load_8(&pSyncNode->pingTimerEnable)) { // pSyncNode->pingTimerMS += 100; - sTrace( - "syncNodePingTimerCb: pSyncNode->pingTimerCounter:%lu, pSyncNode->pingTimerMS:%d, pSyncNode->pPingTimer:%p, " - "tmrId:%p ", - pSyncNode->pingTimerCounter, pSyncNode->pingTimerMS, pSyncNode->pPingTimer, tmrId); - - syncNodePingAll(pSyncNode); + SyncTimeout* pSyncMsg = syncTimeoutBuild2(SYNC_TIMEOUT_PING, pSyncNode); + SRpcMsg rpcMsg; + syncTimeout2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + syncTimeoutDestroy(pSyncMsg); - taosTmrReset(syncNodePingTimerCb, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, + taosTmrReset(syncNodeEqPingTimer, pSyncNode->pingTimerMS, pSyncNode, &gSyncEnv->pTimerManager, &pSyncNode->pPingTimer); } else { - sTrace("syncNodePingTimerCb: pingTimerStart:%u ", pSyncNode->pingTimerStart); + sTrace("syncNodeEqPingTimer: pingTimerEnable:%u ", pSyncNode->pingTimerEnable); } -} \ No newline at end of file +} + +static void syncNodeEqElectTimer(void* param, void* tmrId) {} + +static void syncNodeEqHeartbeatTimer(void* param, void* tmrId) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncMessage.c b/source/libs/sync/src/syncMessage.c index 4c44b4691c5b439b5f7308be3c4103364d15c46a..f1434947a1132161477ac9991d5ea965ed380372 100644 --- a/source/libs/sync/src/syncMessage.c +++ b/source/libs/sync/src/syncMessage.c @@ -20,6 +20,124 @@ void onMessage(SRaft* pRaft, void* pMsg) {} +// --------------------------------------------- +cJSON* syncRpcMsg2Json(SRpcMsg* pRpcMsg) { + cJSON* pRoot; + + // in compiler optimization, switch case = if else constants + if (pRpcMsg->msgType == SYNC_TIMEOUT) { + SyncTimeout* pSyncMsg = (SyncTimeout*)pRpcMsg->pCont; + pRoot = syncTimeout2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_PING) { + SyncPing* pSyncMsg = (SyncPing*)pRpcMsg->pCont; + pRoot = syncPing2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_PING_REPLY) { + SyncPingReply* pSyncMsg = (SyncPingReply*)pRpcMsg->pCont; + pRoot = syncPingReply2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST) { + pRoot = syncRpcUnknownMsg2Json(); + + } else if (pRpcMsg->msgType == SYNC_CLIENT_REQUEST_REPLY) { + pRoot = syncRpcUnknownMsg2Json(); + + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE) { + SyncRequestVote* pSyncMsg = (SyncRequestVote*)pRpcMsg->pCont; + pRoot = syncRequestVote2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_REQUEST_VOTE_REPLY) { + SyncRequestVoteReply* pSyncMsg = (SyncRequestVoteReply*)pRpcMsg->pCont; + pRoot = syncRequestVoteReply2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES) { + SyncAppendEntries* pSyncMsg = (SyncAppendEntries*)pRpcMsg->pCont; + pRoot = syncAppendEntries2Json(pSyncMsg); + + } else if (pRpcMsg->msgType == SYNC_APPEND_ENTRIES_REPLY) { + SyncAppendEntriesReply* pSyncMsg = (SyncAppendEntriesReply*)pRpcMsg->pCont; + pRoot = syncAppendEntriesReply2Json(pSyncMsg); + + } else { + pRoot = syncRpcUnknownMsg2Json(); + } + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "RpcMsg", pRoot); + return pJson; +} + +cJSON* syncRpcUnknownMsg2Json() { + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "msgType", SYNC_UNKNOWN); + cJSON_AddStringToObject(pRoot, "data", "known message"); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncPing", pRoot); + return pJson; +} + +// ---- message process SyncTimeout---- +SyncTimeout* syncTimeoutBuild() { + uint32_t bytes = sizeof(SyncTimeout); + SyncTimeout* pMsg = malloc(bytes); + memset(pMsg, 0, bytes); + pMsg->bytes = bytes; + pMsg->msgType = SYNC_TIMEOUT; + return pMsg; +} + +void syncTimeoutDestroy(SyncTimeout* pMsg) { + if (pMsg != NULL) { + free(pMsg); + } +} + +void syncTimeoutSerialize(const SyncTimeout* pMsg, char* buf, uint32_t bufLen) { + assert(pMsg->bytes <= bufLen); + memcpy(buf, pMsg, pMsg->bytes); +} + +void syncTimeoutDeserialize(const char* buf, uint32_t len, SyncTimeout* pMsg) { + memcpy(pMsg, buf, len); + assert(len == pMsg->bytes); +} + +void syncTimeout2RpcMsg(const SyncTimeout* pMsg, SRpcMsg* pRpcMsg) { + memset(pRpcMsg, 0, sizeof(*pRpcMsg)); + pRpcMsg->msgType = pMsg->msgType; + pRpcMsg->contLen = pMsg->bytes; + pRpcMsg->pCont = rpcMallocCont(pRpcMsg->contLen); + syncTimeoutSerialize(pMsg, pRpcMsg->pCont, pRpcMsg->contLen); +} + +void syncTimeoutFromRpcMsg(const SRpcMsg* pRpcMsg, SyncTimeout* pMsg) { + syncTimeoutDeserialize(pRpcMsg->pCont, pRpcMsg->contLen, pMsg); +} + +cJSON* syncTimeout2Json(const SyncTimeout* pMsg) { + char u64buf[128]; + + cJSON* pRoot = cJSON_CreateObject(); + cJSON_AddNumberToObject(pRoot, "bytes", pMsg->bytes); + cJSON_AddNumberToObject(pRoot, "msgType", pMsg->msgType); + cJSON_AddNumberToObject(pRoot, "timeoutType", pMsg->timeoutType); + snprintf(u64buf, sizeof(u64buf), "%p", pMsg->data); + cJSON_AddStringToObject(pRoot, "data", u64buf); + + cJSON* pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SyncTimeout", pRoot); + return pJson; +} + +SyncTimeout* syncTimeoutBuild2(ESyncTimeoutType timeoutType, void* data) { + SyncTimeout* pMsg = syncTimeoutBuild(); + pMsg->timeoutType = timeoutType; + pMsg->data = data; + return pMsg; +} + // ---- message process SyncPing---- SyncPing* syncPingBuild(uint32_t dataLen) { uint32_t bytes = SYNC_PING_FIX_LEN + dataLen; diff --git a/source/libs/sync/src/syncReplication.c b/source/libs/sync/src/syncReplication.c index 4cea7c150e19b490910adf771cfca050c03aea22..e3e551fd2b2d3c5f2338e7e84345f8a618fec738 100644 --- a/source/libs/sync/src/syncReplication.c +++ b/source/libs/sync/src/syncReplication.c @@ -14,3 +14,5 @@ */ #include "syncReplication.h" + +void syncNodeAppendEntriesPeers(SSyncNode* pSyncNode) {} \ No newline at end of file diff --git a/source/libs/sync/src/syncUtil.c b/source/libs/sync/src/syncUtil.c index b4959a810b5b730bfebebd48ae34bda88c112fe0..7b4d6ee366d8a327c7706899cd2525c5fbd93feb 100644 --- a/source/libs/sync/src/syncUtil.c +++ b/source/libs/sync/src/syncUtil.c @@ -68,8 +68,12 @@ void syncUtilnodeInfo2raftId(const SNodeInfo* pNodeInfo, SyncGroupId vgId, SRaft raftId->vgId = vgId; } +bool syncUtilSameId(const SRaftId* pId1, const SRaftId* pId2) { + bool ret = pId1->addr == pId2->addr && pId1->vgId == pId2->vgId; + return ret; +} + // ---- SSyncBuffer ----- -#if 0 void syncUtilbufBuild(SSyncBuffer* syncBuf, size_t len) { syncBuf->len = len; syncBuf->data = malloc(syncBuf->len); @@ -87,4 +91,3 @@ void syncUtilbufCopyDeep(const SSyncBuffer* src, SSyncBuffer* dest) { dest->data = malloc(dest->len); memcpy(dest->data, src->data, dest->len); } -#endif \ No newline at end of file diff --git a/source/libs/sync/src/syncVoteMgr.c b/source/libs/sync/src/syncVoteMgr.c index 02cf4ac03365eb8cedb49eff0b1ba1de08f4699f..c9f0ceab57a62116f4a786bdba3c386a28dfbd3b 100644 --- a/source/libs/sync/src/syncVoteMgr.c +++ b/source/libs/sync/src/syncVoteMgr.c @@ -14,3 +14,83 @@ */ #include "syncVoteMgr.h" + +SVotesGranted *voteGrantedCreate(SSyncNode *pSyncNode) { + SVotesGranted *pVotesGranted = malloc(sizeof(SVotesGranted)); + assert(pVotesGranted != NULL); + memset(pVotesGranted, 0, sizeof(SVotesGranted)); + + pVotesGranted->quorum = pSyncNode->quorum; + pVotesGranted->term = 0; + pVotesGranted->votes = 0; + pVotesGranted->toLeader = false; + pVotesGranted->pSyncNode = pSyncNode; + + return pVotesGranted; +} + +void voteGrantedDestroy(SVotesGranted *pVotesGranted) { + if (pVotesGranted != NULL) { + free(pVotesGranted); + } +} + +bool voteGrantedMajority(SVotesGranted *pVotesGranted) { + bool ret = pVotesGranted->votes >= pVotesGranted->quorum; + return ret; +} + +void voteGrantedVote(SVotesGranted *pVotesGranted, SyncRequestVoteReply *pMsg) { + assert(pMsg->voteGranted == true); + assert(pMsg->term == pVotesGranted->term); + pVotesGranted->votes++; +} + +void voteGrantedReset(SVotesGranted *pVotesGranted, SyncTerm term) { + pVotesGranted->term = term; + pVotesGranted->votes = 0; + pVotesGranted->toLeader = false; +} + +SVotesRespond *votesRespondCreate(SSyncNode *pSyncNode) { + SVotesRespond *pVotesRespond = malloc(sizeof(SVotesRespond)); + assert(pVotesRespond != NULL); + memset(pVotesRespond, 0, sizeof(SVotesRespond)); + + pVotesRespond->replicas = &(pSyncNode->replicasId); + pVotesRespond->replicaNum = pSyncNode->replicaNum; + pVotesRespond->term = 0; + pVotesRespond->pSyncNode = pSyncNode; + + return pVotesRespond; +} + +bool votesResponded(SVotesRespond *pVotesRespond, const SRaftId *pRaftId) { + bool ret = false; + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + if (syncUtilSameId(&(*pVotesRespond->replicas)[i], pRaftId) && pVotesRespond->isRespond[i]) { + ret = true; + break; + } + } + return ret; +} + +void votesRespondAdd(SVotesRespond *pVotesRespond, const SyncRequestVoteReply *pMsg) { + assert(pVotesRespond->term == pMsg->term); + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + if (syncUtilSameId(&(*pVotesRespond->replicas)[i], &pMsg->srcId)) { + assert(pVotesRespond->isRespond[i] == false); + pVotesRespond->isRespond[i] = true; + return; + } + } + assert(0); +} + +void Reset(SVotesRespond *pVotesRespond, SyncTerm term) { + pVotesRespond->term = term; + for (int i = 0; i < pVotesRespond->replicaNum; ++i) { + pVotesRespond->isRespond[i] = false; + } +} \ No newline at end of file diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index 2913d230b2023d089b2a9166c44866870b93428c..4c5f7ffa56a42a42e807eec800ae723918076992 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -8,6 +8,7 @@ add_executable(syncIOSendMsgTest "") add_executable(syncIOSendMsgClientTest "") add_executable(syncIOSendMsgServerTest "") add_executable(syncRaftStoreTest "") +add_executable(syncEnqTest "") target_sources(syncTest @@ -50,6 +51,10 @@ target_sources(syncRaftStoreTest PRIVATE "syncRaftStoreTest.cpp" ) +target_sources(syncEnqTest + PRIVATE + "syncEnqTest.cpp" +) target_include_directories(syncTest @@ -102,6 +107,11 @@ target_include_directories(syncRaftStoreTest "${CMAKE_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncEnqTest + PUBLIC + "${CMAKE_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -144,6 +154,10 @@ target_link_libraries(syncRaftStoreTest sync gtest_main ) +target_link_libraries(syncEnqTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncEnqTest.cpp b/source/libs/sync/test/syncEnqTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..0bf43f933ee85610b6280c88f6120c2e8677e221 --- /dev/null +++ b/source/libs/sync/test/syncEnqTest.cpp @@ -0,0 +1,99 @@ +#include +#include "syncEnv.h" +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftStore.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +uint16_t ports[3] = {7010, 7110, 7210}; + +SSyncNode* doSync(int myIndex) { + SSyncFSM* pFsm; + + SSyncInfo syncInfo; + syncInfo.vgId = 1; + syncInfo.rpcClient = gSyncIO->clientRpc; + syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; + syncInfo.pFsm = pFsm; + snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); + + SSyncCfg* pCfg = &syncInfo.syncCfg; + pCfg->myIndex = myIndex; + pCfg->replicaNum = 3; + + pCfg->nodeInfo[0].nodePort = ports[0]; + snprintf(pCfg->nodeInfo[0].nodeFqdn, sizeof(pCfg->nodeInfo[0].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn); + + pCfg->nodeInfo[1].nodePort = ports[1]; + snprintf(pCfg->nodeInfo[1].nodeFqdn, sizeof(pCfg->nodeInfo[1].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[1].nodeFqdn); + + pCfg->nodeInfo[2].nodePort = ports[2]; + snprintf(pCfg->nodeInfo[2].nodeFqdn, sizeof(pCfg->nodeInfo[2].nodeFqdn), "%s", "127.0.0.1"); + // taosGetFqdn(pCfg->nodeInfo[2].nodeFqdn); + + SSyncNode* pSyncNode = syncNodeOpen(&syncInfo); + assert(pSyncNode != NULL); + + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->pSyncNode = pSyncNode; + + return pSyncNode; +} + +void timerPingAll(void* param, void* tmrId) { + SSyncNode* pSyncNode = (SSyncNode*)param; + syncNodePingAll(pSyncNode); +} + +int main(int argc, char** argv) { + // taosInitLog((char*)"syncPingTest.log", 100000, 10); + tsAsyncLog = 0; + sDebugFlag = 143 + 64; + + logTest(); + + int myIndex = 0; + if (argc >= 2) { + myIndex = atoi(argv[1]); + if (myIndex > 2 || myIndex < 0) { + fprintf(stderr, "myIndex:%d error. should be 0 - 2", myIndex); + return 1; + } + } + + int32_t ret = syncIOStart((char*)"127.0.0.1", ports[myIndex]); + assert(ret == 0); + + ret = syncEnvStart(); + assert(ret == 0); + + SSyncNode* pSyncNode = doSync(myIndex); + gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; + gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + + for (int i = 0; i < 10; ++i) { + SyncPingReply* pSyncMsg = syncPingReplyBuild3(&pSyncNode->raftId, &pSyncNode->raftId); + SRpcMsg rpcMsg; + syncPingReply2RpcMsg(pSyncMsg, &rpcMsg); + pSyncNode->FpEqMsg(pSyncNode->queue, &rpcMsg); + taosMsleep(1000); + } + + while (1) { + taosMsleep(1000); + } + + return 0; +} diff --git a/source/libs/sync/test/syncPingTest.cpp b/source/libs/sync/test/syncPingTest.cpp index 82681283477f23623aa37113487d41a9dbaad4be..77413a713b13547e33c6177195c72ea687687d71 100644 --- a/source/libs/sync/test/syncPingTest.cpp +++ b/source/libs/sync/test/syncPingTest.cpp @@ -22,6 +22,8 @@ SSyncNode* doSync(int myIndex) { syncInfo.vgId = 1; syncInfo.rpcClient = gSyncIO->clientRpc; syncInfo.FpSendMsg = syncIOSendMsg; + syncInfo.queue = gSyncIO->pMsgQ; + syncInfo.FpEqMsg = syncIOEqMsg; syncInfo.pFsm = pFsm; snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", "./test_sync_ping"); @@ -76,15 +78,14 @@ int main(int argc, char** argv) { SSyncNode* pSyncNode = doSync(myIndex); gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing; gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply; + gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout; ret = syncNodeStartPingTimer(pSyncNode); assert(ret == 0); - /* - taosMsleep(10000); - ret = syncNodeStopPingTimer(pSyncNode); - assert(ret == 0); - */ + taosMsleep(10000); + ret = syncNodeStopPingTimer(pSyncNode); + assert(ret == 0); while (1) { taosMsleep(1000); diff --git a/source/os/src/osSysinfo.c b/source/os/src/osSysinfo.c index 12952141a4f58f5fd7bb5b6de05d9fd6c51f295e..8f2820f67a84a531bc34fc9d4863a1982028cafe 100644 --- a/source/os/src/osSysinfo.c +++ b/source/os/src/osSysinfo.c @@ -87,7 +87,7 @@ int32_t taosGetCpuCores(float *numOfCores) { return 0; } -int32_t taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) { +int32_t taosGetCpuUsage(double *sysCpuUsage, double *procCpuUsage) { *sysCpuUsage = 0; *procCpuUsage = 0; return 0; @@ -112,64 +112,32 @@ int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) { } } -int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) { - if (bytes) *bytes = 0; - if (rbytes) *rbytes = 0; - if (tbytes) *tbytes = 0; +int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) { + *receive_bytes = 0; + *transmit_bytes = 0; return 0; } -int32_t taosGetBandSpeed(float *bandSpeedKb) { - *bandSpeedKb = 0; - return 0; -} - -int32_t taosReadProcIO(int64_t *readbyte, int64_t *writebyte) { +int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { IO_COUNTERS io_counter; if (GetProcessIoCounters(GetCurrentProcess(), &io_counter)) { - if (readbyte) *readbyte = io_counter.ReadTransferCount; - if (writebyte) *writebyte = io_counter.WriteTransferCount; + if (rchars) *rchars = io_counter.ReadTransferCount; + if (wchars) *wchars = io_counter.WriteTransferCount; + if (read_bytes) *read_bytes = 0; + if (write_bytes) *write_bytes = 0; return 0; } return -1; } -int32_t taosGetProcIO(float *readKB, float *writeKB) { - static int64_t lastReadbyte = -1; - static int64_t lastWritebyte = -1; - - int64_t curReadbyte = 0; - int64_t curWritebyte = 0; - - if (taosReadProcIO(&curReadbyte, &curWritebyte) != 0) { - return -1; - } - - if (lastReadbyte == -1 || lastWritebyte == -1) { - lastReadbyte = curReadbyte; - lastWritebyte = curWritebyte; - return -1; - } - - *readKB = (float)((double)(curReadbyte - lastReadbyte) / 1024); - *writeKB = (float)((double)(curWritebyte - lastWritebyte) / 1024); - if (*readKB < 0) *readKB = 0; - if (*writeKB < 0) *writeKB = 0; - - lastReadbyte = curReadbyte; - lastWritebyte = curWritebyte; - - return 0; -} - void taosGetSystemInfo() { taosGetCpuCores(&tsNumOfCores); taosGetTotalMemory(&tsTotalMemoryKB); - float tmp1, tmp2; - taosGetBandSpeed(&tmp1); + double tmp1, tmp2, tmp3, tmp4; + taosGetBandSpeed(&tmp1, &tmp2); taosGetCpuUsage(&tmp1, &tmp2); - taosGetProcIO(&tmp1, &tmp2); + taosGetIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4); } void taosKillSystem() { @@ -259,31 +227,21 @@ void taosGetSystemInfo() { tsNumOfCores = sysconf(_SC_NPROCESSORS_ONLN); } -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars) { +int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { if (rchars) *rchars = 0; if (wchars) *wchars = 0; + if (read_bytes) *read_bytes = 0; + if (write_bytes) *write_bytes = 0; return 0; } -int32_t taosGetProcIO(float *readKB, float *writeKB) { - *readKB = 0; - *writeKB = 0; +int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) { + *receive_bytes = 0; + *transmit_bytes = 0; return 0; } -int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) { - if (bytes) *bytes = 0; - if (rbytes) *rbytes = 0; - if (tbytes) *tbytes = 0; - return 0; -} - -int32_t taosGetBandSpeed(float *bandSpeedKb) { - *bandSpeedKb = 0; - return 0; -} - -int32_t taosGetCpuUsage(float *sysCpuUsage, float *procCpuUsage) { +int32_t taosGetCpuUsage(double *sysCpuUsage, double *procCpuUsage) { *sysCpuUsage = 0; *procCpuUsage = 0; return 0; @@ -400,7 +358,6 @@ int32_t taosGetSysMemory(int64_t *usedKB) { } int32_t taosGetProcMemory(int64_t *usedKB) { - // FILE *fp = fopen(tsProcMemFile, "r"); TdFilePtr pFile = taosOpenFile(tsProcMemFile, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) { // printf("open file:%s failed", tsProcMemFile); @@ -434,7 +391,6 @@ int32_t taosGetProcMemory(int64_t *usedKB) { } static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) { - // FILE *fp = fopen(tsSysCpuFile, "r"); TdFilePtr pFile = taosOpenFile(tsSysCpuFile, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) { // printf("open file:%s failed", tsSysCpuFile); @@ -459,7 +415,6 @@ static int32_t taosGetSysCpuInfo(SysCpuInfo *cpuInfo) { } static int32_t taosGetProcCpuInfo(ProcCpuInfo *cpuInfo) { - // FILE *fp = fopen(tsProcCpuFile, "r"); TdFilePtr pFile = taosOpenFile(tsProcCpuFile, TD_FILE_READ | TD_FILE_STREAM); if (pFile == NULL) { // printf("open file:%s failed", tsProcCpuFile); @@ -493,7 +448,7 @@ int32_t taosGetCpuCores(float *numOfCores) { return 0; } -int32_t taosGetCpuUsage(float *cpu_system, float *cpu_engine) { +int32_t taosGetCpuUsage(double *cpu_system, double *cpu_engine) { static uint64_t lastSysUsed = 0; static uint64_t lastSysTotal = 0; static uint64_t lastProcTotal = 0; @@ -522,8 +477,8 @@ int32_t taosGetCpuUsage(float *cpu_system, float *cpu_engine) { return -1; } - *cpu_engine = (float)((double)(curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100); - *cpu_system = (float)((double)(curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100); + *cpu_engine = (curSysUsed - lastSysUsed) / (double)(curSysTotal - lastSysTotal) * 100; + *cpu_system = (curProcTotal - lastProcTotal) / (double)(curSysTotal - lastSysTotal) * 100; lastSysUsed = curSysUsed; lastSysTotal = curSysTotal; @@ -544,14 +499,9 @@ int32_t taosGetDiskSize(char *dataDir, SDiskSize *diskSize) { } } -int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) { - if (bytes) *bytes = 0; - // FILE *fp = fopen(tsSysNetFile, "r"); +int32_t taosGetCardInfo(int64_t *receive_bytes, int64_t *transmit_bytes) { TdFilePtr pFile = taosOpenFile(tsSysNetFile, TD_FILE_READ | TD_FILE_STREAM); - if (pFile == NULL) { - // printf("open file:%s failed", tsSysNetFile); - return -1; - } + if (pFile == NULL) return -1; ssize_t _bytes = 0; char *line = NULL; @@ -584,9 +534,8 @@ int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) { "%s %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64 " %" PRId64, nouse0, &o_rbytes, &rpackts, &nouse1, &nouse2, &nouse3, &nouse4, &nouse5, &nouse6, &o_tbytes, &tpackets); - if (rbytes) *rbytes = o_rbytes; - if (tbytes) *tbytes = o_tbytes; - if (bytes) *bytes += (o_rbytes + o_tbytes); + *receive_bytes = o_rbytes; + *transmit_bytes = o_tbytes; } if (line != NULL) tfree(line); @@ -595,58 +544,52 @@ int32_t taosGetCardInfo(int64_t *bytes, int64_t *rbytes, int64_t *tbytes) { return 0; } -int32_t taosGetBandSpeed(float *bandSpeedKb) { - static int64_t lastBytes = 0; - static time_t lastTime = 0; - int64_t curBytes = 0; - time_t curTime = time(NULL); +int32_t taosGetBandSpeed(double *receive_bytes_per_sec, double *transmit_bytes_per_sec) { + static int64_t last_receive_bytes = 0; + static int64_t last_transmit_bytes = 0; + static int64_t last_time = 0; + int64_t cur_receive_bytes = 0; + int64_t cur_transmit_bytes = 0; + int64_t cur_time = taosGetTimestampMs(); - if (taosGetCardInfo(&curBytes, NULL, NULL) != 0) { + if (taosGetCardInfo(&cur_receive_bytes, &cur_transmit_bytes) != 0) { return -1; } - if (lastTime == 0 || lastBytes == 0) { - lastTime = curTime; - lastBytes = curBytes; - *bandSpeedKb = 0; + if (last_time == 0 || last_time >= cur_time) { + last_time = cur_time; + last_receive_bytes = cur_receive_bytes; + last_transmit_bytes = cur_transmit_bytes; + *receive_bytes_per_sec = 0; + *transmit_bytes_per_sec = 0; return 0; } - if (lastTime >= curTime || lastBytes > curBytes) { - lastTime = curTime; - lastBytes = curBytes; - *bandSpeedKb = 0; - return 0; - } + *receive_bytes_per_sec = (cur_receive_bytes - last_receive_bytes) / (double)(cur_time - last_time) * 1000; + *transmit_bytes_per_sec = (cur_transmit_bytes - last_transmit_bytes) / (double)(cur_time - last_time) * 1000; - double totalBytes = (double)(curBytes - lastBytes) / 1024 * 8; // Kb - *bandSpeedKb = (float)(totalBytes / (double)(curTime - lastTime)); + last_time = cur_time; + last_transmit_bytes = cur_transmit_bytes; + last_receive_bytes = cur_receive_bytes; - // //printf("bandwidth lastBytes:%ld, lastTime:%ld, curBytes:%ld, curTime:%ld, - // speed:%f", lastBytes, lastTime, curBytes, curTime, *bandSpeed); - - lastTime = curTime; - lastBytes = curBytes; + if (*receive_bytes_per_sec < 0) *receive_bytes_per_sec = 0; + if (*transmit_bytes_per_sec < 0) *transmit_bytes_per_sec = 0; return 0; } -int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars) { - // FILE *fp = fopen(tsProcIOFile, "r"); +int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars, int64_t *read_bytes, int64_t *write_bytes) { TdFilePtr pFile = taosOpenFile(tsProcIOFile, TD_FILE_READ | TD_FILE_STREAM); - if (pFile == NULL) { - // printf("open file:%s failed", tsProcIOFile); - return -1; - } + if (pFile == NULL) return -1; ssize_t _bytes = 0; char *line = NULL; - char tmp[10]; + char tmp[24]; int readIndex = 0; while (!taosEOFFile(pFile)) { _bytes = taosGetLineFile(pFile, &line); - if ((_bytes < 0) || (line == NULL)) { + if (_bytes < 10 || line == NULL) { break; } if (strstr(line, "rchar:") != NULL) { @@ -655,47 +598,70 @@ int32_t taosReadProcIO(int64_t *rchars, int64_t *wchars) { } else if (strstr(line, "wchar:") != NULL) { sscanf(line, "%s %" PRId64, tmp, wchars); readIndex++; + } else if (strstr(line, "read_bytes:") != NULL) { // read_bytes + sscanf(line, "%s %" PRId64, tmp, read_bytes); + readIndex++; + } else if (strstr(line, "write_bytes:") != NULL) { // write_bytes + sscanf(line, "%s %" PRId64, tmp, write_bytes); + readIndex++; } else { } - if (readIndex >= 2) break; + if (readIndex >= 4) break; } if (line != NULL) tfree(line); taosCloseFile(&pFile); - if (readIndex < 2) { - // printf("read file:%s failed", tsProcIOFile); + if (readIndex < 4) { return -1; } return 0; } -int32_t taosGetProcIO(float *readKB, float *writeKB) { - static int64_t lastReadbyte = -1; - static int64_t lastWritebyte = -1; +int32_t taosGetIOSpeed(double *rchar_per_sec, double *wchar_per_sec, double *read_bytes_per_sec, + double *write_bytes_per_sec) { + static int64_t last_rchar = -1; + static int64_t last_wchar = -1; + static int64_t last_read_bytes = -1; + static int64_t last_write_bytes = -1; + static int64_t last_time = 0; - int64_t curReadbyte = 0; - int64_t curWritebyte = 0; + int64_t cur_rchar = 0; + int64_t cur_wchar = 0; + int64_t cur_read_bytes = 0; + int64_t cur_write_bytes = 0; + int64_t cur_time = taosGetTimestampMs(); - if (taosReadProcIO(&curReadbyte, &curWritebyte) != 0) { + if (taosReadProcIO(&cur_rchar, &cur_wchar, &cur_read_bytes, &cur_write_bytes) != 0) { return -1; } - if (lastReadbyte == -1 || lastWritebyte == -1) { - lastReadbyte = curReadbyte; - lastWritebyte = curWritebyte; + if (last_time == 0 || last_time >= cur_time) { + last_time = cur_time; + last_rchar = cur_rchar; + last_wchar = cur_wchar; + last_read_bytes = cur_read_bytes; + last_write_bytes = cur_write_bytes; return -1; } - *readKB = (float)((double)(curReadbyte - lastReadbyte) / 1024); - *writeKB = (float)((double)(curWritebyte - lastWritebyte) / 1024); - if (*readKB < 0) *readKB = 0; - if (*writeKB < 0) *writeKB = 0; + *rchar_per_sec = (cur_rchar - last_rchar) / (double)(cur_time - last_time) * 1000; + *wchar_per_sec = (cur_wchar - last_wchar) / (double)(cur_time - last_time) * 1000; + *read_bytes_per_sec = (cur_read_bytes - last_read_bytes) / (double)(cur_time - last_time) * 1000; + *write_bytes_per_sec = (cur_write_bytes - last_write_bytes) / (double)(cur_time - last_time) * 1000; + + last_time = cur_time; + last_rchar = cur_rchar; + last_wchar = cur_wchar; + last_read_bytes = cur_read_bytes; + last_write_bytes = cur_write_bytes; - lastReadbyte = curReadbyte; - lastWritebyte = curWritebyte; + if (*rchar_per_sec < 0) *rchar_per_sec = 0; + if (*wchar_per_sec < 0) *wchar_per_sec = 0; + if (*read_bytes_per_sec < 0) *read_bytes_per_sec = 0; + if (*write_bytes_per_sec < 0) *write_bytes_per_sec = 0; return 0; } @@ -705,10 +671,10 @@ void taosGetSystemInfo() { taosGetCpuCores(&tsNumOfCores); taosGetTotalMemory(&tsTotalMemoryKB); - float tmp1, tmp2; - taosGetBandSpeed(&tmp1); + double tmp1, tmp2, tmp3, tmp4; + taosGetBandSpeed(&tmp1, &tmp2); taosGetCpuUsage(&tmp1, &tmp2); - taosGetProcIO(&tmp1, &tmp2); + taosGetIOSpeed(&tmp1, &tmp2, &tmp3, &tmp4); } void taosKillSystem() {