未验证 提交 372c6168 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #2689 from taosdata/hotfix/TD-919

Hotfix/td 919
...@@ -399,7 +399,7 @@ static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -399,7 +399,7 @@ static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
SMDCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg); SMDCreateVnodeMsg *pCreate = dnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquireVnode(pCreate->cfg.vgId); void *pVnode = vnodeAcquire(pCreate->cfg.vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId); dDebug("vgId:%d, already exist, return success", pCreate->cfg.vgId);
vnodeRelease(pVnode); vnodeRelease(pVnode);
...@@ -413,7 +413,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) { ...@@ -413,7 +413,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) { static int32_t dnodeProcessAlterVnodeMsg(SRpcMsg *rpcMsg) {
SMDAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg); SMDAlterVnodeMsg *pAlter = dnodeParseVnodeMsg(rpcMsg);
void *pVnode = vnodeAcquireVnode(pAlter->cfg.vgId); void *pVnode = vnodeAcquire(pAlter->cfg.vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId); dDebug("vgId:%d, alter vnode msg is received", pAlter->cfg.vgId);
int32_t code = vnodeAlter(pVnode, pAlter); int32_t code = vnodeAlter(pVnode, pAlter);
......
...@@ -91,23 +91,21 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) { ...@@ -91,23 +91,21 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0; int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen; int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont; char *pCont = (char *) pMsg->pCont;
void *pVnode;
while (leftLen > 0) { while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont; SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
pVnode = vnodeAcquireVnode(pHead->vgId); taos_queue queue = vnodeAcquireRqueue(pHead->vgId);
if (pVnode == NULL) { if (queue == NULL) {
leftLen -= pHead->contLen; leftLen -= pHead->contLen;
pCont -= pHead->contLen; pCont -= pHead->contLen;
continue; continue;
} }
// put message into queue // put message into queue
taos_queue queue = vnodeGetRqueue(pVnode);
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg)); SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = *pMsg; pRead->rpcMsg = *pMsg;
pRead->pCont = pCont; pRead->pCont = pCont;
...@@ -175,18 +173,6 @@ void dnodeFreeVnodeRqueue(void *rqueue) { ...@@ -175,18 +173,6 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
// dynamically adjust the number of threads // dynamically adjust the number of threads
} }
// Wraps a query handle in a SReadMsg and appends it to the vnode's read queue.
//   pVnode  - vnode whose read queue receives the item; must not be NULL (asserted below)
//   qhandle - query handle, stored as the message content
void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle) {
// NOTE(review): the allocation result is dereferenced without a NULL check
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
pRead->contLen = 0;  // no payload length is recorded; only the handle is carried
assert(pVnode != NULL);
// presumably takes a reference on the vnode before returning its read queue -- confirm against vnodeAcquireRqueue
taos_queue queue = vnodeAcquireRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead);
}
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) { void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
SRpcMsg rpcRsp = { SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle, .handle = pRead->rpcMsg.handle,
......
...@@ -104,7 +104,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) { ...@@ -104,7 +104,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId); pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen); pHead->contLen = htonl(pHead->contLen);
taos_queue queue = vnodeGetWqueue(pHead->vgId); taos_queue queue = vnodeAcquireWqueue(pHead->vgId);
if (queue) { if (queue) {
// put message into queue // put message into queue
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg)); SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
......
...@@ -53,7 +53,6 @@ void *dnodeAllocateVnodeWqueue(void *pVnode); ...@@ -53,7 +53,6 @@ void *dnodeAllocateVnodeWqueue(void *pVnode);
void dnodeFreeVnodeWqueue(void *queue); void dnodeFreeVnodeWqueue(void *queue);
void *dnodeAllocateVnodeRqueue(void *pVnode); void *dnodeAllocateVnodeRqueue(void *pVnode);
void dnodeFreeVnodeRqueue(void *rqueue); void dnodeFreeVnodeRqueue(void *rqueue);
void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle);
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code); void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue(); int32_t dnodeAllocateMnodePqueue();
......
...@@ -79,7 +79,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code); ...@@ -79,7 +79,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
typedef void (*FNotifyRole)(void *ahandle, int8_t role); typedef void (*FNotifyRole)(void *ahandle, int8_t role);
// when data file is synced successfully, notify app // when data file is synced successfully, notify app
typedef void (*FNotifyFileSynced)(void *ahandle, uint64_t fversion); typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion);
typedef struct { typedef struct {
int32_t vgId; // vgroup ID int32_t vgId; // vgroup ID
......
...@@ -22,10 +22,10 @@ extern "C" { ...@@ -22,10 +22,10 @@ extern "C" {
typedef enum _VN_STATUS { typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT, TAOS_VN_STATUS_INIT,
TAOS_VN_STATUS_UPDATING,
TAOS_VN_STATUS_READY, TAOS_VN_STATUS_READY,
TAOS_VN_STATUS_CLOSING, TAOS_VN_STATUS_CLOSING,
TAOS_VN_STATUS_DELETING, TAOS_VN_STATUS_UPDATING,
TAOS_VN_STATUS_RESET,
} EVnStatus; } EVnStatus;
typedef struct { typedef struct {
...@@ -47,13 +47,10 @@ int32_t vnodeOpen(int32_t vgId, char *rootDir); ...@@ -47,13 +47,10 @@ int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg); int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId); int32_t vnodeClose(int32_t vgId);
void vnodeRelease(void *pVnode); void* vnodeAcquire(int32_t vgId); // add refcount
void* vnodeAcquireVnode(int32_t vgId); // add refcount void* vnodeAcquireRqueue(int32_t vgId); // add refCount, get read queue
void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged void* vnodeAcquireWqueue(int32_t vgId); // add refCount, get write queue
void vnodeRelease(void *pVnode); // dec refCount
void* vnodeAcquireRqueue(void *);
void* vnodeGetRqueue(void *);
void* vnodeGetWqueue(int32_t vgId);
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item); int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
......
...@@ -270,6 +270,14 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI ...@@ -270,6 +270,14 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI
return -1; return -1;
} }
/* set REUSEADDR option, so the portnumber can be re-used */
int reuse = 1;
if (taosSetSockOpt(sockFd, SOL_SOCKET, SO_REUSEADDR, (void *)&reuse, sizeof(reuse)) < 0) {
uError("setsockopt SO_REUSEADDR failed: %d (%s)", errno, strerror(errno));
close(sockFd);
return -1;
};
if ( clientIp != 0) { if ( clientIp != 0) {
memset((char *)&clientAddr, 0, sizeof(clientAddr)); memset((char *)&clientAddr, 0, sizeof(clientAddr));
clientAddr.sin_family = AF_INET; clientAddr.sin_family = AF_INET;
......
...@@ -37,7 +37,7 @@ extern int32_t vDebugFlag; ...@@ -37,7 +37,7 @@ extern int32_t vDebugFlag;
typedef struct { typedef struct {
int32_t vgId; // global vnode group ID int32_t vgId; // global vnode group ID
int32_t refCount; // reference count int32_t refCount; // reference count
int status; int8_t status;
int8_t role; int8_t role;
int8_t accessState; int8_t accessState;
int64_t version; // current version int64_t version; // current version
...@@ -55,6 +55,8 @@ typedef struct { ...@@ -55,6 +55,8 @@ typedef struct {
SWalCfg walCfg; SWalCfg walCfg;
void *qMgmt; void *qMgmt;
char *rootDir; char *rootDir;
tsem_t sem;
int8_t dropped;
char db[TSDB_DB_NAME_LEN]; char db[TSDB_DB_NAME_LEN];
} SVnodeObj; } SVnodeObj;
......
...@@ -44,7 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status); ...@@ -44,7 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion); static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index); static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role); static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion); static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
#ifndef _SYNC #ifndef _SYNC
tsync_h syncStart(const SSyncInfo *info) { return NULL; } tsync_h syncStart(const SSyncInfo *info) { return NULL; }
...@@ -153,7 +153,7 @@ int32_t vnodeDrop(int32_t vgId) { ...@@ -153,7 +153,7 @@ int32_t vnodeDrop(int32_t vgId) {
SVnodeObj *pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount); vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount);
pVnode->status = TAOS_VN_STATUS_DELETING; pVnode->dropped = 1;
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
...@@ -164,18 +164,11 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -164,18 +164,11 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS // vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// cfgVersion can be corrected by status msg // cfgVersion can be corrected by status msg
if (pVnode->status != TAOS_VN_STATUS_READY) { if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_UPDATING) != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId); vDebug("vgId:%d, vnode is not ready, do alter operation later", pVnode->vgId);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
// the vnode may always fail to synchronize because of it in low cfgVersion
// so cannot use the following codes
// if (pVnode->syncCfg.replica > 1 && pVnode->role == TAOS_SYNC_ROLE_UNSYNCED)
// return TSDB_CODE_VND_NOT_SYNCED;
pVnode->status = TAOS_VN_STATUS_UPDATING;
int32_t code = vnodeSaveCfg(pVnodeCfg); int32_t code = vnodeSaveCfg(pVnodeCfg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
...@@ -194,11 +187,13 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) { ...@@ -194,11 +187,13 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
return code; return code;
} }
if (pVnode->tsdb) {
code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg); code = tsdbConfigRepo(pVnode->tsdb, &pVnode->tsdbCfg);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
return code; return code;
} }
}
pVnode->status = TAOS_VN_STATUS_READY; pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is altered", pVnode->vgId); vDebug("vgId:%d, vnode is altered", pVnode->vgId);
...@@ -223,6 +218,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -223,6 +218,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->tsdbCfg.tsdbId = pVnode->vgId; pVnode->tsdbCfg.tsdbId = pVnode->vgId;
pVnode->rootDir = strdup(rootDir); pVnode->rootDir = strdup(rootDir);
pVnode->accessState = TSDB_VN_ALL_ACCCESS; pVnode->accessState = TSDB_VN_ALL_ACCCESS;
tsem_init(&pVnode->sem, 0, 0);
int32_t code = vnodeReadCfg(pVnode); int32_t code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
...@@ -319,7 +315,6 @@ int32_t vnodeClose(int32_t vgId) { ...@@ -319,7 +315,6 @@ int32_t vnodeClose(int32_t vgId) {
SVnodeObj *pVnode = *ppVnode; SVnodeObj *pVnode = *ppVnode;
vDebug("vgId:%d, vnode will be closed", pVnode->vgId); vDebug("vgId:%d, vnode will be closed", pVnode->vgId);
pVnode->status = TAOS_VN_STATUS_CLOSING;
vnodeCleanUp(pVnode); vnodeCleanUp(pVnode);
return 0; return 0;
...@@ -334,6 +329,8 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -334,6 +329,8 @@ void vnodeRelease(void *pVnodeRaw) {
if (refCount > 0) { if (refCount > 0) {
vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount); vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount);
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2)
tsem_post(&pVnode->sem);
return; return;
} }
...@@ -344,11 +341,6 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -344,11 +341,6 @@ void vnodeRelease(void *pVnodeRaw) {
tsdbCloseRepo(pVnode->tsdb, 1); tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL; pVnode->tsdb = NULL;
// stop continuous query
if (pVnode->cq)
cqClose(pVnode->cq);
pVnode->cq = NULL;
if (pVnode->wal) if (pVnode->wal)
walClose(pVnode->wal); walClose(pVnode->wal);
pVnode->wal = NULL; pVnode->wal = NULL;
...@@ -363,20 +355,21 @@ void vnodeRelease(void *pVnodeRaw) { ...@@ -363,20 +355,21 @@ void vnodeRelease(void *pVnodeRaw) {
tfree(pVnode->rootDir); tfree(pVnode->rootDir);
if (pVnode->status == TAOS_VN_STATUS_DELETING) { if (pVnode->dropped) {
char rootDir[TSDB_FILENAME_LEN] = {0}; char rootDir[TSDB_FILENAME_LEN] = {0};
sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId); sprintf(rootDir, "%s/vnode%d", tsVnodeDir, vgId);
taosMvDir(tsVnodeBakDir, rootDir); taosMvDir(tsVnodeBakDir, rootDir);
taosRemoveDir(rootDir); taosRemoveDir(rootDir);
} }
tsem_destroy(&pVnode->sem);
free(pVnode); free(pVnode);
int32_t count = taosHashGetSize(tsDnodeVnodesHash); int32_t count = taosHashGetSize(tsDnodeVnodesHash);
vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count); vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count);
} }
void *vnodeGetVnode(int32_t vgId) { void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t)); SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) { if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID; terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
...@@ -384,35 +377,38 @@ void *vnodeGetVnode(int32_t vgId) { ...@@ -384,35 +377,38 @@ void *vnodeGetVnode(int32_t vgId) {
return NULL; return NULL;
} }
return *ppVnode; SVnodeObj *pVnode = *ppVnode;
}
void *vnodeAcquireVnode(int32_t vgId) {
SVnodeObj *pVnode = vnodeGetVnode(vgId);
if (pVnode == NULL) return pVnode;
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount); vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount);
return pVnode; return pVnode;
} }
void *vnodeAcquireRqueue(void *param) { void *vnodeAcquireRqueue(int32_t vgId) {
SVnodeObj *pVnode = param; SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
atomic_add_fetch_32(&pVnode->refCount, 1); if (pVnode->status == TAOS_VN_STATUS_RESET) {
vDebug("vgId:%d, get vnode rqueue, refCount:%d", pVnode->vgId, pVnode->refCount); terrno = TSDB_CODE_VND_INVALID_STATUS;
return ((SVnodeObj *)pVnode)->rqueue; vInfo("vgId:%d, status is in reset", vgId);
} vnodeRelease(pVnode);
return NULL;
}
void *vnodeGetRqueue(void *pVnode) { return pVnode->rqueue;
return ((SVnodeObj *)pVnode)->rqueue;
} }
void *vnodeGetWqueue(int32_t vgId) { void *vnodeAcquireWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquireVnode(vgId); SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return NULL; if (pVnode == NULL) return NULL;
if (pVnode->status == TAOS_VN_STATUS_RESET) {
terrno = TSDB_CODE_VND_INVALID_STATUS;
vInfo("vgId:%d, status is in reset", vgId);
vnodeRelease(pVnode);
return NULL;
}
return pVnode->wqueue; return pVnode->wqueue;
} }
...@@ -484,7 +480,7 @@ void vnodeBuildStatusMsg(void *param) { ...@@ -484,7 +480,7 @@ void vnodeBuildStatusMsg(void *param) {
void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
for (int32_t i = 0; i < numOfVnodes; ++i) { for (int32_t i = 0; i < numOfVnodes; ++i) {
pAccess[i].vgId = htonl(pAccess[i].vgId); pAccess[i].vgId = htonl(pAccess[i].vgId);
SVnodeObj *pVnode = vnodeAcquireVnode(pAccess[i].vgId); SVnodeObj *pVnode = vnodeAcquire(pAccess[i].vgId);
if (pVnode != NULL) { if (pVnode != NULL) {
pVnode->accessState = pAccess[i].accessState; pVnode->accessState = pAccess[i].accessState;
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) { if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
...@@ -498,11 +494,29 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) { ...@@ -498,11 +494,29 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
static void vnodeCleanUp(SVnodeObj *pVnode) { static void vnodeCleanUp(SVnodeObj *pVnode) {
// remove from hash, so new messages won't be consumed // remove from hash, so new messages won't be consumed
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t)); taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
int i = 0;
if (pVnode->status != TAOS_VN_STATUS_INIT) {
// it may be in updating or reset state, in which case it must wait
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) {
if (++i % 1000 == 0) {
sched_yield();
}
}
}
// stop replication module // stop replication module
if (pVnode->sync) { if (pVnode->sync) {
syncStop(pVnode->sync); void *sync = pVnode->sync;
pVnode->sync = NULL; pVnode->sync = NULL;
syncStop(sync);
}
// stop continuous query
if (pVnode->cq) {
void *cq = pVnode->cq;
pVnode->cq = NULL;
cqClose(cq);
} }
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount); vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
...@@ -549,18 +563,25 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) { ...@@ -549,18 +563,25 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
cqStop(pVnode->cq); cqStop(pVnode->cq);
} }
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { static int vnodeResetTsdb(SVnodeObj *pVnode)
SVnodeObj *pVnode = ahandle; {
vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion);
pVnode->fversion = fversion;
pVnode->version = fversion;
vnodeSaveVersion(pVnode);
char rootDir[128] = "\0"; char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir); sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
// clsoe tsdb, then open tsdb
tsdbCloseRepo(pVnode->tsdb, 0); if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY)
return -1;
void *tsdb = pVnode->tsdb;
pVnode->tsdb = NULL;
// acquire vnode
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
if (refCount > 2)
tsem_wait(&pVnode->sem);
// close tsdb, then open tsdb
tsdbCloseRepo(tsdb, 0);
STsdbAppH appH = {0}; STsdbAppH appH = {0};
appH.appH = (void *)pVnode; appH.appH = (void *)pVnode;
appH.notifyStatus = vnodeProcessTsdbStatus; appH.notifyStatus = vnodeProcessTsdbStatus;
...@@ -569,6 +590,22 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) { ...@@ -569,6 +590,22 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
appH.cqDropFunc = cqDrop; appH.cqDropFunc = cqDrop;
appH.configFunc = dnodeSendCfgTableToRecv; appH.configFunc = dnodeSendCfgTableToRecv;
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH); pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
pVnode->status = TAOS_VN_STATUS_READY;
vnodeRelease(pVnode);
return 0;
}
// Callback invoked with the file version once a data file has been synced.
// Records fversion as both the vnode's file version and its current version,
// saves the version via vnodeSaveVersion, then resets the tsdb.
//   ahandle  - the SVnodeObj this callback was registered for
//   fversion - version of the synced data file
// Returns whatever vnodeResetTsdb returns.
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
SVnodeObj *pVnode = ahandle;
// NOTE(review): fversion is uint64_t but is printed with PRId64; PRIu64 would match the type
vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion);
pVnode->fversion = fversion;
pVnode->version = fversion;
// NOTE(review): any result of vnodeSaveVersion is ignored here
vnodeSaveVersion(pVnode);
return vnodeResetTsdb(pVnode);
} }
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) { static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
......
...@@ -26,6 +26,7 @@ ...@@ -26,6 +26,7 @@
#include "tsdb.h" #include "tsdb.h"
#include "vnode.h" #include "vnode.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#include "tqueue.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
...@@ -51,6 +52,11 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { ...@@ -51,6 +52,11 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return TSDB_CODE_VND_INVALID_STATUS; return TSDB_CODE_VND_INVALID_STATUS;
} }
// tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
return TSDB_CODE_RPC_NOT_READY;
// TODO: Later, let slave to support query // TODO: Later, let slave to support query
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role); vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
...@@ -60,6 +66,16 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) { ...@@ -60,6 +66,16 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg); return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
} }
// Wraps a query handle in a SReadMsg and appends it to this vnode's read queue.
//   pVnode  - vnode whose rqueue receives the item
//   qhandle - query handle, stored as the message content
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
// NOTE(review): the allocation result is dereferenced without a NULL check
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
pRead->contLen = 0;  // no payload length is recorded; only the handle is carried
// take a reference on the vnode for the queued item;
// presumably dropped by whoever consumes the item -- confirm against the read-queue worker
atomic_add_fetch_32(&pVnode->refCount, 1);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
}
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void *pCont = pReadMsg->pCont; void *pCont = pReadMsg->pCont;
int32_t contLen = pReadMsg->contLen; int32_t contLen = pReadMsg->contLen;
...@@ -131,7 +147,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -131,7 +147,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (handle != NULL) { if (handle != NULL) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle); vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle);
dnodePutItemIntoReadQueue(pVnode, *handle); vnodePutItemIntoReadQueue(pVnode, *handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false); qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
} }
...@@ -208,7 +224,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) { ...@@ -208,7 +224,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
} else { // if failed to dump result, free qhandle immediately } else { // if failed to dump result, free qhandle immediately
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) { if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) {
if (qHasMoreResultsToRetrieve(*handle)) { if (qHasMoreResultsToRetrieve(*handle)) {
dnodePutItemIntoReadQueue(pVnode, *handle); vnodePutItemIntoReadQueue(pVnode, *handle);
pRet->qhandle = *handle; pRet->qhandle = *handle;
freeHandle = false; freeHandle = false;
} else { } else {
......
...@@ -59,13 +59,18 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) { ...@@ -59,13 +59,18 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
return TSDB_CODE_VND_NO_WRITE_AUTH; return TSDB_CODE_VND_NO_WRITE_AUTH;
} }
// tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING)
return TSDB_CODE_RPC_NOT_READY;
if (pHead->version == 0) { // from client or CQ if (pHead->version == 0) { // from client or CQ
if (pVnode->status != TAOS_VN_STATUS_READY) { if (pVnode->status != TAOS_VN_STATUS_READY) {
vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status); vDebug("vgId:%d, msgType:%s not processed, vnode status is %d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->status);
return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state return TSDB_CODE_VND_INVALID_STATUS; // it may be in deleting or closing state
} }
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) { if (pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role); vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[pHead->msgType], pVnode->syncCfg.replica, pVnode->role);
return TSDB_CODE_RPC_NOT_READY; return TSDB_CODE_RPC_NOT_READY;
} }
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册