diff --git a/src/inc/mnode.h b/src/inc/mnode.h index 651ad0036edf9f80abad52660e9cdc813b89dcde..70c7f8003d92811670767c1aa263c3e8455221b7 100644 --- a/src/inc/mnode.h +++ b/src/inc/mnode.h @@ -105,10 +105,10 @@ typedef struct { typedef struct { char tableId[TSDB_TABLE_ID_LEN + 1]; int8_t type; -} STableInfo; +} STableObj; typedef struct SSuperTableObj { - STableInfo info; + STableObj info; uint64_t uid; int64_t createdTime; int32_t sversion; @@ -123,7 +123,7 @@ typedef struct SSuperTableObj { } SSuperTableObj; typedef struct { - STableInfo info; + STableObj info; uint64_t uid; int64_t createdTime; int32_t sversion; //used by normal table @@ -255,7 +255,7 @@ typedef struct { SUserObj *pUser; SDbObj *pDb; SVgObj *pVgroup; - STableInfo *pTable; + STableObj *pTable; } SQueuedMsg; int32_t mgmtInitSystem(); diff --git a/src/mnode/inc/mgmtTable.h b/src/mnode/inc/mgmtTable.h index ddbbfb4a70200ab560cb33db1a9a2d897c152993..4d3e0f6b4356b633fb2ef8547a669ea872ccbd87 100644 --- a/src/mnode/inc/mgmtTable.h +++ b/src/mnode/inc/mgmtTable.h @@ -27,7 +27,7 @@ extern "C" { int32_t mgmtInitTables(); void mgmtCleanUpTables(); -STableInfo* mgmtGetTable(char* tableId); +STableObj* mgmtGetTable(char* tableId); void mgmtIncTableRef(void *pTable); void mgmtDecTableRef(void *pTable); void mgmtDropAllChildTables(SDbObj *pDropDb); diff --git a/src/mnode/src/mgmtTable.c b/src/mnode/src/mgmtTable.c index c4996fc4bb02592e8f3dfb914cfd24d47b0ae6fc..d6d7a6afc0cc649cee27ad45ef3483f816af7a61 100644 --- a/src/mnode/src/mgmtTable.c +++ b/src/mnode/src/mgmtTable.c @@ -486,8 +486,8 @@ static void *mgmtGetSuperTable(char *tableId) { return sdbGetRow(tsSuperTableSdb, tableId); } -STableInfo *mgmtGetTable(char *tableId) { - STableInfo *tableInfo = sdbGetRow(tsSuperTableSdb, tableId); +STableObj *mgmtGetTable(char *tableId) { + STableObj *tableInfo = sdbGetRow(tsSuperTableSdb, tableId); if (tableInfo != NULL) { return tableInfo; } @@ -501,7 +501,7 @@ STableInfo *mgmtGetTable(char *tableId) { } void mgmtIncTableRef(void *p1) { - STableInfo *pTable = (STableInfo *)p1; + STableObj *pTable = (STableObj *)p1; if (pTable->type == TSDB_SUPER_TABLE) { sdbIncRef(tsSuperTableSdb, pTable); } else { @@ -512,7 +512,7 @@ void mgmtIncTableRef(void *p1) { void mgmtDecTableRef(void *p1) { if (p1 == NULL) return; - STableInfo *pTable = (STableInfo *)p1; + STableObj *pTable = (STableObj *)p1; if (pTable->type == TSDB_SUPER_TABLE) { sdbDecRef(tsSuperTableSdb, pTable); } else { @@ -1302,7 +1302,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { return; } - pMsg->pTable = (STableInfo *)mgmtDoCreateChildTable(pCreate, pVgroup, sid); + pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid); if (pMsg->pTable == NULL) { mgmtSendSimpleResp(pMsg->thandle, terrno); return; @@ -1641,7 +1641,7 @@ static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_ } SChildTableObj *pTable = pVgroup->tableList[sid]; - mgmtIncTableRef((STableInfo *)pTable); + mgmtIncTableRef((STableObj *)pTable); mgmtDecVgroupRef(pVgroup); return pTable; } diff --git a/src/mnode/src/mgmtVgroup.c b/src/mnode/src/mgmtVgroup.c index c16cae775cfacd23c8637d382bfd19ee3c2c30fd..a6ceaa1f41c57a173a2775d0cf085271a861d89e 100644 --- a/src/mnode/src/mgmtVgroup.c +++ b/src/mnode/src/mgmtVgroup.c @@ -301,7 +301,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { int32_t maxReplica = 0; SVgObj *pVgroup = NULL; - STableInfo *pTable = NULL; + STableObj *pTable = NULL; if (pShow->payloadLen > 0 ) { pTable = mgmtGetTable(pShow->payload); if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) { diff --git a/src/vnode/main/inc/vnodeInt.h b/src/vnode/main/inc/vnodeInt.h index 561b6ae61ff8ae85c1bd6b0c47e53b1d80c6ba99..4d078869c441e6a827fb0270af6af5cc3d07c42c 100644 --- a/src/vnode/main/inc/vnodeInt.h +++ b/src/vnode/main/inc/vnodeInt.h @@ -20,6 +20,9 @@ extern "C" { #endif +#include "tsync.h" +#include "twal.h" + typedef enum _VN_STATUS { VN_STATUS_INIT, VN_STATUS_CREATING, @@ -41,8 +44,9 @@ typedef struct { void *sync; void *events; void *cq; // continuous query - int32_t replicas; - SVnodeDesc vpeers[TSDB_MAX_MPEERS]; + STsdbCfg tsdbCfg; + SSyncCfg syncCfg; + SWalCfg walCfg; } SVnodeObj; int vnodeWriteToQueue(void *param, SWalHead *pHead, int type); diff --git a/src/vnode/main/src/vnodeMain.c b/src/vnode/main/src/vnodeMain.c index 3a70ec0d7d024774bf4cb4af962164c57b1add37..1211828a4714b64cdcfde160164bfa316621c756 100644 --- a/src/vnode/main/src/vnodeMain.c +++ b/src/vnode/main/src/vnodeMain.c @@ -60,16 +60,6 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { return TSDB_CODE_SUCCESS; } - STsdbCfg tsdbCfg = {0}; - tsdbCfg.precision = pVnodeCfg->cfg.precision; - tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; - tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; - tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; - tsdbCfg.minRowsPerFileBlock = -1; - tsdbCfg.maxRowsPerFileBlock = -1; - tsdbCfg.keep = -1; - tsdbCfg.maxCacheSize = -1; - char rootDir[TSDB_FILENAME_LEN] = {0}; sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId); if (mkdir(rootDir, 0755) != 0) { @@ -89,6 +79,16 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { return code; } + STsdbCfg tsdbCfg = {0}; + tsdbCfg.precision = pVnodeCfg->cfg.precision; + tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId; + tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions; + tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile; + tsdbCfg.minRowsPerFileBlock = -1; + tsdbCfg.maxRowsPerFileBlock = -1; + tsdbCfg.keep = -1; + tsdbCfg.maxCacheSize = -1; + char tsdbDir[TSDB_FILENAME_LEN] = {0}; sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL); @@ -140,7 +140,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->rqueue = dnodeAllocateRqueue(pVnode); sprintf(temp, "%s/wal", rootDir); - pVnode->wal = walOpen(temp, 3, tsCommitLog); + pVnode->wal = walOpen(temp, pVnode->walCfg.wals, pVnode->walCfg.commitLog); pVnode->sync = NULL; pVnode->events = NULL; pVnode->cq = NULL; @@ -293,9 +293,13 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { FILE *fp = fopen(cfgFile, "w"); if (!fp) return errno; - fprintf(fp, "replicas %d\n", pVnodeCfg->cfg.replications); + fprintf(fp, "commitLog %d\n", pVnodeCfg->cfg.commitLog); + fprintf(fp, "wals %d\n", 3); + fprintf(fp, "arbitratorIp %d\n", pVnodeCfg->vpeerDesc[0].ip); + fprintf(fp, "quorum %d\n", 1); + fprintf(fp, "replica %d\n", pVnodeCfg->cfg.replications); for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) { - fprintf(fp, "index%d dnode %d ip %u\n", i, pVnodeCfg->vpeerDesc[i].dnodeId, pVnodeCfg->vpeerDesc[i].ip); + fprintf(fp, "index%d nodeId %d nodeIp %u name n%d\n", i, pVnodeCfg->vpeerDesc[i].dnodeId, pVnodeCfg->vpeerDesc[i].ip, pVnodeCfg->vpeerDesc[i].dnodeId); } fclose(fp); @@ -306,37 +310,64 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { // TODO: this is a simple implement static int32_t vnodeReadCfg(SVnodeObj *pVnode) { + char option[3][16] = {0}; char cfgFile[TSDB_FILENAME_LEN * 2] = {0}; sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnode->vgId); FILE *fp = fopen(cfgFile, "r"); if (!fp) return errno; - char option[3][32] = {0}; - int32_t replicas = 0; - int32_t num = fscanf(fp, "%s %d", option[0], &replicas); + int32_t commitLog = -1; + int32_t num = fscanf(fp, "%s %d", option[0], &commitLog); + if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[0], "commitLog") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (commitLog == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + pVnode->walCfg.commitLog = (int8_t)commitLog; + + int32_t wals = -1; + num = fscanf(fp, "%s %d", option[0], &wals); if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; - if (strcmp(option[0], "replicas") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; - if (replicas == 0) return TSDB_CODE_INVALID_FILE_FORMAT; - pVnode->replicas = replicas; - - for (int32_t i = 0; i < replicas; ++i) { - int32_t dnodeId = 0; - uint32_t dnodeIp = 0; - num = fscanf(fp, "%s %s %d %s %u", option[0], option[1], &dnodeId, option[2], &dnodeIp); - if (num != 5) return TSDB_CODE_INVALID_FILE_FORMAT; - if (strcmp(option[1], "dnode") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; - if (strcmp(option[2], "ip") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; - if (dnodeId == 0) return TSDB_CODE_INVALID_FILE_FORMAT; - if (dnodeIp == 0) return TSDB_CODE_INVALID_FILE_FORMAT; - - pVnode->vpeers[i].dnodeId = dnodeId; - pVnode->vpeers[i].ip = dnodeIp; - pVnode->vpeers[i].vgId = pVnode->vgId; + if (strcmp(option[0], "wals") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (wals == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + pVnode->walCfg.wals = (int8_t)wals; + + int32_t arbitratorIp = -1; + num = fscanf(fp, "%s %u", option[0], &arbitratorIp); + if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[0], "arbitratorIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (arbitratorIp == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + pVnode->syncCfg.arbitratorIp = arbitratorIp; + + int32_t quorum = -1; + num = fscanf(fp, "%s %d", option[0], &quorum); + if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[0], "quorum") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (quorum == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + pVnode->syncCfg.quorum = (int8_t)quorum; + + int32_t replica = -1; + num = fscanf(fp, "%s %d", option[0], &replica); + if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[0], "replica") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (replica == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + pVnode->syncCfg.replica = (int8_t)replica; + + for (int32_t i = 0; i < replica; ++i) { + int32_t dnodeId = -1; + uint32_t dnodeIp = -1; + num = fscanf(fp, "%s %s %d %s %u %s %s", option[0], option[1], &dnodeId, option[2], &dnodeIp, option[3], pVnode->syncCfg.nodeInfo[i].name); + if (num != 7) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[1], "nodeId") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[2], "nodeIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (strcmp(option[3], "name") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; + if (dnodeId == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + if (dnodeIp == -1) return TSDB_CODE_INVALID_FILE_FORMAT; + pVnode->syncCfg.nodeInfo[i].nodeId = dnodeId; + pVnode->syncCfg.nodeInfo[i].nodeIp = dnodeIp; } fclose(fp); dTrace("pVnode:%p vgId:%d, read vnode cfg successed", pVnode, pVnode->vgId); return TSDB_CODE_SUCCESS; -} \ No newline at end of file +}