提交 445cdc2d 编写于 作者: S slguan

[TD-114] save vnode cfg into config file

上级 69a071d8
......@@ -1835,7 +1835,7 @@ int tscProcessTableMetaRsp(SSqlObj *pSql) {
}
for (int i = 0; i < TSDB_VNODES_SUPPORT; ++i) {
pMetaMsg->vpeerDesc[i].vnode = htonl(pMetaMsg->vpeerDesc[i].vnode);
pMetaMsg->vpeerDesc[i].vgId = htonl(pMetaMsg->vpeerDesc[i].vgId);
}
SSchema* pSchema = pMetaMsg->schema;
......
......@@ -1454,7 +1454,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
SSqlObj *pNew = tscCreateSqlObjForSubquery(pParentSql, trsupport, pSql);
if (pNew == NULL) {
tscError("%p sub:%p failed to create new subquery due to out of memory, abort retry, vid:%d, orderOfSub:%d",
trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vnode : -1, trsupport->subqueryIndex);
trsupport->pParentSqlObj, pSql, pSvd != NULL ? pSvd->vgId : -1, trsupport->subqueryIndex);
pState->code = -TSDB_CODE_CLI_OUT_OF_MEMORY;
trsupport->numOfRetry = MAX_NUM_OF_SUBQUERY_RETRY;
......@@ -1470,7 +1470,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
if (pState->code != TSDB_CODE_SUCCESS) { // failed, abort
if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query failed,ip:%u,vid:%d,orderOfSub:%d,global code:%d", pParentSql, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId,
trsupport->subqueryIndex, pState->code);
} else {
tscTrace("%p sub:%p query failed,orderOfSub:%d,global code:%d", pParentSql, pSql,
......@@ -1481,7 +1481,7 @@ void tscRetrieveDataRes(void *param, TAOS_RES *tres, int code) {
} else { // success, proceed to retrieve data from dnode
if (vnodeInfo != NULL) {
tscTrace("%p sub:%p query complete,ip:%u,vid:%d,orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vnode,
vnodeInfo->vpeerDesc[vnodeInfo->index].ip, vnodeInfo->vpeerDesc[vnodeInfo->index].vgId,
trsupport->subqueryIndex);
} else {
tscTrace("%p sub:%p query complete, orderOfSub:%d,retrieve data", trsupport->pParentSqlObj, pSql,
......
......@@ -160,11 +160,24 @@ static void dnodeCloseVnodes() {
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = rpcMsg->pCont;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
pCreate->cfg.commitLog = pCreate->cfg.commitLog;
pCreate->cfg.vgId = htonl(pCreate->cfg.vgId);
pCreate->cfg.maxSessions = htonl(pCreate->cfg.maxSessions);
pCreate->cfg.cacheBlockSize = htonl(pCreate->cfg.cacheBlockSize);
pCreate->cfg.daysPerFile = htonl(pCreate->cfg.daysPerFile);
pCreate->cfg.daysToKeep1 = htonl(pCreate->cfg.daysToKeep1);
pCreate->cfg.daysToKeep2 = htonl(pCreate->cfg.daysToKeep2);
pCreate->cfg.daysToKeep = htonl(pCreate->cfg.daysToKeep);
pCreate->cfg.commitTime = htonl(pCreate->cfg.commitTime);
pCreate->cfg.rowsInFileBlock = htonl(pCreate->cfg.rowsInFileBlock);
pCreate->cfg.blocksPerTable = htons(pCreate->cfg.blocksPerTable);
pCreate->cfg.cacheNumOfBlocks.totalBlocks = htonl(pCreate->cfg.cacheNumOfBlocks.totalBlocks);
for (int32_t j = 0; j < pCreate->cfg.replications; ++j) {
pCreate->vpeerDesc[j].vgId = htonl(pCreate->vpeerDesc[j].vgId);
pCreate->vpeerDesc[j].dnodeId = htonl(pCreate->vpeerDesc[j].dnodeId);
pCreate->vpeerDesc[j].ip = htonl(pCreate->vpeerDesc[j].ip);
}
return vnodeCreate(pCreate);
}
......
......@@ -98,7 +98,6 @@ typedef struct {
typedef struct {
int32_t dnodeId;
int32_t vnode;
uint32_t privateIp;
uint32_t publicIp;
} SVnodeGid;
......
......@@ -156,28 +156,9 @@ TAOS_DEFINE_ERROR(TSDB_CODE_QUERY_CANCELLED, 0, 110, "query cancelled
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_IE, 0, 111, "invalid ie")
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VALUE, 0, 112, "invalid value")
// TAOS_DEFINE_ERROR(TSDB_CODE_SYNC_REQUIRED, 0, 99, "sync required")
// TAOS_DEFINE_ERROR(TSDB_CODE_UNSYNCED, 0, 100, "unsyned")
// TAOS_DEFINE_ERROR(TSDB_CODE_DATA_ALREADY_IMPORTED, 0, 75, "data already imported")
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_COMMIT_LOG, 0, 109, "invalid commit log") // commit log init failed
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_VNODE_STATUS, 0, 116, "invalid vnode status")
// TAOS_DEFINE_ERROR(TSDB_CODE_TIMESTAMP_OUT_OF_RANGE, 0, 105, "timestamp out of range")
// TAOS_DEFINE_ERROR(TSDB_CODE_DUPLICATE_TAGS, 0, 112, "duplicate tags") // tags value for join not unique
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_SUBMIT_MSG, 0, 113, "invalid submit message")
// TAOS_DEFINE_ERROR(TSDB_CODE_FAILED_TO_LOCK_RESOURCES, 0, 117, "failed to lock resources")
// TAOS_DEFINE_ERROR(TSDB_CODE_FILE_BLOCK_TS_DISORDERED, 0, 108, "file block ts disordered") // time stamp in file block is disordered
// TAOS_DEFINE_ERROR(TSDB_CODE_BATCH_SIZE_TOO_BIG, 0, 104, "batch size too big")
// TAOS_DEFINE_ERROR(TSDB_CODE_WRONG_SCHEMA, 0, 53, "wrong schema")
// TAOS_DEFINE_ERROR(TSDB_CODE_NO_QSUMMARY, 0, 68, "no qsummery")
// TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_METER_ID, 0, 27, "invalid meter id")
// TAOS_DEFINE_ERROR(TSDB_CODE_METRICMETA_EXPIRED, 0, 63, "metricmeta expired") // local cached metric-meta expired causes error in metric query
// TAOS_DEFINE_ERROR(TSDB_CODE_SESSION_ALREADY_EXIST, 0, 67, "session already exist")
// TAOS_DEFINE_ERROR(TSDB_CODE_SESSION_NOT_READY, 0, 103, "session not ready") // table NOT in ready state
// TAOS_DEFINE_ERROR(TSDB_CODE_DATA_OVERFLOW, 0, 82, "data overflow")
// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_TRANS_NOT_FINISHED, 0, 17, "action transaction not finished")
// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_NOT_ONLINE, 0, 18, "action not online")
// TAOS_DEFINE_ERROR(TSDB_CODE_ACTION_SEND_FAILD, 0, 19, "action send failed")
// TAOS_DEFINE_ERROR(TSDB_CODE_NOT_ACTIVE_SESSION, 0, 20, "not active session")
// others
TAOS_DEFINE_ERROR(TSDB_CODE_INVALID_FILE_FORMAT, 0, 120, "invalid file format")
#ifdef TAOS_ERROR_C
};
......
......@@ -241,7 +241,8 @@ typedef struct SSchema {
} SSchema;
typedef struct {
int32_t vnode; // the index of vnode
int32_t vgId;
int32_t dnodeId;
uint32_t ip;
} SVnodeDesc;
......
......@@ -1512,7 +1512,8 @@ static int32_t mgmtDoGetChildTableMeta(SQueuedMsg *pMsg, STableMetaMsg *pMeta) {
} else {
pMeta->vpeerDesc[i].ip = pVgroup->vnodeGid[i].privateIp;
}
pMeta->vpeerDesc[i].vnode = htonl(pVgroup->vnodeGid[i].vnode);
pMeta->vpeerDesc[i].vgId = htonl(pVgroup->vgId);
pMeta->vpeerDesc[i].dnodeId = htonl(pVgroup->vnodeGid[i].dnodeId);
}
pMeta->numOfVpeers = pVgroup->numOfVnodes;
......
......@@ -95,7 +95,6 @@ static int32_t mgmtVgroupActionInsert(SSdbOperDesc *pOper) {
SDnodeObj *pDnode = mgmtGetDnode(pVgroup->vnodeGid[i].dnodeId);
pVgroup->vnodeGid[i].privateIp = pDnode->privateIp;
pVgroup->vnodeGid[i].publicIp = pDnode->publicIp;
pVgroup->vnodeGid[i].vnode = pVgroup->vgId;
atomic_add_fetch_32(&pDnode->openVnodes, 1);
mgmtDecDnodeRef(pDnode);
}
......@@ -236,7 +235,7 @@ void mgmtCreateVgroup(SQueuedMsg *pMsg, SDbObj *pDb) {
mPrint("vgroup:%d, is created in mnode, db:%s replica:%d", pVgroup->vgId, pDb->name, pVgroup->numOfVnodes);
for (int32_t i = 0; i < pVgroup->numOfVnodes; ++i) {
mPrint("vgroup:%d, index:%d, dnode:%d vnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId, pVgroup->vnodeGid[i].vnode);
mPrint("vgroup:%d, index:%d, dnode:%d", pVgroup->vgId, i, pVgroup->vnodeGid[i].dnodeId);
}
pMsg->ahandle = pVgroup;
......@@ -312,27 +311,21 @@ int32_t mgmtGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *pConn) {
}
for (int32_t i = 0; i < maxReplica; ++i) {
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 2;
pSchema[cols].type = TSDB_DATA_TYPE_SMALLINT;
strcpy(pSchema[cols].name, "vnode");
strcpy(pSchema[cols].name, "dnode");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 9;
pShow->bytes[cols] = 16;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "vnode status");
strcpy(pSchema[cols].name, "ip");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
pShow->bytes[cols] = 16;
pShow->bytes[cols] = 9;
pSchema[cols].type = TSDB_DATA_TYPE_BINARY;
strcpy(pSchema[cols].name, "public ip");
strcpy(pSchema[cols].name, "vstatus");
pSchema[cols].bytes = htons(pShow->bytes[cols]);
cols++;
}
......@@ -416,13 +409,13 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
cols++;
for (int32_t i = 0; i < maxReplica; ++i) {
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
*(int16_t *) pWrite = pVgroup->vnodeGid[i].dnodeId;
cols++;
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].privateIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
*(int16_t *) pWrite = pVgroup->vnodeGid[i].vnode;
strcpy(pWrite, ipstr);
cols++;
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
......@@ -433,11 +426,6 @@ int32_t mgmtRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, void *pCo
strcpy(pWrite, "null");
}
cols++;
tinet_ntoa(ipstr, pVgroup->vnodeGid[i].publicIp);
pWrite = data + pShow->offset[cols] * rows + pShow->bytes[cols] * numOfRows;
strcpy(pWrite, ipstr);
cols++;
}
numOfRows++;
......@@ -490,15 +478,15 @@ SMDCreateVnodeMsg *mgmtBuildCreateVnodeMsg(SVgObj *pVgroup) {
pCfg->daysToKeep2 = htonl(pCfg->daysToKeep2);
pCfg->daysToKeep = htonl(pCfg->daysToKeep);
pCfg->commitTime = htonl(pCfg->commitTime);
pCfg->commitLog = pCfg->commitLog;
pCfg->blocksPerTable = htons(pCfg->blocksPerTable);
pCfg->replications = (char) pVgroup->numOfVnodes;
pCfg->rowsInFileBlock = htonl(pCfg->rowsInFileBlock);
pCfg->blocksPerTable = htons(pCfg->blocksPerTable);
pCfg->replications = (int8_t) pVgroup->numOfVnodes;
SVnodeDesc *vpeerDesc = pVnode->vpeerDesc;
for (int32_t j = 0; j < pVgroup->numOfVnodes; ++j) {
vpeerDesc[j].vnode = htonl(pVgroup->vnodeGid[j].vnode);
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp);
vpeerDesc[j].vgId = htonl(pVgroup->vgId);
vpeerDesc[j].dnodeId = htonl(pVgroup->vnodeGid[j].dnodeId);
vpeerDesc[j].ip = htonl(pVgroup->vnodeGid[j].privateIp);
}
return pVnode;
......
......@@ -41,6 +41,8 @@ typedef struct {
void *sync;
void *events;
void *cq; // continuous query
int32_t replicas;
SVnodeDesc vpeers[TSDB_MAX_MPEERS];
} SVnodeObj;
int vnodeWriteToQueue(void *param, SWalHead *pHead, int type);
......
......@@ -29,16 +29,17 @@
#include "vnode.h"
#include "vnodeInt.h"
static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static void vnodeBuildVloadMsg(char *pNode, void * param);
static int vnodeWALCallback(void *arg);
static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static void vnodeBuildVloadMsg(char *pNode, void * param);
static int vnodeWALCallback(void *arg);
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int tsOpennedVnodes;
static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
static void vnodeInit() {
vnodeInitWriteFp();
tsDnodeVnodesHash = taosInitIntHash(TSDB_MAX_VNODES, sizeof(SVnodeObj *), taosHashInt);
......@@ -51,12 +52,12 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t code;
pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj *pTemp = (SVnodeObj *) taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
SVnodeObj *pTemp = (SVnodeObj *)taosGetIntHashData(tsDnodeVnodesHash, pVnodeCfg->cfg.vgId);
if (pTemp != NULL) {
dPrint("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp);
return TSDB_CODE_SUCCESS;
}
}
STsdbCfg tsdbCfg = {0};
tsdbCfg.precision = pVnodeCfg->cfg.precision;
......@@ -81,12 +82,18 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
}
}
code = vnodeSaveCfg(pVnodeCfg);
if (code != TSDB_CODE_SUCCESS) {
dError("vgId:%d, failed to save vnode cfg, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(code));
return code;
}
char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
if (code <0) {
if (code != TSDB_CODE_SUCCESS) {
dError("vgId:%d, failed to create tsdb in vnode, reason:%s", pVnodeCfg->cfg.vgId, tstrerror(terrno));
return code;
return terrno;
}
dPrint("vgId:%d, vnode is created, clog:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.commitLog);
......@@ -121,6 +128,13 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->version = 0;
taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode));
int32_t code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) {
dError("pVnode:%p vgId:%d, failed to read cfg file", pVnode, pVnode->vgId);
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
return code;
}
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
......@@ -258,7 +272,63 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
vnodeRelease(pVnode);
}
// TODO: this is a simple implement
static int vnodeWALCallback(void *arg) {
SVnodeObj *pVnode = arg;
return walRenew(pVnode->wal);
}
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnodeCfg->cfg.vgId);
FILE *fp = fopen(cfgFile, "w");
if (!fp) return errno;
fprintf(fp, "replicas %d\n", pVnodeCfg->cfg.replications);
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
fprintf(fp, "index%d dnode %d ip %u\n", i, pVnodeCfg->vpeerDesc[i].dnodeId, pVnodeCfg->vpeerDesc[i].ip);
}
fclose(fp);
dTrace("vgId:%d, save vnode cfg successed", pVnodeCfg, pVnodeCfg->cfg.vgId);
return TSDB_CODE_SUCCESS;
}
// TODO: this is a simple implement
static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
char cfgFile[TSDB_FILENAME_LEN * 2] = {0};
sprintf(cfgFile, "%s/vnode%d/config", tsVnodeDir, pVnode->vgId);
FILE *fp = fopen(cfgFile, "r");
if (!fp) return errno;
char option[3][32] = {0};
int32_t replicas = 0;
int32_t num = fscanf(fp, "%s %d", option[0], &replicas);
if (num != 2) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[0], "replicas") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (replicas == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->replicas = replicas;
for (int32_t i = 0; i < replicas; ++i) {
int32_t dnodeId = 0;
uint32_t dnodeIp = 0;
num = fscanf(fp, "%s %s %d %s %u", option[0], option[1], &dnodeId, option[2], &dnodeIp);
if (num != 5) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[1], "dnode") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (strcmp(option[2], "ip") != 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (dnodeId == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
if (dnodeIp == 0) return TSDB_CODE_INVALID_FILE_FORMAT;
pVnode->vpeers[i].dnodeId = dnodeId;
pVnode->vpeers[i].ip = dnodeIp;
pVnode->vpeers[i].vgId = pVnode->vgId;
}
fclose(fp);
dTrace("pVnode:%p vgId:%d, read vnode cfg successed", pVnode, pVnode->vgId);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册