diff --git a/source/common/src/tglobal.c b/source/common/src/tglobal.c index 8d3deff495c32eb97fba80b71071f72baf9b3086..8823e63db4ab569c0dd55eb6488103897826a060 100644 --- a/source/common/src/tglobal.c +++ b/source/common/src/tglobal.c @@ -405,9 +405,8 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1); if (cfgAddInt32(pCfg, "numOfVnodeWriteThreads", tsNumOfVnodeWriteThreads, 1, 1024, 0) != 0) return -1; - // tsNumOfVnodeSyncThreads = tsNumOfCores; - tsNumOfVnodeSyncThreads = 32; - tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 1); + tsNumOfVnodeSyncThreads = tsNumOfCores * 2; + tsNumOfVnodeSyncThreads = TMAX(tsNumOfVnodeSyncThreads, 16); if (cfgAddInt32(pCfg, "numOfVnodeSyncThreads", tsNumOfVnodeSyncThreads, 1, 1024, 0) != 0) return -1; tsNumOfQnodeQueryThreads = tsNumOfCores * 2; diff --git a/source/libs/sync/inc/syncRaftCfg.h b/source/libs/sync/inc/syncRaftCfg.h index fead7cdc762b46b4d3e26a2af0d5f370eb19c6cf..f0bfb883530287492f7bc44b22026f416ed4b6ed 100644 --- a/source/libs/sync/inc/syncRaftCfg.h +++ b/source/libs/sync/inc/syncRaftCfg.h @@ -31,6 +31,29 @@ extern "C" { #define MAX_CONFIG_INDEX_COUNT 512 +// SRaftCfgIndex ------------------------------------------ +typedef struct SRaftCfgIndex { + TdFilePtr pFile; + char path[TSDB_FILENAME_LEN * 2]; + + SyncIndex configIndexArr[MAX_CONFIG_INDEX_COUNT]; + int32_t configIndexCount; +} SRaftCfgIndex; + +SRaftCfgIndex *raftCfgIndexOpen(const char *path); +int32_t raftCfgIndexClose(SRaftCfgIndex *pRaftCfgIndex); +int32_t raftCfgIndexPersist(SRaftCfgIndex *pRaftCfgIndex); +int32_t raftCfgIndexAddConfigIndex(SRaftCfgIndex *pRaftCfgIndex, SyncIndex configIndex); + +cJSON *raftCfgIndex2Json(SRaftCfgIndex *pRaftCfgIndex); +char *raftCfgIndex2Str(SRaftCfgIndex *pRaftCfgIndex); +int32_t raftCfgIndexFromJson(const cJSON *pRoot, SRaftCfgIndex *pRaftCfgIndex); +int32_t raftCfgIndexFromStr(const char *s, SRaftCfgIndex *pRaftCfgIndex); + +int32_t raftCfgIndexCreateFile(const char *path); + +// --------------------------------------------------------- + typedef struct SRaftCfg { SSyncCfg cfg; TdFilePtr pFile; @@ -50,14 +73,14 @@ int32_t raftCfgClose(SRaftCfg *pRaftCfg); int32_t raftCfgPersist(SRaftCfg *pRaftCfg); int32_t raftCfgAddConfigIndex(SRaftCfg *pRaftCfg, SyncIndex configIndex); -cJSON * syncCfg2Json(SSyncCfg *pSyncCfg); -char * syncCfg2Str(SSyncCfg *pSyncCfg); -char * syncCfg2SimpleStr(SSyncCfg *pSyncCfg); +cJSON *syncCfg2Json(SSyncCfg *pSyncCfg); +char *syncCfg2Str(SSyncCfg *pSyncCfg); +char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg); int32_t syncCfgFromJson(const cJSON *pRoot, SSyncCfg *pSyncCfg); int32_t syncCfgFromStr(const char *s, SSyncCfg *pSyncCfg); -cJSON * raftCfg2Json(SRaftCfg *pRaftCfg); -char * raftCfg2Str(SRaftCfg *pRaftCfg); +cJSON *raftCfg2Json(SRaftCfg *pRaftCfg); +char *raftCfg2Str(SRaftCfg *pRaftCfg); int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg); int32_t raftCfgFromStr(const char *s, SRaftCfg *pRaftCfg); @@ -82,6 +105,11 @@ void raftCfgPrint2(char *s, SRaftCfg *pCfg); void raftCfgLog(SRaftCfg *pCfg); void raftCfgLog2(char *s, SRaftCfg *pCfg); +void raftCfgIndexPrint(SRaftCfgIndex *pCfg); +void raftCfgIndexPrint2(char *s, SRaftCfgIndex *pCfg); +void raftCfgIndexLog(SRaftCfgIndex *pCfg); +void raftCfgIndexLog2(char *s, SRaftCfgIndex *pCfg); + #ifdef __cplusplus } #endif diff --git a/source/libs/sync/src/syncRaftCfg.c b/source/libs/sync/src/syncRaftCfg.c index 56666b35b621d48d99c5a55063bd67c053d59776..5de21bceca99e13b3c2c33b72cd96f0ca6f86fa8 100644 --- a/source/libs/sync/src/syncRaftCfg.c +++ b/source/libs/sync/src/syncRaftCfg.c @@ -18,6 +18,149 @@ #include "syncEnv.h" #include "syncUtil.h" +// file must already exist! +SRaftCfgIndex *raftCfgIndexOpen(const char *path) { + SRaftCfgIndex *pRaftCfgIndex = taosMemoryMalloc(sizeof(SRaftCfg)); + snprintf(pRaftCfgIndex->path, sizeof(pRaftCfgIndex->path), "%s", path); + + pRaftCfgIndex->pFile = taosOpenFile(pRaftCfgIndex->path, TD_FILE_READ | TD_FILE_WRITE); + ASSERT(pRaftCfgIndex->pFile != NULL); + + taosLSeekFile(pRaftCfgIndex->pFile, 0, SEEK_SET); + + int32_t bufLen = MAX_CONFIG_INDEX_COUNT * 16; + char *pBuf = taosMemoryMalloc(bufLen); + memset(pBuf, 0, bufLen); + int64_t len = taosReadFile(pRaftCfgIndex->pFile, pBuf, bufLen); + ASSERT(len > 0); + + int32_t ret = raftCfgIndexFromStr(pBuf, pRaftCfgIndex); + ASSERT(ret == 0); + + taosMemoryFree(pBuf); + + return pRaftCfgIndex; +} + +int32_t raftCfgIndexClose(SRaftCfgIndex *pRaftCfgIndex) { + if (pRaftCfgIndex != NULL) { + int64_t ret = taosCloseFile(&(pRaftCfgIndex->pFile)); + ASSERT(ret == 0); + taosMemoryFree(pRaftCfgIndex); + } + return 0; +} + +int32_t raftCfgIndexPersist(SRaftCfgIndex *pRaftCfgIndex) { + ASSERT(pRaftCfgIndex != NULL); + + char *s = raftCfgIndex2Str(pRaftCfgIndex); + taosLSeekFile(pRaftCfgIndex->pFile, 0, SEEK_SET); + + int64_t ret = taosWriteFile(pRaftCfgIndex->pFile, s, strlen(s) + 1); + ASSERT(ret == strlen(s) + 1); + + taosMemoryFree(s); + taosFsyncFile(pRaftCfgIndex->pFile); + return 0; +} + +int32_t raftCfgIndexAddConfigIndex(SRaftCfgIndex *pRaftCfgIndex, SyncIndex configIndex) { + ASSERT(pRaftCfgIndex->configIndexCount <= MAX_CONFIG_INDEX_COUNT); + (pRaftCfgIndex->configIndexArr)[pRaftCfgIndex->configIndexCount] = configIndex; + ++(pRaftCfgIndex->configIndexCount); + return 0; +} + +cJSON *raftCfgIndex2Json(SRaftCfgIndex *pRaftCfgIndex) { + cJSON *pRoot = cJSON_CreateObject(); + + cJSON_AddNumberToObject(pRoot, "configIndexCount", pRaftCfgIndex->configIndexCount); + cJSON *pIndexArr = cJSON_CreateArray(); + cJSON_AddItemToObject(pRoot, "configIndexArr", pIndexArr); + for (int i = 0; i < pRaftCfgIndex->configIndexCount; ++i) { + char buf64[128]; + snprintf(buf64, sizeof(buf64), "%" PRId64, (pRaftCfgIndex->configIndexArr)[i]); + cJSON *pIndexObj = cJSON_CreateObject(); + cJSON_AddStringToObject(pIndexObj, "index", buf64); + cJSON_AddItemToArray(pIndexArr, pIndexObj); + } + + cJSON *pJson = cJSON_CreateObject(); + cJSON_AddItemToObject(pJson, "SRaftCfgIndex", pRoot); + return pJson; +} + +char *raftCfgIndex2Str(SRaftCfgIndex *pRaftCfgIndex) { + cJSON *pJson = raftCfgIndex2Json(pRaftCfgIndex); + char *serialized = cJSON_Print(pJson); + cJSON_Delete(pJson); + return serialized; +} + +int32_t raftCfgIndexFromJson(const cJSON *pRoot, SRaftCfgIndex *pRaftCfgIndex) { + cJSON *pJson = cJSON_GetObjectItem(pRoot, "SRaftCfgIndex"); + + cJSON *pJsonConfigIndexCount = cJSON_GetObjectItem(pJson, "configIndexCount"); + pRaftCfgIndex->configIndexCount = cJSON_GetNumberValue(pJsonConfigIndexCount); + + cJSON *pIndexArr = cJSON_GetObjectItem(pJson, "configIndexArr"); + int arraySize = cJSON_GetArraySize(pIndexArr); + ASSERT(arraySize == pRaftCfgIndex->configIndexCount); + + memset(pRaftCfgIndex->configIndexArr, 0, sizeof(pRaftCfgIndex->configIndexArr)); + for (int i = 0; i < arraySize; ++i) { + cJSON *pIndexObj = cJSON_GetArrayItem(pIndexArr, i); + ASSERT(pIndexObj != NULL); + + cJSON *pIndex = cJSON_GetObjectItem(pIndexObj, "index"); + ASSERT(cJSON_IsString(pIndex)); + (pRaftCfgIndex->configIndexArr)[i] = atoll(pIndex->valuestring); + } + + return 0; +} + +int32_t raftCfgIndexFromStr(const char *s, SRaftCfgIndex *pRaftCfgIndex) { + cJSON *pRoot = cJSON_Parse(s); + ASSERT(pRoot != NULL); + + int32_t ret = raftCfgIndexFromJson(pRoot, pRaftCfgIndex); + ASSERT(ret == 0); + + cJSON_Delete(pRoot); + return 0; +} + +int32_t raftCfgIndexCreateFile(const char *path) { + TdFilePtr pFile = taosOpenFile(path, TD_FILE_CREATE | TD_FILE_WRITE); + if (pFile == NULL) { + int32_t err = terrno; + const char *errStr = tstrerror(err); + int32_t sysErr = errno; + const char *sysErrStr = strerror(errno); + sError("create raft cfg index file error, err:%d %X, msg:%s, syserr:%d, sysmsg:%s", err, err, errStr, sysErr, + sysErrStr); + ASSERT(0); + + return -1; + } + + SRaftCfgIndex raftCfgIndex; + memset(raftCfgIndex.configIndexArr, 0, sizeof(raftCfgIndex.configIndexArr)); + raftCfgIndex.configIndexCount = 1; + raftCfgIndex.configIndexArr[0] = -1; + + char *s = raftCfgIndex2Str(&raftCfgIndex); + int64_t ret = taosWriteFile(pFile, s, strlen(s) + 1); + ASSERT(ret == strlen(s) + 1); + + taosMemoryFree(s); + taosCloseFile(&pFile); + return 0; +} + +// --------------------------------------- // file must already exist! SRaftCfg *raftCfgOpen(const char *path) { SRaftCfg *pCfg = taosMemoryMalloc(sizeof(SRaftCfg)); @@ -101,7 +244,7 @@ cJSON *syncCfg2Json(SSyncCfg *pSyncCfg) { char *syncCfg2Str(SSyncCfg *pSyncCfg) { cJSON *pJson = syncCfg2Json(pSyncCfg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -109,7 +252,7 @@ char *syncCfg2Str(SSyncCfg *pSyncCfg) { char *syncCfg2SimpleStr(SSyncCfg *pSyncCfg) { if (pSyncCfg != NULL) { int32_t len = 512; - char * s = taosMemoryMalloc(len); + char *s = taosMemoryMalloc(len); memset(s, 0, len); snprintf(s, len, "{r-num:%d, my:%d, ", pSyncCfg->replicaNum, pSyncCfg->myIndex); @@ -206,7 +349,7 @@ cJSON *raftCfg2Json(SRaftCfg *pRaftCfg) { char *raftCfg2Str(SRaftCfg *pRaftCfg) { cJSON *pJson = raftCfg2Json(pRaftCfg); - char * serialized = cJSON_Print(pJson); + char *serialized = cJSON_Print(pJson); cJSON_Delete(pJson); return serialized; } @@ -285,7 +428,7 @@ int32_t raftCfgFromJson(const cJSON *pRoot, SRaftCfg *pRaftCfg) { (pRaftCfg->configIndexArr)[i] = atoll(pIndex->valuestring); } - cJSON * pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); + cJSON *pJsonSyncCfg = cJSON_GetObjectItem(pJson, "SSyncCfg"); int32_t code = syncCfgFromJson(pJsonSyncCfg, &(pRaftCfg->cfg)); ASSERT(code == 0); @@ -361,3 +504,30 @@ void raftCfgLog2(char *s, SRaftCfg *pCfg) { sTrace("raftCfgLog2 | len:%" PRIu64 " | %s | %s", strlen(serialized), s, serialized); taosMemoryFree(serialized); } + +// --------- +void raftCfgIndexPrint(SRaftCfgIndex *pCfg) { + char *serialized = raftCfgIndex2Str(pCfg); + printf("raftCfgIndexPrint | len:%" PRIu64 " | %s \n", strlen(serialized), serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void raftCfgIndexPrint2(char *s, SRaftCfgIndex *pCfg) { + char *serialized = raftCfgIndex2Str(pCfg); + printf("raftCfgIndexPrint2 | len:%" PRIu64 " | %s | %s \n", strlen(serialized), s, serialized); + fflush(NULL); + taosMemoryFree(serialized); +} + +void raftCfgIndexLog(SRaftCfgIndex *pCfg) { + char *serialized = raftCfgIndex2Str(pCfg); + sTrace("raftCfgIndexLog | len:%" PRIu64 " | %s", strlen(serialized), serialized); + taosMemoryFree(serialized); +} + +void raftCfgIndexLog2(char *s, SRaftCfgIndex *pCfg) { + char *serialized = raftCfgIndex2Str(pCfg); + sTrace("raftCfgIndexLog2 | len:%" PRIu64 " | %s | %s", strlen(serialized), s, serialized); + taosMemoryFree(serialized); +} diff --git a/source/libs/sync/test/CMakeLists.txt b/source/libs/sync/test/CMakeLists.txt index e787080795a36f77aca41d520ce7e232e9555358..72845d0c1d1a9378a3a189f4037c6fc646c8a536 100644 --- a/source/libs/sync/test/CMakeLists.txt +++ b/source/libs/sync/test/CMakeLists.txt @@ -56,6 +56,7 @@ add_executable(syncRaftLogTest3 "") add_executable(syncLeaderTransferTest "") add_executable(syncReconfigFinishTest "") add_executable(syncRestoreFromSnapshot "") +add_executable(syncRaftCfgIndexTest "") target_sources(syncTest @@ -290,6 +291,10 @@ target_sources(syncRestoreFromSnapshot PRIVATE "syncRestoreFromSnapshot.cpp" ) +target_sources(syncRaftCfgIndexTest + PRIVATE + "syncRaftCfgIndexTest.cpp" +) target_include_directories(syncTest @@ -582,6 +587,11 @@ target_include_directories(syncRestoreFromSnapshot "${TD_SOURCE_DIR}/include/libs/sync" "${CMAKE_CURRENT_SOURCE_DIR}/../inc" ) +target_include_directories(syncRaftCfgIndexTest + PUBLIC + "${TD_SOURCE_DIR}/include/libs/sync" + "${CMAKE_CURRENT_SOURCE_DIR}/../inc" +) target_link_libraries(syncTest @@ -816,6 +826,10 @@ target_link_libraries(syncRestoreFromSnapshot sync gtest_main ) +target_link_libraries(syncRaftCfgIndexTest + sync + gtest_main +) enable_testing() diff --git a/source/libs/sync/test/syncRaftCfgIndexTest.cpp b/source/libs/sync/test/syncRaftCfgIndexTest.cpp new file mode 100644 index 0000000000000000000000000000000000000000..6338383f92ceac88549753a415a5c7ba729c976f --- /dev/null +++ b/source/libs/sync/test/syncRaftCfgIndexTest.cpp @@ -0,0 +1,96 @@ +#include "syncRaftStore.h" +//#include +#include +#include "syncIO.h" +#include "syncInt.h" +#include "syncRaftCfg.h" +#include "syncUtil.h" + +void logTest() { + sTrace("--- sync log test: trace"); + sDebug("--- sync log test: debug"); + sInfo("--- sync log test: info"); + sWarn("--- sync log test: warn"); + sError("--- sync log test: error"); + sFatal("--- sync log test: fatal"); +} + +SRaftCfg* createRaftCfg() { + SRaftCfg* pCfg = (SRaftCfg*)taosMemoryMalloc(sizeof(SRaftCfg)); + memset(pCfg, 0, sizeof(SRaftCfg)); + + pCfg->cfg.replicaNum = 3; + pCfg->cfg.myIndex = 1; + for (int i = 0; i < pCfg->cfg.replicaNum; ++i) { + ((pCfg->cfg.nodeInfo)[i]).nodePort = i * 100; + snprintf(((pCfg->cfg.nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->cfg.nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i); + } + pCfg->isStandBy = taosGetTimestampSec() % 100; + pCfg->batchSize = taosGetTimestampSec() % 100; + + pCfg->configIndexCount = 5; + for (int i = 0; i < MAX_CONFIG_INDEX_COUNT; ++i) { + (pCfg->configIndexArr)[i] = -1; + } + for (int i = 0; i < pCfg->configIndexCount; ++i) { + (pCfg->configIndexArr)[i] = i * 100; + } + + return pCfg; +} + +SSyncCfg* createSyncCfg() { + SSyncCfg* pCfg = (SSyncCfg*)taosMemoryMalloc(sizeof(SSyncCfg)); + memset(pCfg, 0, sizeof(SSyncCfg)); + + pCfg->replicaNum = 3; + pCfg->myIndex = 1; + for (int i = 0; i < pCfg->replicaNum; ++i) { + ((pCfg->nodeInfo)[i]).nodePort = i * 100; + snprintf(((pCfg->nodeInfo)[i]).nodeFqdn, sizeof(((pCfg->nodeInfo)[i]).nodeFqdn), "100.200.300.%d", i); + } + + return pCfg; +} + +const char *pFile = "./raft_config_index.json"; + +void test1() { + int32_t code = raftCfgIndexCreateFile(pFile); + ASSERT(code == 0); + + SRaftCfgIndex *pRaftCfgIndex = raftCfgIndexOpen(pFile); + raftCfgIndexLog2((char*)"==test1==", pRaftCfgIndex); + + raftCfgIndexClose(pRaftCfgIndex); +} + +void test2() { + SRaftCfgIndex *pRaftCfgIndex = raftCfgIndexOpen(pFile); + for (int i = 0; i < 500; ++i) { + raftCfgIndexAddConfigIndex(pRaftCfgIndex, i); + } + raftCfgIndexPersist(pRaftCfgIndex); + + raftCfgIndexLog2((char*)"==test2==", pRaftCfgIndex); + raftCfgIndexClose(pRaftCfgIndex); +} + +void test3() { + SRaftCfgIndex *pRaftCfgIndex = raftCfgIndexOpen(pFile); + + raftCfgIndexLog2((char*)"==test3==", pRaftCfgIndex); + raftCfgIndexClose(pRaftCfgIndex); +} + +int main() { + tsAsyncLog = 0; + sDebugFlag = DEBUG_TRACE + DEBUG_SCREEN + DEBUG_FILE; + + logTest(); + test1(); + test2(); + test3(); + + return 0; +}