diff --git a/src/dnode/src/dnodeModule.c b/src/dnode/src/dnodeModule.c index dc1515282d86abec8d1e177d963b4ba6b473464b..c2e8c1d102ddc79958baa3285f3f5c57988fe1f1 100644 --- a/src/dnode/src/dnodeModule.c +++ b/src/dnode/src/dnodeModule.c @@ -84,7 +84,7 @@ static void dnodeAllocModules() { } void dnodeCleanupModules() { - for (int32_t module = 1; module < TSDB_MOD_MAX; ++module) { + for (EModuleType module = 1; module < TSDB_MOD_MAX; ++module) { if (tsModule[module].enable && tsModule[module].stopFp) { (*tsModule[module].stopFp)(); } diff --git a/src/dnode/src/dnodeVWrite.c b/src/dnode/src/dnodeVWrite.c index 20c50c5f8c3528d8be2773486d859d3d007841b6..4b26c05649d0e97fffd7602f0cf4bddeb19bc10b 100644 --- a/src/dnode/src/dnodeVWrite.c +++ b/src/dnode/src/dnodeVWrite.c @@ -32,9 +32,9 @@ typedef struct { taos_qall qall; taos_qset qset; // queue set - pthread_t thread; // thread + pthread_t thread; // thread int32_t workerId; // worker ID -} SWriteWorker; +} SWriteWorker; typedef struct { SRspRet rspRet; @@ -136,17 +136,24 @@ void *dnodeAllocateVnodeWqueue(void *pVnode) { taosAddIntoQset(pWorker->qset, queue, pVnode); pWorker->qall = taosAllocateQall(); - wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; - + if (pWorker->qall == NULL) { + taosCloseQset(pWorker->qset); + taosCloseQueue(queue); + return NULL; + } pthread_attr_t thAttr; pthread_attr_init(&thAttr); pthread_attr_setdetachstate(&thAttr, PTHREAD_CREATE_JOINABLE); if (pthread_create(&pWorker->thread, &thAttr, dnodeProcessWriteQueue, pWorker) != 0) { dError("failed to create thread to process read queue, reason:%s", strerror(errno)); + taosFreeQall(pWorker->qall); taosCloseQset(pWorker->qset); + taosCloseQueue(queue); + queue = NULL; } else { dTrace("write worker:%d is launched", pWorker->workerId); + wWorkerPool.nextId = (wWorkerPool.nextId + 1) % wWorkerPool.max; } pthread_attr_destroy(&thAttr); @@ -195,7 +202,7 @@ static void *dnodeProcessWriteQueue(void *param) { while (1) { numOfMsgs = taosReadAllQitemsFromQset(pWorker->qset, pWorker->qall, &pVnode); - if (numOfMsgs ==0) { + if (numOfMsgs == 0) { dTrace("dnodeProcessWriteQueee: got no message from qset, exiting..."); break; } @@ -243,7 +250,7 @@ static void dnodeHandleIdleWorker(SWriteWorker *pWorker) { if (num > 0) { usleep(30000); - sched_yield(); + sched_yield(); } else { taosFreeQall(pWorker->qall); taosCloseQset(pWorker->qset); diff --git a/src/vnode/src/vnodeMain.c b/src/vnode/src/vnodeMain.c index c2a0e4189f72403c25636f5417505acde35c93b5..6e949db7e5ab90fc9a4b22f0e539c6f023e6021a 100644 --- a/src/vnode/src/vnodeMain.c +++ b/src/vnode/src/vnodeMain.c @@ -108,7 +108,7 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) { tsdbCfg.maxRowsPerFileBlock = pVnodeCfg->cfg.maxRowsPerFileBlock; tsdbCfg.precision = pVnodeCfg->cfg.precision; tsdbCfg.compression = pVnodeCfg->cfg.compression;; - + char tsdbDir[TSDB_FILENAME_LEN] = {0}; sprintf(tsdbDir, "%s/vnode%d/tsdb", tsVnodeDir, pVnodeCfg->cfg.vgId); code = tsdbCreateRepo(tsdbDir, &tsdbCfg, NULL); @@ -139,7 +139,7 @@ int32_t vnodeDrop(int32_t vgId) { vTrace("vgId:%d, vnode will be dropped", pVnode->vgId); pVnode->status = TAOS_VN_STATUS_DELETING; vnodeCleanUp(pVnode); - + return TSDB_CODE_SUCCESS; } @@ -262,7 +262,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { #endif // start continuous query - if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) cqStart(pVnode->cq); pVnode->events = NULL; @@ -342,7 +342,7 @@ void *vnodeAccquireVnode(int32_t vgId) { } void *vnodeGetRqueue(void *pVnode) { - return ((SVnodeObj *)pVnode)->rqueue; + return ((SVnodeObj *)pVnode)->rqueue; } void *vnodeGetWqueue(int32_t vgId) { @@ -352,7 +352,7 @@ void *vnodeGetWqueue(int32_t vgId) { } void *vnodeGetWal(void *pVnode) { - return ((SVnodeObj *)pVnode)->wal; + return ((SVnodeObj *)pVnode)->wal; } static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) { @@ -447,9 +447,9 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { vPrint("vgId:%d, sync role changed from %d to %d", pVnode->vgId, pVnode->role, role); pVnode->role = role; - if (pVnode->role == TAOS_SYNC_ROLE_MASTER) + if (pVnode->role == TAOS_SYNC_ROLE_MASTER) cqStart(pVnode->cq); - else + else cqStop(pVnode->cq); } @@ -501,14 +501,14 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { len += snprintf(content + len, maxLen - len, " \"daysToKeep2\": %d,\n", pVnodeCfg->cfg.daysToKeep2); len += snprintf(content + len, maxLen - len, " \"minRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.minRowsPerFileBlock); len += snprintf(content + len, maxLen - len, " \"maxRowsPerFileBlock\": %d,\n", pVnodeCfg->cfg.maxRowsPerFileBlock); - len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime); + len += snprintf(content + len, maxLen - len, " \"commitTime\": %d,\n", pVnodeCfg->cfg.commitTime); len += snprintf(content + len, maxLen - len, " \"precision\": %d,\n", pVnodeCfg->cfg.precision); len += snprintf(content + len, maxLen - len, " \"compression\": %d,\n", pVnodeCfg->cfg.compression); len += snprintf(content + len, maxLen - len, " \"walLevel\": %d,\n", pVnodeCfg->cfg.walLevel); len += snprintf(content + len, maxLen - len, " \"replica\": %d,\n", pVnodeCfg->cfg.replications); len += snprintf(content + len, maxLen - len, " \"wals\": %d,\n", pVnodeCfg->cfg.wals); len += snprintf(content + len, maxLen - len, " \"quorum\": %d,\n", pVnodeCfg->cfg.quorum); - + len += snprintf(content + len, maxLen - len, " \"nodeInfos\": [{\n"); for (int32_t i = 0; i < pVnodeCfg->cfg.replications; i++) { len += snprintf(content + len, maxLen - len, " \"nodeId\": %d,\n", pVnodeCfg->nodes[i].nodeId);