未验证 提交 029391f0 编写于 作者: S slguan 提交者: GitHub

Merge pull request #1557 from taosdata/feature/vpeer

[TD-114] add wal config
...@@ -105,10 +105,10 @@ typedef struct { ...@@ -105,10 +105,10 @@ typedef struct {
typedef struct { typedef struct {
char tableId[TSDB_TABLE_ID_LEN + 1]; char tableId[TSDB_TABLE_ID_LEN + 1];
int8_t type; int8_t type;
} STableInfo; } STableObj;
typedef struct SSuperTableObj { typedef struct SSuperTableObj {
STableInfo info; STableObj info;
uint64_t uid; uint64_t uid;
int64_t createdTime; int64_t createdTime;
int32_t sversion; int32_t sversion;
...@@ -123,7 +123,7 @@ typedef struct SSuperTableObj { ...@@ -123,7 +123,7 @@ typedef struct SSuperTableObj {
} SSuperTableObj; } SSuperTableObj;
typedef struct { typedef struct {
STableInfo info; STableObj info;
uint64_t uid; uint64_t uid;
int64_t createdTime; int64_t createdTime;
int32_t sversion; //used by normal table int32_t sversion; //used by normal table
...@@ -254,7 +254,7 @@ typedef struct { ...@@ -254,7 +254,7 @@ typedef struct {
SUserObj *pUser; SUserObj *pUser;
SDbObj *pDb; SDbObj *pDb;
SVgObj *pVgroup; SVgObj *pVgroup;
STableInfo *pTable; STableObj *pTable;
} SQueuedMsg; } SQueuedMsg;
int32_t mgmtInitSystem(); int32_t mgmtInitSystem();
......
...@@ -27,7 +27,7 @@ extern "C" { ...@@ -27,7 +27,7 @@ extern "C" {
int32_t mgmtInitTables(); int32_t mgmtInitTables();
void mgmtCleanUpTables(); void mgmtCleanUpTables();
STableInfo* mgmtGetTable(char* tableId); STableObj* mgmtGetTable(char* tableId);
void mgmtIncTableRef(void *pTable); void mgmtIncTableRef(void *pTable);
void mgmtDecTableRef(void *pTable); void mgmtDecTableRef(void *pTable);
void mgmtDropAllChildTables(SDbObj *pDropDb); void mgmtDropAllChildTables(SDbObj *pDropDb);
......
...@@ -486,8 +486,8 @@ static void *mgmtGetSuperTable(char *tableId) { ...@@ -486,8 +486,8 @@ static void *mgmtGetSuperTable(char *tableId) {
return sdbGetRow(tsSuperTableSdb, tableId); return sdbGetRow(tsSuperTableSdb, tableId);
} }
STableInfo *mgmtGetTable(char *tableId) { STableObj *mgmtGetTable(char *tableId) {
STableInfo *tableInfo = sdbGetRow(tsSuperTableSdb, tableId); STableObj *tableInfo = sdbGetRow(tsSuperTableSdb, tableId);
if (tableInfo != NULL) { if (tableInfo != NULL) {
return tableInfo; return tableInfo;
} }
...@@ -501,7 +501,7 @@ STableInfo *mgmtGetTable(char *tableId) { ...@@ -501,7 +501,7 @@ STableInfo *mgmtGetTable(char *tableId) {
} }
void mgmtIncTableRef(void *p1) { void mgmtIncTableRef(void *p1) {
STableInfo *pTable = (STableInfo *)p1; STableObj *pTable = (STableObj *)p1;
if (pTable->type == TSDB_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
sdbIncRef(tsSuperTableSdb, pTable); sdbIncRef(tsSuperTableSdb, pTable);
} else { } else {
...@@ -512,7 +512,7 @@ void mgmtIncTableRef(void *p1) { ...@@ -512,7 +512,7 @@ void mgmtIncTableRef(void *p1) {
void mgmtDecTableRef(void *p1) { void mgmtDecTableRef(void *p1) {
if (p1 == NULL) return; if (p1 == NULL) return;
STableInfo *pTable = (STableInfo *)p1; STableObj *pTable = (STableObj *)p1;
if (pTable->type == TSDB_SUPER_TABLE) { if (pTable->type == TSDB_SUPER_TABLE) {
sdbDecRef(tsSuperTableSdb, pTable); sdbDecRef(tsSuperTableSdb, pTable);
} else { } else {
...@@ -1302,7 +1302,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) { ...@@ -1302,7 +1302,7 @@ static void mgmtProcessCreateChildTableMsg(SQueuedMsg *pMsg) {
return; return;
} }
pMsg->pTable = (STableInfo *)mgmtDoCreateChildTable(pCreate, pVgroup, sid); pMsg->pTable = (STableObj *)mgmtDoCreateChildTable(pCreate, pVgroup, sid);
if (pMsg->pTable == NULL) { if (pMsg->pTable == NULL) {
mgmtSendSimpleResp(pMsg->thandle, terrno); mgmtSendSimpleResp(pMsg->thandle, terrno);
return; return;
...@@ -1641,7 +1641,7 @@ static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_ ...@@ -1641,7 +1641,7 @@ static SChildTableObj* mgmtGetTableByPos(uint32_t dnodeId, int32_t vnode, int32_
} }
SChildTableObj *pTable = pVgroup->tableList[sid]; SChildTableObj *pTable = pVgroup->tableList[sid];
mgmtIncTableRef((STableInfo *)pTable); mgmtIncTableRef((STableObj *)pTable);
mgmtDecVgroupRef(pVgroup); mgmtDecVgroupRef(pVgroup);
return pTable; return pTable;
} }
......
...@@ -293,7 +293,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) { ...@@ -293,7 +293,7 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
int32_t maxReplica = 0; int32_t maxReplica = 0;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
STableInfo *pTable = NULL; STableObj *pTable = NULL;
if (pShow->payloadLen > 0 ) { if (pShow->payloadLen > 0 ) {
pTable = mgmtGetTable(pShow->payload); pTable = mgmtGetTable(pShow->payload);
if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) { if (NULL == pTable || pTable->type == TSDB_SUPER_TABLE) {
......
...@@ -20,6 +20,9 @@ ...@@ -20,6 +20,9 @@
extern "C" { extern "C" {
#endif #endif
#include "tsync.h"
#include "twal.h"
typedef enum _VN_STATUS { typedef enum _VN_STATUS {
VN_STATUS_INIT, VN_STATUS_INIT,
VN_STATUS_CREATING, VN_STATUS_CREATING,
...@@ -41,8 +44,9 @@ typedef struct { ...@@ -41,8 +44,9 @@ typedef struct {
void *sync; void *sync;
void *events; void *events;
void *cq; // continuous query void *cq; // continuous query
int32_t replicas; STsdbCfg tsdbCfg;
SVnodeDesc vpeers[TSDB_MAX_MPEERS]; SSyncCfg syncCfg;
SWalCfg walCfg;
} SVnodeObj; } SVnodeObj;
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type); int vnodeWriteToQueue(void *param, SWalHead *pHead, int type);
......
...@@ -60,16 +60,6 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -60,16 +60,6 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
STsdbCfg tsdbCfg = {0};
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
tsdbCfg.minRowsPerFileBlock = -1;
tsdbCfg.maxRowsPerFileBlock = -1;
tsdbCfg.keep = -1;
tsdbCfg.maxCacheSize = -1;
char rootDir[TSDB_FILENAME_LEN] = {0}; char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId); sprintf(rootDir, "%s/vnode%d", tsVnodeDir, pVnodeCfg->cfg.vgId);
if (mkdir(rootDir, 0755) != 0) { if (mkdir(rootDir, 0755) != 0) {
...@@ -89,6 +79,16 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -89,6 +79,16 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
return code; return code;
} }
STsdbCfg tsdbCfg = {0};
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.tsdbId = pVnodeCfg->cfg.vgId;
tsdbCfg.maxTables = pVnodeCfg->cfg.maxSessions;
tsdbCfg.daysPerFile = pVnodeCfg->cfg.daysPerFile;
tsdbCfg.minRowsPerFileBlock = -1;
tsdbCfg.maxRowsPerFileBlock = -1;
tsdbCfg.keep = -1;
tsdbCfg.maxCacheSize = -1;
char tsdbDir[TSDB_FILENAME_LEN] = {0}; char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL); code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
...@@ -140,7 +140,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -140,7 +140,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->rqueue = dnodeAllocateRqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode);
sprintf(temp, "%s/wal", rootDir); sprintf(temp, "%s/wal", rootDir);
pVnode->wal = walOpen(temp, 3, tsCommitLog); pVnode->wal = walOpen(temp, pVnode->walCfg.wals, pVnode->walCfg.commitLog);
pVnode->sync = NULL; pVnode->sync = NULL;
pVnode->events = NULL; pVnode->events = NULL;
pVnode->cq = NULL; pVnode->cq = NULL;
...@@ -293,9 +293,13 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -293,9 +293,13 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
FILE *fp = fopen(cfgFile, "w"); FILE *fp = fopen(cfgFile, "w");
if (!fp) return errno; if (!fp) return errno;
fprintf(fp, "replicas %d\n", pVnodeCfg->cfg.replications); fprintf(fp, "commitLog %d\n", pVnodeCfg->cfg.commitLog);
fprintf(fp, "wals %d\n", 3);
fprintf(fp, "arbitratorIp %d\n", pVnodeCfg->vpeerDesc[0].ip);
fprintf(fp, "quorum %d\n", 1);
fprintf(fp, "replica %d\n", pVnodeCfg->cfg.replications);
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) { for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
fprintf(fp, "index%d dnode %d ip %u\n", i, pVnodeCfg->vpeerDesc[i].dnodeId, pVnodeCfg->vpeerDesc[i].ip); fprintf(fp, "index%d nodeId %d nodeIp %u name n%d\n", i, pVnodeCfg->vpeerDesc[i].dnodeId, pVnodeCfg->vpeerDesc[i].ip, pVnodeCfg->vpeerDesc[i].dnodeId);
} }
fclose(fp); fclose(fp);
...@@ -306,33 +310,60 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -306,33 +310,60 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
// TODO: this is a simple implement // TODO: this is a simple implement
static int32_t vnodeReadCfg(SVnodeObj *pVnode) { static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
char option[3][16] = {0};
char cfgFile[TSDB_FILENAME_LEN * 2] = {0}; char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnode->vgId); sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnode->vgId);
FILE *fp = fopen(cfgFile, "r"); FILE *fp = fopen(cfgFile, "r");
if (!fp) return errno; if (!fp) return errno;
char option[3][32] = {0}; int32_t commitLog = 0;
int32_t replicas = 0; int32_t num = fscanf(fp, "%s %d", option[0], &commitLog);
int32_t num = fscanf(fp, "%s %d", option[0], &replicas); if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[0], "commitLog") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (commitLog == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->walCfg.commitLog = (int8_t)commitLog;
int32_t wals = 0;
num = fscanf(fp, "%s %d", option[0], &wals);
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT; if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[0], "replicas") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; if (strcmp(option[0], "wals") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (replicas == 0) return TSDB_CODE_INVALID_FILE_FORMAT; if (wals == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->replicas = replicas; pVnode->walCfg.wals = (int8_t)wals;
for (int32_t i = 0; i < replicas; ++i) { int32_t arbitratorIp = 0;
num = fscanf(fp, "%s %u", option[0], &arbitratorIp);
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[0], "arbitratorIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (arbitratorIp == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->syncCfg.arbitratorIp = arbitratorIp;
int32_t quorum = 0;
num = fscanf(fp, "%s %d", option[0], &quorum);
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[0], "quorum") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (quorum == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->syncCfg.quorum = (int8_t)quorum;
int32_t replica = 0;
num = fscanf(fp, "%s %d", option[0], &replica);
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[0], "replica") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (replica == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->syncCfg.replica = (int8_t)replica;
for (int32_t i = 0; i < replica; ++i) {
int32_t dnodeId = 0; int32_t dnodeId = 0;
uint32_t dnodeIp = 0; uint32_t dnodeIp = 0;
num = fscanf(fp, "%s %s %d %s %u", option[0], option[1], &dnodeId, option[2], &dnodeIp); num = fscanf(fp, "%s %s %d %s %u %s %s", option[0], option[1], &dnodeId, option[2], &dnodeIp, option[3], pVnode->syncCfg.nodeInfo[i].name);
if (num != 5) return TSDB_CODE_INVALID_FILE_FORMAT; if (num != 7) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[1], "dnode") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; if (strcmp(option[1], "nodeId") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[2], "ip") != 0) return TSDB_CODE_INVALID_FILE_FORMAT; if (strcmp(option[2], "nodeIp") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[3], "name") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (dnodeId == 0) return TSDB_CODE_INVALID_FILE_FORMAT; if (dnodeId == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (dnodeIp == 0) return TSDB_CODE_INVALID_FILE_FORMAT; if (dnodeIp == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->syncCfg.nodeInfo[i].nodeId = dnodeId;
pVnode->vpeers[i].dnodeId = dnodeId; pVnode->syncCfg.nodeInfo[i].nodeIp = dnodeIp;
pVnode->vpeers[i].ip = dnodeIp;
pVnode->vpeers[i].vgId = pVnode->vgId;
} }
fclose(fp); fclose(fp);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册