提交 afdd343e 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

minor changes

上级 bba1e0bc
...@@ -164,18 +164,11 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -164,18 +164,11 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// cfgVersion can be corrected by status msg // cfgVersion can be corrected by status msg
if (pVnode->status != TAOS_VN_STATUS_READY) { if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_UPDATING) != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// the vnode may always fail to synchronize because of it in low cfgVersion
// so cannot use the following codes
// if (pVnode->syncCfg.replica > 1 && pVnode->role == TAOS_SYNC_ROLE_UNSYNCED)
// return TSDB_CODE_VND_NOT_SYNCED;
pVnode->status = TAOS_VN_STATUS_UPDATING;
int32_t code = vnodeSaveCfg(pVnodeCfg); int32_t code = vnodeSaveCfg(pVnodeCfg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
...@@ -403,12 +396,6 @@ void *vnodeAcquireVnode(int32_t vgId) { ...@@ -403,12 +396,6 @@ void *vnodeAcquireVnode(int32_t vgId) {
} }
SVnodeObj *pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
if (pVnode->status == TAOS_VN_STATUS_RESET) {
terrno = TSDB_CODE_VND_INVALID_STATUS;
vInfo("vgId:%d, status is in reset", vgId);
return NULL;
}
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount);
...@@ -418,12 +405,28 @@ void *vnodeAcquireVnode(int32_t vgId) { ...@@ -418,12 +405,28 @@ void *vnodeAcquireVnode(int32_t vgId) {
void *vnodeAcquireRqueue(int32_t vgId) { void *vnodeAcquireRqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquireVnode(vgId); SVnodeObj *pVnode = vnodeAcquireVnode(vgId);
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
if (pVnode->status == TAOS_VN_STATUS_RESET) {
terrno = TSDB_CODE_VND_INVALID_STATUS;
vInfo("vgId:%d, status is in reset", vgId);
vnodeRelease(pVnode);
return NULL;
}
return pVnode->rqueue; return pVnode->rqueue;
} }
void *vnodeAcquireWqueue(int32_t vgId) { void *vnodeAcquireWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquireVnode(vgId); SVnodeObj *pVnode = vnodeAcquireVnode(vgId);
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
if (pVnode->status == TAOS_VN_STATUS_RESET) {
terrno = TSDB_CODE_VND_INVALID_STATUS;
vInfo("vgId:%d, status is in reset", vgId);
vnodeRelease(pVnode);
return NULL;
}
return pVnode->wqueue; return pVnode->wqueue;
} }
...@@ -509,8 +512,16 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { ...@@ -509,8 +512,16 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
static void vnodeCleanUp(SVnodeObj *pVnode) { static void vnodeCleanUp(SVnodeObj *pVnode) {
// remove from hash, so new messages wont be consumed // remove from hash, so new messages wont be consumed
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
int i = 0;
pVnode->status = TSDB_VN_STATUS_CLOSING; if (pVnode->status != TAOS_VN_STATUS_INIT) {
// it may be in updateing or reset state, then it shall wait
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) {
if (++i % 1000 == 0) {
sched_yield();
}
}
}
// stop replication module // stop replication module
if (pVnode->sync) { if (pVnode->sync) {
...@@ -562,20 +573,17 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { ...@@ -562,20 +573,17 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
cqStop(pVnode->cq); cqStop(pVnode->cq);
} }
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { static int vnodeResetTsdb(SVnodeObj *pVnode)
SVnodeObj *pVnode = ahandle; {
vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion);
pVnode->fversion = fversion;
pVnode->version = fversion;
vnodeSaveVersion(pVnode);
char rootDir[128] = "\0"; char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir); sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY)
return -1; return -1;
void *tsdb = pVnode->tsdb;
pVnode->tsdb = NULL;
// acquire vnode // acquire vnode
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1); int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
...@@ -583,9 +591,6 @@ static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { ...@@ -583,9 +591,6 @@ static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
tsem_wait(&pVnode->sem); tsem_wait(&pVnode->sem);
// close tsdb, then open tsdb // close tsdb, then open tsdb
void *tsdb = pVnode->tsdb;
pVnode->tsdb = NULL;
tsdbCloseRepo(tsdb, 0); tsdbCloseRepo(tsdb, 0);
STsdbAppH appH = {0}; STsdbAppH appH = {0};
appH.appH = (void *)pVnode; appH.appH = (void *)pVnode;
...@@ -596,13 +601,23 @@ static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { ...@@ -596,13 +601,23 @@ static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
appH.configFunc = dnodeSendCfgTableToRecv; appH.configFunc = dnodeSendCfgTableToRecv;
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
// vnode status may be changed to DELETING or CLOSING pVnode->status = TAOS_VN_STATUS_READY;
atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_RESET, TAOS_VN_STATUS_READY);
vnodeRelease(pVnode); vnodeRelease(pVnode);
return 0; return 0;
} }
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
SVnodeObj *pVnode = ahandle;
vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion);
pVnode->fversion = fversion;
pVnode->version = fversion;
vnodeSaveVersion(pVnode);
return vnodeResetTsdb(pVnode);
}
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
char cfgFile[TSDB_FILENAME_LEN + 30] = {0}; char cfgFile[TSDB_FILENAME_LEN + 30] = {0};
sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnodeCfg->cfg.vgId); sprintf(cfgFile, "%s/vnode%d/config.json", tsVnodeDir, pVnodeCfg->cfg.vgId);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册