From 2a4fecebe647936b07d2eb778e5231148a1a5885 Mon Sep 17 00:00:00 2001 From: Jeff Tao Date: Sat, 23 May 2020 13:39:44 +0000 Subject: [PATCH] tune the code to make the vnode cleanup process more clear --- src/cq/src/cqMain.c | 9 ++- src/inc/tsync.h | 4 +- src/util/src/tqueue.c | 2 + src/vnode/src/vnodeMain.c | 151 ++++++++++++++++++++++++-------------- src/wal/src/walMain.c | 12 ++- 5 files changed, 115 insertions(+), 63 deletions(-) diff --git a/src/cq/src/cqMain.c b/src/cq/src/cqMain.c index 7935bb7ff5..5cc3ce0159 100644 --- a/src/cq/src/cqMain.c +++ b/src/cq/src/cqMain.c @@ -18,6 +18,7 @@ #include #include #include +#include #include "taosdef.h" #include "taosmsg.h" #include "tglobal.h" @@ -64,7 +65,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj); void *cqOpen(void *ahandle, const SCqCfg *pCfg) { SCqContext *pContext = calloc(sizeof(SCqContext), 1); - if (pContext == NULL) return NULL; + if (pContext == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } strcpy(pContext->user, pCfg->user); strcpy(pContext->pass, pCfg->pass); @@ -82,6 +86,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) { void cqClose(void *handle) { SCqContext *pContext = handle; + if (handle == NULL) return; // stop all CQs cqStop(pContext); @@ -106,9 +111,9 @@ void cqClose(void *handle) { void cqStart(void *handle) { SCqContext *pContext = handle; - cTrace("vgId:%d, start all CQs", pContext->vgId); if (pContext->dbConn || pContext->master) return; + cTrace("vgId:%d, start all CQs", pContext->vgId); pthread_mutex_lock(&pContext->mutex); pContext->master = 1; diff --git a/src/inc/tsync.h b/src/inc/tsync.h index 05d1d93cf6..fcf26d22c3 100644 --- a/src/inc/tsync.h +++ b/src/inc/tsync.h @@ -94,8 +94,8 @@ typedef void* tsync_h; tsync_h syncStart(const SSyncInfo *); void syncStop(tsync_h shandle); -int syncReconfig(tsync_h shandle, const SSyncCfg *); -int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype); +int32_t syncReconfig(tsync_h shandle, const SSyncCfg *); +int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype); void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code); void syncRecover(tsync_h shandle); // recover from other nodes: int syncGetNodesRole(tsync_h shandle, SNodesRole *); diff --git a/src/util/src/tqueue.c b/src/util/src/tqueue.c index 1e248c9e45..475941dbdb 100644 --- a/src/util/src/tqueue.c +++ b/src/util/src/tqueue.c @@ -65,6 +65,7 @@ taos_queue taosOpenQueue() { } void taosCloseQueue(taos_queue param) { + if (param == NULL) return; STaosQueue *queue = (STaosQueue *)param; STaosQnode *pTemp; STaosQnode *pNode = queue->head; @@ -224,6 +225,7 @@ taos_qset taosOpenQset() { } void taosCloseQset(taos_qset param) { + if (param == NULL) return; STaosQset *qset = (STaosQset *)param; pthread_mutex_destroy(&qset->mutex); tsem_destroy(&qset->sem); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index 6f0b19b0c6..926eee6037 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -35,7 +35,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode); static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg); static int32_t vnodeReadCfg(SVnodeObj *pVnode); static int32_t vnodeSaveVersion(SVnodeObj *pVnode); -static bool vnodeReadVersion(SVnodeObj *pVnode); +static int32_t vnodeReadVersion(SVnodeObj *pVnode); static int vnodeProcessTsdbStatus(void *arg, int status); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); @@ -46,9 +46,9 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT; #ifndef _SYNC tsync_h syncStart(const SSyncInfo *info) { return NULL; } -int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; } +int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; } void syncStop(tsync_h shandle) {} -int syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } +int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; } int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; } void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {} #endif @@ -185,26 +185,40 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { pthread_once(&vnodeModuleInit, vnodeInit); SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1); + if (pVnode == NULL) { + vError("vgId:%d, failed to open vnode since no enough memory", vnode); + return TAOS_SYSTEM_ERROR(errno); + } + + atomic_add_fetch_32(&tsOpennedVnodes, 1); + atomic_add_fetch_32(&pVnode->refCount, 1); + pVnode->vgId = vnode; pVnode->status = TAOS_VN_STATUS_INIT; - pVnode->refCount = 1; pVnode->version = 0; pVnode->tsdbCfg.tsdbId = pVnode->vgId; pVnode->rootDir = strdup(rootDir); - taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); int32_t code = vnodeReadCfg(pVnode); if (code != TSDB_CODE_SUCCESS) { - vError("vgId:%d, failed to read cfg file", pVnode->vgId); - taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); + vnodeCleanUp(pVnode); + return code; + } + + code = vnodeReadVersion(pVnode); + if (code != TSDB_CODE_SUCCESS) { + vnodeCleanUp(pVnode); return code; } - vnodeReadVersion(pVnode); pVnode->fversion = pVnode->version; pVnode->wqueue = dnodeAllocateWqueue(pVnode); pVnode->rqueue = dnodeAllocateRqueue(pVnode); + if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } SCqCfg cqCfg = {0}; sprintf(cqCfg.user, "root"); @@ -212,22 +226,29 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { cqCfg.vgId = vnode; cqCfg.cqWrite = vnodeWriteToQueue; pVnode->cq = cqOpen(pVnode, &cqCfg); + if (pVnode->cq == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } STsdbAppH appH = {0}; appH.appH = (void *)pVnode; appH.notifyStatus = vnodeProcessTsdbStatus; appH.cqH = pVnode->cq; - sprintf(temp, "%s/tsdb", rootDir); pVnode->tsdb = tsdbOpenRepo(temp, &appH); if (pVnode->tsdb == NULL) { - vError("vgId:%d, failed to open tsdb at %s(%s)", pVnode->vgId, temp, tstrerror(terrno)); - taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); + vnodeCleanUp(pVnode); return terrno; } sprintf(temp, "%s/wal", rootDir); pVnode->wal = walOpen(temp, &pVnode->walCfg); + if (pVnode->wal == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } + walRestore(pVnode->wal, pVnode, vnodeWriteToQueue); SSyncInfo syncInfo; @@ -243,6 +264,10 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { syncInfo.notifyRole = vnodeNotifyRole; syncInfo.notifyFileSynced = vnodeNotifyFileSynced; pVnode->sync = syncStart(&syncInfo); + if (pVnode->sync == NULL) { + vnodeCleanUp(pVnode); + return terrno; + } #ifndef _SYNC pVnode->role = TAOS_SYNC_ROLE_MASTER; @@ -253,11 +278,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { cqStart(pVnode->cq); pVnode->events = NULL; - pVnode->status = TAOS_VN_STATUS_READY; vTrace("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); - atomic_add_fetch_32(&tsOpennedVnodes, 1); + taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *)); + return TSDB_CODE_SUCCESS; } @@ -286,13 +311,6 @@ void vnodeRelease(void *pVnodeRaw) { } tfree(pVnode->rootDir); - // remove read queue - dnodeFreeRqueue(pVnode->rqueue); - pVnode->rqueue = NULL; - - // remove write queue - dnodeFreeWqueue(pVnode->wqueue); - pVnode->wqueue = NULL; if (pVnode->status == TAOS_VN_STATUS_DELETING) { char rootDir[TSDB_FILENAME_LEN] = {0}; @@ -387,15 +405,26 @@ static void vnodeCleanUp(SVnodeObj *pVnode) { pVnode->sync = NULL; } - cqClose(pVnode->cq); - pVnode->cq = NULL; + if (pVnode->wal) + walClose(pVnode->wal); + pVnode->wal = NULL; - tsdbCloseRepo(pVnode->tsdb, 1); + if (pVnode->tsdb) + tsdbCloseRepo(pVnode->tsdb, 1); pVnode->tsdb = NULL; - walClose(pVnode->wal); - pVnode->wal = NULL; + if (pVnode->cq) + cqClose(pVnode->cq); + pVnode->cq = NULL; + + if (pVnode->wqueue) + dnodeFreeWqueue(pVnode->wqueue); + pVnode->wqueue = NULL; + if (pVnode->rqueue) + dnodeFreeRqueue(pVnode->rqueue); + pVnode->rqueue = NULL; + vnodeRelease(pVnode); } @@ -512,27 +541,31 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { } static int32_t vnodeReadCfg(SVnodeObj *pVnode) { - char cfgFile[TSDB_FILENAME_LEN + 30] = {0}; + cJSON *root = NULL; + char *content = NULL; + char cfgFile[TSDB_FILENAME_LEN + 30] = {0}; + int maxLen = 1000; + int32_t code = TSDB_CODE_OTHERS; + + terrno = TSDB_CODE_OTHERS; sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId); FILE *fp = fopen(cfgFile, "r"); if (!fp) { - vError("vgId:%d, failed to open vnode cfg file for read, file:%s, error:%s", pVnode->vgId, + vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", pVnode->vgId, cfgFile, strerror(errno)); - return errno; + code = TAOS_SYSTEM_ERROR(errno); + goto PARSE_OVER; } - int ret = TSDB_CODE_OTHERS; - int maxLen = 1000; - char *content = calloc(1, maxLen + 1); + content = calloc(1, maxLen + 1); + if (content == NULL) goto PARSE_OVER; int len = fread(content, 1, maxLen, fp); if (len <= 0) { - free(content); - fclose(fp); vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId); - return false; + return errno; } - cJSON *root = cJSON_Parse(content); + root = cJSON_Parse(content); if (root == NULL) { vError("vgId:%d, failed to read vnode cfg, invalid json format", pVnode->vgId); goto PARSE_OVER; @@ -691,19 +724,19 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) { pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC; } - ret = 0; + code = TSDB_CODE_SUCCESS; - vPrint("vgId:%d, read vnode cfg successed, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica); + vPrint("vgId:%d, read vnode cfg successfully, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica); for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) { vPrint("vgId:%d, dnode:%d, %s:%d", pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId, pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort); } PARSE_OVER: - free(content); + tfree(content); cJSON_Delete(root); - fclose(fp); - return ret; + if (fp) fclose(fp); + return code; } static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { @@ -713,7 +746,7 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { if (!fp) { vError("vgId:%d, failed to open vnode version file for write, file:%s error:%s", pVnode->vgId, versionFile, strerror(errno)); - return errno; + return TAOS_SYSTEM_ERROR(errno); } int32_t len = 0; @@ -733,29 +766,33 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) { return 0; } -static bool vnodeReadVersion(SVnodeObj *pVnode) { - char versionFile[TSDB_FILENAME_LEN + 30] = {0}; +static int32_t vnodeReadVersion(SVnodeObj *pVnode) { + char versionFile[TSDB_FILENAME_LEN + 30] = {0}; + char *content = NULL; + cJSON *root = NULL; + int maxLen = 100; + int32_t code = TSDB_CODE_OTHERS; + sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId); FILE *fp = fopen(versionFile, "r"); if (!fp) { if (errno != ENOENT) { vError("vgId:%d, failed to open version file:%s error:%s", pVnode->vgId, versionFile, strerror(errno)); + code = TAOS_SYSTEM_ERROR(errno); + } else { + code = TSDB_CODE_SUCCESS; } - return false; + goto PARSE_OVER; } - bool ret = false; - int maxLen = 100; - char *content = calloc(1, maxLen + 1); + content = calloc(1, maxLen + 1); int len = fread(content, 1, maxLen, fp); if (len <= 0) { - free(content); - fclose(fp); - vPrint("vgId:%d, failed to read vnode version, content is null", pVnode->vgId); - return false; + vError("vgId:%d, failed to read vnode version, content is null", pVnode->vgId); + goto PARSE_OVER; } - cJSON *root = cJSON_Parse(content); + root = cJSON_Parse(content); if (root == NULL) { vError("vgId:%d, failed to read vnode version, invalid json format", pVnode->vgId); goto PARSE_OVER; @@ -768,13 +805,13 @@ static bool vnodeReadVersion(SVnodeObj *pVnode) { } pVnode->version = version->valueint; - ret = true; + code = TSDB_CODE_SUCCESS; - vPrint("vgId:%d, read vnode version succeed, version:%" PRId64, pVnode->vgId, pVnode->version); + vPrint("vgId:%d, read vnode version successfully, version:%" PRId64, pVnode->vgId, pVnode->version); PARSE_OVER: - free(content); + tfree(content); cJSON_Delete(root); - fclose(fp); - return ret; + if(fp) fclose(fp); + return code; } diff --git a/src/wal/src/walMain.c b/src/wal/src/walMain.c index 8d92fac926..ebfc9d98bb 100644 --- a/src/wal/src/walMain.c +++ b/src/wal/src/walMain.c @@ -25,6 +25,7 @@ #include "tlog.h" #include "tchecksum.h" #include "tutil.h" +#include "taoserror.h" #include "twal.h" #include "tqueue.h" @@ -56,7 +57,10 @@ static int walRemoveWalFiles(const char *path); void *walOpen(const char *path, const SWalCfg *pCfg) { SWal *pWal = calloc(sizeof(SWal), 1); - if (pWal == NULL) return NULL; + if (pWal == NULL) { + terrno = TAOS_SYSTEM_ERROR(errno); + return NULL; + } pWal->fd = -1; pWal->max = pCfg->wals; @@ -75,6 +79,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) { walRenew(pWal); if (pWal->fd <0) { + terrno = TAOS_SYSTEM_ERROR(errno); wError("wal:%s, failed to open", path); pthread_mutex_destroy(&pWal->mutex); free(pWal); @@ -112,9 +117,10 @@ void walClose(void *handle) { } int walRenew(void *handle) { + if (handle == NULL) return 0; SWal *pWal = handle; int code = 0; - + pthread_mutex_lock(&pWal->mutex); if (pWal->fd >=0) { @@ -156,6 +162,7 @@ int walRenew(void *handle) { int walWrite(void *handle, SWalHead *pHead) { SWal *pWal = handle; int code = 0; + if (pWal == NULL) return -1; // no wal if (pWal->level == TAOS_WAL_NOLOG) return 0; @@ -178,6 +185,7 @@ int walWrite(void *handle, SWalHead *pHead) { void walFsync(void *handle) { SWal *pWal = handle; + if (pWal == NULL) return; if (pWal->level == TAOS_WAL_FSYNC && pWal->fd >=0) { if (fsync(pWal->fd) < 0) { -- GitLab