提交 7002d963 编写于 作者: Y yifan hao

[vnode] Make vnode versioning more robust.

This patch changes the version read/write path so that it does not
allocate heap memory while dealing with versioning. Instead, because
version data is short, it uses a stack-allocated array to hold the versioning
data. This eliminates one potential failure in reading/writing the version, which is
used to guarantee data consistency.

* Bonus fix
The patch also fixes a number of failure-handling problems in the path of vnode
write.
上级 bdfc2b7f
......@@ -33,6 +33,8 @@
#include "tcq.h"
//#include "tsync.h"
#define TSDB_VNODE_VERSION_CONTENT_LEN 31
static int32_t tsOpennedVnodes;
static void *tsDnodeVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
......@@ -41,7 +43,7 @@ static int vnodeWalCallback(void *arg);
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
static bool vnodeReadVersion(SVnodeObj *pVnode);
static int32_t vnodeReadVersion(SVnodeObj *pVnode);
static int vnodeWalCallback(void *arg);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
......@@ -108,7 +110,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock;
tsdbCfg.precision = pVnodeCfg->cfg.precision;
tsdbCfg.compression = pVnodeCfg->cfg.compression;;
char tsdbDir[TSDB_FILENAME_LEN] = {0};
sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId);
code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL);
......@@ -134,7 +136,7 @@ int32_t vnodeDrop(int32_t vgId) {
dTrace("pVnode:%p vgId:%d, vnode will be dropped", pVnode, pVnode->vgId);
pVnode->status = TAOS_VN_STATUS_DELETING;
vnodeCleanUp(pVnode);
return TSDB_CODE_SUCCESS;
}
......@@ -177,26 +179,46 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
int32_t vnodeOpen(int32_t vnode, char *rootDir) {
char temp[TSDB_FILENAME_LEN];
int32_t code;
pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
if (pVnode == NULL) {
code = TSDB_CODE_NO_RESOURCE;
goto vnodeOpenError;
}
pVnode->vgId = vnode;
pVnode->status = TAOS_VN_STATUS_INIT;
pVnode->refCount = 1;
pVnode->version = 0;
pVnode->version = 0;
taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode));
int32_t code = vnodeReadCfg(pVnode);
code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) {
dError("pVnode:%p vgId:%d, failed to read cfg file", pVnode, pVnode->vgId);
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
return code;
goto vnodeOpenError;
}
code = vnodeReadVersion(pVnode);
if (code != TSDB_CODE_SUCCESS) {
dError("pVnode:%p vgId:%d, failed to read version file", pVnode, pVnode->vgId);
goto vnodeOpenError;
}
vnodeReadVersion(pVnode);
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
if (pVnode->wqueue == NULL) {
dError("pVnode:%p vgId:%d, failed to allocate Wqueue", pVnode, pVnode->vgId);
code = TSDB_CODE_NO_RESOURCE;
goto vnodeOpenError;
}
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
if (pVnode->wqueue == NULL) {
dError("pVnode:%p vgId:%d, failed to allocate Rqueue", pVnode, pVnode->vgId);
code = TSDB_CODE_NO_RESOURCE;
goto vnodeOpenError;
}
SCqCfg cqCfg = {0};
sprintf(cqCfg.user, "root");
......@@ -214,8 +236,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
if (pVnode->tsdb == NULL) {
dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno));
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
return terrno;
code = TSDB_CODE_VG_INIT_FAILED;
goto vnodeOpenError;
}
sprintf(temp, "%s/wal", rootDir);
......@@ -231,12 +253,12 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.getWalInfo = vnodeGetWalInfo;
syncInfo.getFileInfo = vnodeGetFileInfo;
syncInfo.writeToCache = vnodeWriteToQueue;
syncInfo.confirmForward = dnodeSendRpcWriteRsp;
syncInfo.confirmForward = dnodeSendRpcWriteRsp;
syncInfo.notifyRole = vnodeNotifyRole;
pVnode->sync = syncStart(&syncInfo);
// start continuous query
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq);
pVnode->events = NULL;
......@@ -246,6 +268,18 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
atomic_add_fetch_32(&tsOpennedVnodes, 1);
return TSDB_CODE_SUCCESS;
vnodeOpenError:
if (pVnode != NULL && pVnode->wqueue != NULL) {
dnodeFreeWqueue(pVnode->wqueue);
}
if (pVnode != NULL && pVnode->rqueue != NULL) {
dnodeFreeRqueue(pVnode->rqueue);
}
if (pVnode != NULL) {
taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId);
}
return code;
}
int32_t vnodeClose(int32_t vgId) {
......@@ -320,7 +354,7 @@ void *vnodeAccquireVnode(int32_t vgId) {
}
/**
 * Return the rqueue member of a vnode object.
 *
 * @param pVnode  Opaque pointer that is cast to SVnodeObj *; it is
 *                dereferenced without a NULL check, so callers must pass a
 *                valid object.
 * @return The current value of the object's rqueue field.
 */
void *vnodeGetRqueue(void *pVnode) {
  return ((SVnodeObj *)pVnode)->rqueue;
}
void *vnodeGetWqueue(int32_t vgId) {
......@@ -330,7 +364,7 @@ void *vnodeGetWqueue(int32_t vgId) {
}
/**
 * Return the wal member of a vnode object.
 *
 * @param pVnode  Opaque pointer that is cast to SVnodeObj *; it is
 *                dereferenced without a NULL check, so callers must pass a
 *                valid object.
 * @return The current value of the object's wal field.
 */
void *vnodeGetWal(void *pVnode) {
  return ((SVnodeObj *)pVnode)->wal;
}
void vnodeBuildStatusMsg(void *param) {
......@@ -398,9 +432,9 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
SVnodeObj *pVnode = ahandle;
pVnode->role = role;
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
if (pVnode->role == TAOS_SYNC_ROLE_MASTER)
cqStart(pVnode->cq);
else
else
cqStop(pVnode->cq);
}
......@@ -411,12 +445,16 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
if (!fp) {
dError("vgId:%d, failed to open vnode cfg file for write, file:%s error:%s", pVnodeCfg->cfg.vgId, cfgFile,
strerror(errno));
return errno;
return TSDB_CODE_OTHERS;
}
int32_t len = 0;
int32_t maxLen = 1000;
char * content = calloc(1, maxLen + 1);
if (content == NULL) {
fclose(fp);
return TSDB_CODE_NO_RESOURCE;
}
len += snprintf(content + len, maxLen - len, "{\n");
......@@ -430,14 +468,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2);
len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock);
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime);
len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision);
len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression);
len += snprintf(content + len, maxLen - len, " \"commitLog\": %d,\n", pVnodeCfg->cfg.commitLog);
len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications);
len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals);
len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum);
len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n");
for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) {
len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId);
......@@ -457,7 +495,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
dPrint("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId);
return 0;
return TSDB_CODE_SUCCESS;
}
static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
......@@ -467,12 +505,17 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
if (!fp) {
dError("pVnode:%p vgId:%d, failed to open vnode cfg file for read, file:%s, error:%s", pVnode, pVnode->vgId,
cfgFile, strerror(errno));
return errno;
return TSDB_CODE_OTHERS;
}
int ret = TSDB_CODE_OTHERS;
int maxLen = 1000;
char *content = calloc(1, maxLen + 1);
if (content == NULL) {
fclose(fp);
return TSDB_CODE_NO_RESOURCE;
}
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
......@@ -640,7 +683,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
}
ret = 0;
ret = TSDB_CODE_SUCCESS;
dPrint("pVnode:%p vgId:%d, read vnode cfg successed, replcia:%d", pVnode, pVnode->vgId, pVnode->syncCfg.replica);
for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) {
......@@ -662,12 +705,12 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
if (!fp) {
dError("pVnode:%p vgId:%d, failed to open vnode version file for write, file:%s error:%s", pVnode, pVnode->vgId,
versionFile, strerror(errno));
return errno;
return TSDB_CODE_OTHERS;
}
int32_t len = 0;
int32_t maxLen = 30;
char * content = calloc(1, maxLen + 1);
char content[TSDB_VNODE_VERSION_CONTENT_LEN] = {0};
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->version);
......@@ -675,32 +718,31 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
fwrite(content, 1, len, fp);
fclose(fp);
free(content);
dPrint("pVnode:%p vgId:%d, save vnode version:%" PRId64 " successed", pVnode, pVnode->vgId, pVnode->version);
return 0;
return TSDB_CODE_SUCCESS;
}
static bool vnodeReadVersion(SVnodeObj *pVnode) {
static int32_t vnodeReadVersion(SVnodeObj *pVnode) {
char versionFile[TSDB_FILENAME_LEN + 30] = {0};
sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
FILE *fp = fopen(versionFile, "r");
if (!fp) {
dTrace("pVnode:%p vgId:%d, failed to open version file:%s error:%s", pVnode, pVnode->vgId,
versionFile, strerror(errno));
return false;
return TSDB_CODE_OTHERS;
}
bool ret = false;
int maxLen = 100;
char *content = calloc(1, maxLen + 1);
char content[TSDB_VNODE_VERSION_CONTENT_LEN] = {0};
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
dPrint("pVnode:%p vgId:%d, failed to read vnode version, content is null", pVnode, pVnode->vgId);
return false;
return TSDB_CODE_OTHERS;
}
cJSON *root = cJSON_Parse(content);
......@@ -716,12 +758,11 @@ static bool vnodeReadVersion(SVnodeObj *pVnode) {
}
pVnode->version = version->valueint;
ret = true;
ret = TSDB_CODE_SUCCESS;
dPrint("pVnode:%p vgId:%d, read vnode version successed, version:%%" PRId64, pVnode, pVnode->vgId, pVnode->version);
PARSE_OVER:
free(content);
cJSON_Delete(root);
fclose(fp);
return ret;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册