From 7002d9635e1facc247c773fb0aef901baadf3671 Mon Sep 17 00:00:00 2001 From: yifan hao Date: Tue, 5 May 2020 21:21:27 -0600 Subject: [PATCH] [vnode] Make vnode versioning more robust. This patch changes version read/write path such that it does not allocate heap memory while dealing with versioning. Instead, because version data is short, it uses stack allcocated array to hold the versioning data. This eliminate one potential failure in read/write version, which is used to guarantee data consistency. * Bonus fix The patch also fixes a bunch of failure handling in the path of vnode write. --- src/vnode/src/vnodeMain.c | 111 ++++++++++++++++++++++++++------------ 1 file changed, 76 insertions(+), 35 deletions(-) diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index f9904f7fa8..9cb47d367b 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -33,6 +33,8 @@ #include "tcq.h" //#include "tsync.h" +#define TSDB_VNODE_VERSION_CONTENT_LEN 31 + static int32_t tsOpennedVnodes; static void *tsDnodeVnodesHash; static void vnodeCleanUp(SVnodeObj *pVnode); @@ -41,7 +43,7 @@ static int vnodeWalCallback(void *arg); static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode); -static bool vnodeReadVersion(SVnodeObj *pVnode); +static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int vnodeWalCallback(void *arg); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); @@ -108,7 +110,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock; tsdbCfg.precision = pVnodeCfg->cfg.precision; tsdbCfg.compression = pVnodeCfg->cfg.compression;; - + char tsdbDir[TSDB_FILENAME_LEN] = {0}; sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL); @@ -134,7 +136,7 @@ int32_t vnodeDrop(int32_t vgId) { dTrace("pVnode:%p vgId:%d, vnode will be dropped", pVnode, pVnode->vgId); pVnode->status = TAOS_VN_STATUS_DELETING; vnodeCleanUp(pVnode); - + return TSDB_CODE_SUCCESS; } @@ -177,26 +179,46 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { int32_t vnodeOpen(int32_t vnode, char *rootDir) { char temp[TSDB_FILENAME_LEN]; + int32_t code; pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1); + if (pVnode == NULL) { + code = TSDB_CODE_NO_RESOURCE; + goto vnodeOpenError; + } + pVnode->vgId = vnode; pVnode->status = TAOS_VN_STATUS_INIT; pVnode->refCount = 1; - pVnode->version = 0; + pVnode->version = 0; taosAddIntHash(tsDnodeVnodesHash, pVnode->vgId, (char *)(&pVnode)); - - int32_t code = vnodeReadCfg(pVnode); + + code = vnodeReadCfg(pVnode); if (code != TSDB_CODE_SUCCESS) { dError("pVnode:%p vgId:%d, failed to read cfg file", pVnode, pVnode->vgId); - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - return code; + goto vnodeOpenError; + } + + code = vnodeReadVersion(pVnode); + if (code != TSDB_CODE_SUCCESS) { + dError("pVnode:%p vgId:%d, failed to read version file", pVnode, pVnode->vgId); + goto vnodeOpenError; } - vnodeReadVersion(pVnode); - pVnode->wqueue = dnodeAllocateWqueue(pVnode); + if (pVnode->wqueue == NULL) { + dError("pVnode:%p vgId:%d, failed to allocate Wqueue", pVnode, pVnode->vgId); + code = TSDB_CODE_NO_RESOURCE; + goto vnodeOpenError; + } + pVnode->rqueue = dnodeAllocateRqueue(pVnode); + if (pVnode->wqueue == NULL) { + dError("pVnode:%p vgId:%d, failed to allocate Rqueue", pVnode, pVnode->vgId); + code = TSDB_CODE_NO_RESOURCE; + goto vnodeOpenError; + } SCqCfg cqCfg = {0}; sprintf(cqCfg.user, "root"); @@ -214,8 +236,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pVnode->tsdb = tsdbOpenRepo(temp, &appH); if (pVnode->tsdb == NULL) { dError("pVnode:%p vgId:%d, failed to open tsdb at %s(%s)", pVnode, pVnode->vgId, temp, tstrerror(terrno)); - taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); - return terrno; + code = TSDB_CODE_VG_INIT_FAILED; + goto vnodeOpenError; } sprintf(temp, "%s/wal", rootDir); @@ -231,12 +253,12 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.getWalInfo = vnodeGetWalInfo; syncInfo.getFileInfo = vnodeGetFileInfo; syncInfo.writeToCache = vnodeWriteToQueue; - syncInfo.confirmForward = dnodeSendRpcWriteRsp; + syncInfo.confirmForward = dnodeSendRpcWriteRsp; syncInfo.notifyRole = vnodeNotifyRole; pVnode->sync = syncStart(&syncInfo); // start continuous query - if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) cqStart(pVnode->cq); pVnode->events = NULL; @@ -246,6 +268,18 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { atomic_add_fetch_32(&tsOpennedVnodes, 1); return TSDB_CODE_SUCCESS; + +vnodeOpenError: + if (pVnode != NULL && pVnode->wqueue != NULL) { + dnodeFreeWqueue(pVnode->wqueue); + } + if (pVnode != NULL && pVnode->rqueue != NULL) { + dnodeFreeRqueue(pVnode->rqueue); + } + if (pVnode != NULL) { + taosDeleteIntHash(tsDnodeVnodesHash, pVnode->vgId); + } + return code; } int32_t vnodeClose(int32_t vgId) { @@ -320,7 +354,7 @@ void *vnodeAccquireVnode(int32_t vgId) { } void *vnodeGetRqueue(void *pVnode) { - return ((SVnodeObj *)pVnode)->rqueue; + return ((SVnodeObj *)pVnode)->rqueue; } void *vnodeGetWqueue(int32_t vgId) { @@ -330,7 +364,7 @@ void *vnodeGetWqueue(int32_t vgId) { } void *vnodeGetWal(void *pVnode) { - return ((SVnodeObj *)pVnode)->wal; + return ((SVnodeObj *)pVnode)->wal; } void vnodeBuildStatusMsg(void *param) { @@ -398,9 +432,9 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { SVnodeObj *pVnode = ahandle; pVnode->role = role; - if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) cqStart(pVnode->cq); - else + else cqStop(pVnode->cq); } @@ -411,12 +445,16 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { if (!fp) { dError("vgId:%d, failed to open vnode cfg file for write, file:%s error:%s", pVnodeCfg->cfg.vgId, cfgFile, strerror(errno)); - return errno; + return TSDB_CODE_OTHERS; } int32_t len = 0; int32_t maxLen = 1000; char * content = calloc(1, maxLen + 1); + if (content == NULL) { + fclose(fp); + return TSDB_CODE_NO_RESOURCE; + } len += snprintf(content + len, maxLen - len, "{\n"); @@ -430,14 +468,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2); len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock); len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock); - len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime); + len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime); len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression); len += snprintf(content + len, maxLen - len, " \"commitLog\": %d,\n", pVnodeCfg->cfg.commitLog); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum); - + len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) { len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId); @@ -457,7 +495,7 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { dPrint("vgId:%d, save vnode cfg successed", pVnodeCfg->cfg.vgId); - return 0; + return TSDB_CODE_SUCCESS; } static int32_t vnodeReadCfg(SVnodeObj *pVnode) { @@ -467,12 +505,17 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { if (!fp) { dError("pVnode:%p vgId:%d, failed to open vnode cfg file for read, file:%s, error:%s", pVnode, pVnode->vgId, cfgFile, strerror(errno)); - return errno; + return TSDB_CODE_OTHERS; } int ret = TSDB_CODE_OTHERS; int maxLen = 1000; char *content = calloc(1, maxLen + 1); + if (content == NULL) { + fclose(fp); + return TSDB_CODE_NO_RESOURCE; + } + int len = fread(content, 1, maxLen, fp); if (len <= 0) { free(content); @@ -640,7 +683,7 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC; } - ret = 0; + ret = TSDB_CODE_SUCCESS; dPrint("pVnode:%p vgId:%d, read vnode cfg successed, replcia:%d", pVnode, pVnode->vgId, pVnode->syncCfg.replica); for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) { @@ -662,12 +705,12 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { if (!fp) { dError("pVnode:%p vgId:%d, failed to open vnode version file for write, file:%s error:%s", pVnode, pVnode->vgId, versionFile, strerror(errno)); - return errno; + return TSDB_CODE_OTHERS; } int32_t len = 0; int32_t maxLen = 30; - char * content = calloc(1, maxLen + 1); + char content[TSDB_VNODE_VERSION_CONTENT_LEN] = {0}; len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, " \"version\": %" PRId64 "\n", pVnode->version); @@ -675,32 +718,31 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { fwrite(content, 1, len, fp); fclose(fp); - free(content); dPrint("pVnode:%p vgId:%d, save vnode version:%" PRId64 " successed", pVnode, pVnode->vgId, pVnode->version); - return 0; + return TSDB_CODE_SUCCESS; } -static bool vnodeReadVersion(SVnodeObj *pVnode) { +static int32_t vnodeReadVersion(SVnodeObj *pVnode) { char versionFile[TSDB_FILENAME_LEN + 30] = {0}; sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId); FILE *fp = fopen(versionFile, "r"); if (!fp) { dTrace("pVnode:%p vgId:%d, failed to open version file:%s error:%s", pVnode, pVnode->vgId, versionFile, strerror(errno)); - return false; + return TSDB_CODE_OTHERS; } bool ret = false; int maxLen = 100; - char *content = calloc(1, maxLen + 1); + char content[TSDB_VNODE_VERSION_CONTENT_LEN] = {0}; + int len = fread(content, 1, maxLen, fp); if (len <= 0) { - free(content); fclose(fp); dPrint("pVnode:%p vgId:%d, failed to read vnode version, content is null", pVnode, pVnode->vgId); - return false; + return TSDB_CODE_OTHERS; } cJSON *root = cJSON_Parse(content); @@ -716,12 +758,11 @@ static bool vnodeReadVersion(SVnodeObj *pVnode) { } pVnode->version = version->valueint; - ret = true; + ret = TSDB_CODE_SUCCESS; dPrint("pVnode:%p vgId:%d, read vnode version successed, version:%%" PRId64, pVnode, pVnode->vgId, pVnode->version); PARSE_OVER: - free(content); cJSON_Delete(root); fclose(fp); return ret; -- GitLab