提交 2a4feceb 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

tune the code to make the vnode cleanup process more clear

上级 c733b76b
......@@ -18,6 +18,7 @@
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include <errno.h>
#include "taosdef.h"
#include "taosmsg.h"
#include "tglobal.h"
......@@ -64,7 +65,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj);
void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
SCqContext *pContext = calloc(sizeof(SCqContext), 1);
if (pContext == NULL) return NULL;
if (pContext == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
strcpy(pContext->user, pCfg->user);
strcpy(pContext->pass, pCfg->pass);
......@@ -82,6 +86,7 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
void cqClose(void *handle) {
SCqContext *pContext = handle;
if (handle == NULL) return;
// stop all CQs
cqStop(pContext);
......@@ -106,9 +111,9 @@ void cqClose(void *handle) {
void cqStart(void *handle) {
SCqContext *pContext = handle;
cTrace("vgId:%d, start all CQs", pContext->vgId);
if (pContext->dbConn || pContext->master) return;
cTrace("vgId:%d, start all CQs", pContext->vgId);
pthread_mutex_lock(&pContext->mutex);
pContext->master = 1;
......
......@@ -94,8 +94,8 @@ typedef void* tsync_h;
tsync_h syncStart(const SSyncInfo *);
void syncStop(tsync_h shandle);
int syncReconfig(tsync_h shandle, const SSyncCfg *);
int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype);
int32_t syncReconfig(tsync_h shandle, const SSyncCfg *);
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype);
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code);
void syncRecover(tsync_h shandle); // recover from other nodes:
int syncGetNodesRole(tsync_h shandle, SNodesRole *);
......
......@@ -65,6 +65,7 @@ taos_queue taosOpenQueue() {
}
void taosCloseQueue(taos_queue param) {
if (param == NULL) return;
STaosQueue *queue = (STaosQueue *)param;
STaosQnode *pTemp;
STaosQnode *pNode = queue->head;
......@@ -224,6 +225,7 @@ taos_qset taosOpenQset() {
}
void taosCloseQset(taos_qset param) {
if (param == NULL) return;
STaosQset *qset = (STaosQset *)param;
pthread_mutex_destroy(&qset->mutex);
tsem_destroy(&qset->sem);
......
......@@ -35,7 +35,7 @@ static void vnodeCleanUp(SVnodeObj *pVnode);
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
static int32_t vnodeSaveVersion(SVnodeObj *pVnode);
static bool vnodeReadVersion(SVnodeObj *pVnode);
static int32_t vnodeReadVersion(SVnodeObj *pVnode);
static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, int32_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
......@@ -46,9 +46,9 @@ static pthread_once_t vnodeModuleInit = PTHREAD_ONCE_INIT;
#ifndef _SYNC
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
int syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; }
int32_t syncForwardToPeer(tsync_h shandle, void *pHead, void *mhandle, int qtype) { return 0; }
void syncStop(tsync_h shandle) {}
int syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; }
int32_t syncReconfig(tsync_h shandle, const SSyncCfg * cfg) { return 0; }
int syncGetNodesRole(tsync_h shandle, SNodesRole * cfg) { return 0; }
void syncConfirmForward(tsync_h shandle, uint64_t version, int32_t code) {}
#endif
......@@ -185,26 +185,40 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pthread_once(&vnodeModuleInit, vnodeInit);
SVnodeObj *pVnode = calloc(sizeof(SVnodeObj), 1);
if (pVnode == NULL) {
vError("vgId:%d, failed to open vnode since no enough memory", vnode);
return TAOS_SYSTEM_ERROR(errno);
}
atomic_add_fetch_32(&tsOpennedVnodes, 1);
atomic_add_fetch_32(&pVnode->refCount, 1);
pVnode->vgId = vnode;
pVnode->status = TAOS_VN_STATUS_INIT;
pVnode->refCount = 1;
pVnode->version = 0;
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
pVnode->rootDir = strdup(rootDir);
taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
int32_t code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) {
vError("vgId:%d, failed to read cfg file", pVnode->vgId);
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
vnodeCleanUp(pVnode);
return code;
}
code = vnodeReadVersion(pVnode);
if (code != TSDB_CODE_SUCCESS) {
vnodeCleanUp(pVnode);
return code;
}
vnodeReadVersion(pVnode);
pVnode->fversion = pVnode->version;
pVnode->wqueue = dnodeAllocateWqueue(pVnode);
pVnode->rqueue = dnodeAllocateRqueue(pVnode);
if (pVnode->wqueue == NULL || pVnode->rqueue == NULL) {
vnodeCleanUp(pVnode);
return terrno;
}
SCqCfg cqCfg = {0};
sprintf(cqCfg.user, "root");
......@@ -212,22 +226,29 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
cqCfg.vgId = vnode;
cqCfg.cqWrite = vnodeWriteToQueue;
pVnode->cq = cqOpen(pVnode, &cqCfg);
if (pVnode->cq == NULL) {
vnodeCleanUp(pVnode);
return terrno;
}
STsdbAppH appH = {0};
appH.appH = (void *)pVnode;
appH.notifyStatus = vnodeProcessTsdbStatus;
appH.cqH = pVnode->cq;
sprintf(temp, "%s/tsdb", rootDir);
pVnode->tsdb = tsdbOpenRepo(temp, &appH);
if (pVnode->tsdb == NULL) {
vError("vgId:%d, failed to open tsdb at %s(%s)", pVnode->vgId, temp, tstrerror(terrno));
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
vnodeCleanUp(pVnode);
return terrno;
}
sprintf(temp, "%s/wal", rootDir);
pVnode->wal = walOpen(temp, &pVnode->walCfg);
if (pVnode->wal == NULL) {
vnodeCleanUp(pVnode);
return terrno;
}
walRestore(pVnode->wal, pVnode, vnodeWriteToQueue);
SSyncInfo syncInfo;
......@@ -243,6 +264,10 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
syncInfo.notifyRole = vnodeNotifyRole;
syncInfo.notifyFileSynced = vnodeNotifyFileSynced;
pVnode->sync = syncStart(&syncInfo);
if (pVnode->sync == NULL) {
vnodeCleanUp(pVnode);
return terrno;
}
#ifndef _SYNC
pVnode->role = TAOS_SYNC_ROLE_MASTER;
......@@ -253,11 +278,11 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
cqStart(pVnode->cq);
pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY;
vTrace("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
atomic_add_fetch_32(&tsOpennedVnodes, 1);
taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
return TSDB_CODE_SUCCESS;
}
......@@ -286,13 +311,6 @@ void vnodeRelease(void *pVnodeRaw) {
}
tfree(pVnode->rootDir);
// remove read queue
dnodeFreeRqueue(pVnode->rqueue);
pVnode->rqueue = NULL;
// remove write queue
dnodeFreeWqueue(pVnode->wqueue);
pVnode->wqueue = NULL;
if (pVnode->status == TAOS_VN_STATUS_DELETING) {
char rootDir[TSDB_FILENAME_LEN] = {0};
......@@ -387,15 +405,26 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
pVnode->sync = NULL;
}
cqClose(pVnode->cq);
pVnode->cq = NULL;
if (pVnode->wal)
walClose(pVnode->wal);
pVnode->wal = NULL;
tsdbCloseRepo(pVnode->tsdb, 1);
if (pVnode->tsdb)
tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL;
walClose(pVnode->wal);
pVnode->wal = NULL;
if (pVnode->cq)
cqClose(pVnode->cq);
pVnode->cq = NULL;
if (pVnode->wqueue)
dnodeFreeWqueue(pVnode->wqueue);
pVnode->wqueue = NULL;
if (pVnode->rqueue)
dnodeFreeRqueue(pVnode->rqueue);
pVnode->rqueue = NULL;
vnodeRelease(pVnode);
}
......@@ -512,27 +541,31 @@ static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
}
static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
cJSON *root = NULL;
char *content = NULL;
char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
int maxLen = 1000;
int32_t code = TSDB_CODE_OTHERS;
terrno = TSDB_CODE_OTHERS;
sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnode->vgId);
FILE *fp = fopen(cfgFile, "r");
if (!fp) {
vError("vgId:%d, failed to open vnode cfg file for read, file:%s, error:%s", pVnode->vgId,
vError("vgId:%d, failed to open vnode cfg file:%s to read, error:%s", pVnode->vgId,
cfgFile, strerror(errno));
return errno;
code = TAOS_SYSTEM_ERROR(errno);
goto PARSE_OVER;
}
int ret = TSDB_CODE_OTHERS;
int maxLen = 1000;
char *content = calloc(1, maxLen + 1);
content = calloc(1, maxLen + 1);
if (content == NULL) goto PARSE_OVER;
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
vError("vgId:%d, failed to read vnode cfg, content is null", pVnode->vgId);
return false;
return errno;
}
cJSON *root = cJSON_Parse(content);
root = cJSON_Parse(content);
if (root == NULL) {
vError("vgId:%d, failed to read vnode cfg, invalid json format", pVnode->vgId);
goto PARSE_OVER;
......@@ -691,19 +724,19 @@ static int32_t vnodeReadCfg(SVnodeObj *pVnode) {
pVnode->syncCfg.nodeInfo[i].nodePort += TSDB_PORT_SYNC;
}
ret = 0;
code = TSDB_CODE_SUCCESS;
vPrint("vgId:%d, read vnode cfg successed, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica);
vPrint("vgId:%d, read vnode cfg successfully, replcia:%d", pVnode->vgId, pVnode->syncCfg.replica);
for (int32_t i = 0; i < pVnode->syncCfg.replica; i++) {
vPrint("vgId:%d, dnode:%d, %s:%d", pVnode->vgId, pVnode->syncCfg.nodeInfo[i].nodeId,
pVnode->syncCfg.nodeInfo[i].nodeFqdn, pVnode->syncCfg.nodeInfo[i].nodePort);
}
PARSE_OVER:
free(content);
tfree(content);
cJSON_Delete(root);
fclose(fp);
return ret;
if (fp) fclose(fp);
return code;
}
static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
......@@ -713,7 +746,7 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
if (!fp) {
vError("vgId:%d, failed to open vnode version file for write, file:%s error:%s", pVnode->vgId,
versionFile, strerror(errno));
return errno;
return TAOS_SYSTEM_ERROR(errno);
}
int32_t len = 0;
......@@ -733,29 +766,33 @@ static int32_t vnodeSaveVersion(SVnodeObj *pVnode) {
return 0;
}
static bool vnodeReadVersion(SVnodeObj *pVnode) {
char versionFile[TSDB_FILENAME_LEN + 30] = {0};
static int32_t vnodeReadVersion(SVnodeObj *pVnode) {
char versionFile[TSDB_FILENAME_LEN + 30] = {0};
char *content = NULL;
cJSON *root = NULL;
int maxLen = 100;
int32_t code = TSDB_CODE_OTHERS;
sprintf(versionFile, "%s/vnode%d/version.json", tsVnodeDir, pVnode->vgId);
FILE *fp = fopen(versionFile, "r");
if (!fp) {
if (errno != ENOENT) {
vError("vgId:%d, failed to open version file:%s error:%s", pVnode->vgId, versionFile, strerror(errno));
code = TAOS_SYSTEM_ERROR(errno);
} else {
code = TSDB_CODE_SUCCESS;
}
return false;
goto PARSE_OVER;
}
bool ret = false;
int maxLen = 100;
char *content = calloc(1, maxLen + 1);
content = calloc(1, maxLen + 1);
int len = fread(content, 1, maxLen, fp);
if (len <= 0) {
free(content);
fclose(fp);
vPrint("vgId:%d, failed to read vnode version, content is null", pVnode->vgId);
return false;
vError("vgId:%d, failed to read vnode version, content is null", pVnode->vgId);
goto PARSE_OVER;
}
cJSON *root = cJSON_Parse(content);
root = cJSON_Parse(content);
if (root == NULL) {
vError("vgId:%d, failed to read vnode version, invalid json format", pVnode->vgId);
goto PARSE_OVER;
......@@ -768,13 +805,13 @@ static bool vnodeReadVersion(SVnodeObj *pVnode) {
}
pVnode->version = version->valueint;
ret = true;
code = TSDB_CODE_SUCCESS;
vPrint("vgId:%d, read vnode version succeed, version:%" PRId64, pVnode->vgId, pVnode->version);
vPrint("vgId:%d, read vnode version successfully, version:%" PRId64, pVnode->vgId, pVnode->version);
PARSE_OVER:
free(content);
tfree(content);
cJSON_Delete(root);
fclose(fp);
return ret;
if(fp) fclose(fp);
return code;
}
......@@ -25,6 +25,7 @@
#include "tlog.h"
#include "tchecksum.h"
#include "tutil.h"
#include "taoserror.h"
#include "twal.h"
#include "tqueue.h"
......@@ -56,7 +57,10 @@ static int walRemoveWalFiles(const char *path);
void *walOpen(const char *path, const SWalCfg *pCfg) {
SWal *pWal = calloc(sizeof(SWal), 1);
if (pWal == NULL) return NULL;
if (pWal == NULL) {
terrno = TAOS_SYSTEM_ERROR(errno);
return NULL;
}
pWal->fd = -1;
pWal->max = pCfg->wals;
......@@ -75,6 +79,7 @@ void *walOpen(const char *path, const SWalCfg *pCfg) {
walRenew(pWal);
if (pWal->fd <0) {
terrno = TAOS_SYSTEM_ERROR(errno);
wError("wal:%s, failed to open", path);
pthread_mutex_destroy(&pWal->mutex);
free(pWal);
......@@ -112,9 +117,10 @@ void walClose(void *handle) {
}
int walRenew(void *handle) {
if (handle == NULL) return 0;
SWal *pWal = handle;
int code = 0;
pthread_mutex_lock(&pWal->mutex);
if (pWal->fd >=0) {
......@@ -156,6 +162,7 @@ int walRenew(void *handle) {
int walWrite(void *handle, SWalHead *pHead) {
SWal *pWal = handle;
int code = 0;
if (pWal == NULL) return -1;
// no wal
if (pWal->level == TAOS_WAL_NOLOG) return 0;
......@@ -178,6 +185,7 @@ int walWrite(void *handle, SWalHead *pHead) {
void walFsync(void *handle) {
SWal *pWal = handle;
if (pWal == NULL) return;
if (pWal->level == TAOS_WAL_FSYNC && pWal->fd >=0) {
if (fsync(pWal->fd) < 0) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册