提交 11dc189a 编写于 作者: 陶建辉(Jeff)'s avatar 陶建辉(Jeff)

add new state, TSDB reset wont happen if status is not ready or refCount not one

上级 2e975352
......@@ -91,23 +91,21 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
int32_t queuedMsgNum = 0;
int32_t leftLen = pMsg->contLen;
char *pCont = (char *) pMsg->pCont;
void *pVnode;
while (leftLen > 0) {
SMsgHead *pHead = (SMsgHead *) pCont;
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
pVnode = vnodeAcquireVnode(pHead->vgId);
taos_queue queue = vnodeAcquireRqueue(pHead->vgId);
if (pVnode == NULL) {
if (queue == NULL) {
leftLen -= pHead->contLen;
pCont -= pHead->contLen;
continue;
}
// put message into queue
taos_queue queue = vnodeGetRqueue(pVnode);
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg = *pMsg;
pRead->pCont = pCont;
......@@ -175,18 +173,6 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
// dynamically adjust the number of threads
}
void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
pRead->contLen = 0;
assert(pVnode != NULL);
taos_queue queue = vnodeAcquireRqueue(pVnode);
taosWriteQitem(queue, TAOS_QTYPE_QUERY, pRead);
}
void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
SRpcMsg rpcRsp = {
.handle = pRead->rpcMsg.handle,
......
......@@ -104,7 +104,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
pHead->vgId = htonl(pHead->vgId);
pHead->contLen = htonl(pHead->contLen);
taos_queue queue = vnodeGetWqueue(pHead->vgId);
taos_queue queue = vnodeAcquireWqueue(pHead->vgId);
if (queue) {
// put message into queue
SWriteMsg *pWrite = (SWriteMsg *)taosAllocateQitem(sizeof(SWriteMsg));
......
......@@ -53,7 +53,6 @@ void *dnodeAllocateVnodeWqueue(void *pVnode);
void dnodeFreeVnodeWqueue(void *queue);
void *dnodeAllocateVnodeRqueue(void *pVnode);
void dnodeFreeVnodeRqueue(void *rqueue);
void dnodePutItemIntoReadQueue(void *pVnode, void *qhandle);
void dnodeSendRpcVnodeWriteRsp(void *pVnode, void *param, int32_t code);
int32_t dnodeAllocateMnodePqueue();
......
......@@ -79,7 +79,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
typedef void (*FNotifyRole)(void *ahandle, int8_t role);
// when data file is synced successfully, notity app
typedef void (*FNotifyFileSynced)(void *ahandle, uint64_t fversion);
typedef int (*FNotifyFileSynced)(void *ahandle, uint64_t fversion);
typedef struct {
int32_t vgId; // vgroup ID
......
......@@ -23,6 +23,7 @@ extern "C" {
typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT,
TAOS_VN_STATUS_UPDATING,
TAOS_VN_STATUS_RESET,
TAOS_VN_STATUS_READY,
TAOS_VN_STATUS_CLOSING,
TAOS_VN_STATUS_DELETING,
......@@ -48,13 +49,11 @@ int32_t vnodeStartStream(int32_t vgId);
int32_t vnodeAlter(void *pVnode, SMDCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId);
void vnodeRelease(void *pVnode);
void* vnodeAcquireVnode(int32_t vgId); // add refcount
void* vnodeGetVnode(int32_t vgId); // keep refcount unchanged
void vnodeRelease(void *pVnode);
void* vnodeAcquireRqueue(void *);
void* vnodeGetRqueue(void *);
void* vnodeGetWqueue(int32_t vgId);
void* vnodeAcquireRqueue(int32_t vgId);
void* vnodeAcquireWqueue(int32_t vgId);
void* vnodeGetWal(void *pVnode);
int32_t vnodeProcessWrite(void *pVnode, int qtype, void *pHead, void *item);
......
......@@ -37,7 +37,7 @@ extern int32_t vDebugFlag;
typedef struct {
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
int status;
int8_t status;
int8_t role;
int8_t accessState;
int64_t version; // current version
......@@ -55,6 +55,7 @@ typedef struct {
SWalCfg walCfg;
void *qMgmt;
char *rootDir;
tsem_t sem;
char db[TSDB_DB_NAME_LEN];
} SVnodeObj;
......
......@@ -44,7 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status);
static uint32_t vnodeGetFileInfo(void *ahandle, char *name, uint32_t *index, uint32_t eindex, int32_t *size, uint64_t *fversion);
static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
#ifndef _SYNC
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
......@@ -224,6 +224,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->tsdbCfg.tsdbId = pVnode->vgId;
pVnode->rootDir = strdup(rootDir);
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
tsem_init(&pVnode->sem, 0, 0);
int32_t code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) {
......@@ -348,6 +349,8 @@ void vnodeRelease(void *pVnodeRaw) {
if (refCount > 0) {
vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount);
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2)
tsem_post(&pVnode->sem);
return;
}
......@@ -384,13 +387,14 @@ void vnodeRelease(void *pVnodeRaw) {
taosRemoveDir(rootDir);
}
tsem_destroy(&pVnode->sem);
free(pVnode);
int32_t count = taosHashGetSize(tsDnodeVnodesHash);
vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count);
}
void *vnodeGetVnode(int32_t vgId) {
void *vnodeAcquireVnode(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
......@@ -398,12 +402,12 @@ void *vnodeGetVnode(int32_t vgId) {
return NULL;
}
return *ppVnode;
}
void *vnodeAcquireVnode(int32_t vgId) {
SVnodeObj *pVnode = vnodeGetVnode(vgId);
if (pVnode == NULL) return pVnode;
SVnodeObj *pVnode = *ppVnode;
if (pVnode->status == TAOS_VN_STATUS_RESET) {
terrno = TSDB_CODE_VND_INVALID_STATUS;
vInfo("vgId:%d, status is in reset", vgId);
return NULL;
}
atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount);
......@@ -411,20 +415,13 @@ void *vnodeAcquireVnode(int32_t vgId) {
return pVnode;
}
void *vnodeAcquireRqueue(void *param) {
SVnodeObj *pVnode = param;
void *vnodeAcquireRqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquireVnode(vgId);
if (pVnode == NULL) return NULL;
atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode rqueue, refCount:%d", pVnode->vgId, pVnode->refCount);
return ((SVnodeObj *)pVnode)->rqueue;
return pVnode->rqueue;
}
void *vnodeGetRqueue(void *pVnode) {
return ((SVnodeObj *)pVnode)->rqueue;
}
void *vnodeGetWqueue(int32_t vgId) {
void *vnodeAcquireWqueue(int32_t vgId) {
SVnodeObj *pVnode = vnodeAcquireVnode(vgId);
if (pVnode == NULL) return NULL;
return pVnode->wqueue;
......@@ -563,7 +560,7 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
cqStop(pVnode->cq);
}
static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
SVnodeObj *pVnode = ahandle;
vDebug("vgId:%d, data file is synced, fversion:%" PRId64, pVnode->vgId, fversion);
......@@ -573,8 +570,19 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
// clsoe tsdb, then open tsdb
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY)
return -1;
// acquire vnode
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
if (refCount > 2)
tsem_wait(&pVnode->sem);
// close tsdb, then open tsdb
void *tsdb = pVnode->tsdb;
pVnode->tsdb = NULL;
tsdbCloseRepo(tsdb, 0);
STsdbAppH appH = {0};
......@@ -585,6 +593,12 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
appH.cqDropFunc = cqDrop;
appH.configFunc = dnodeSendCfgTableToRecv;
pVnode->tsdb = tsdbOpenRepo(rootDir, &appH);
// vnode status may be changed to DELETING or CLOSING
atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_RESET, TAOS_VN_STATUS_READY);
vnodeRelease(pVnode);
return 0;
}
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg) {
......
......@@ -26,6 +26,7 @@
#include "tsdb.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "tqueue.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SReadMsg *pReadMsg);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg);
......@@ -51,6 +52,11 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return TSDB_CODE_VND_INVALID_STATUS;
}
// tsdb may be in reset state
if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING || pVnode->status == TAOS_VN_STATUS_DELETING)
return TSDB_CODE_RPC_NOT_READY;
// TODO: Later, let slave to support query
if (pVnode->syncCfg.replica > 1 && pVnode->role != TAOS_SYNC_ROLE_MASTER) {
vDebug("vgId:%d, msgType:%s not processed, replica:%d role:%d", pVnode->vgId, taosMsg[msgType], pVnode->syncCfg.replica, pVnode->role);
......@@ -60,6 +66,16 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return (*vnodeProcessReadMsgFp[msgType])(pVnode, pReadMsg);
}
static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void *qhandle) {
SReadMsg *pRead = (SReadMsg *)taosAllocateQitem(sizeof(SReadMsg));
pRead->rpcMsg.msgType = TSDB_MSG_TYPE_QUERY;
pRead->pCont = qhandle;
pRead->contLen = 0;
atomic_add_fetch_32(&pVnode->refCount, 1);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
}
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
void *pCont = pReadMsg->pCont;
int32_t contLen = pReadMsg->contLen;
......@@ -131,7 +147,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if (handle != NULL) {
vDebug("vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app", vgId, *handle);
dnodePutItemIntoReadQueue(pVnode, *handle);
vnodePutItemIntoReadQueue(pVnode, *handle);
qReleaseQInfo(pVnode->qMgmt, (void**) &handle, false);
}
......@@ -208,7 +224,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
} else { // if failed to dump result, free qhandle immediately
if ((code = qDumpRetrieveResult(*handle, (SRetrieveTableRsp **)&pRet->rsp, &pRet->len)) == TSDB_CODE_SUCCESS) {
if (qHasMoreResultsToRetrieve(*handle)) {
dnodePutItemIntoReadQueue(pVnode, *handle);
vnodePutItemIntoReadQueue(pVnode, *handle);
pRet->qhandle = *handle;
freeHandle = false;
} else {
......
......@@ -60,7 +60,9 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
}
// tsdb may be in reset state
if (pVnode->tsdb == NULL) return 0;
if (pVnode->tsdb == NULL) return TSDB_CODE_RPC_NOT_READY;
if (pVnode->status == TAOS_VN_STATUS_CLOSING || pVnode->status == TAOS_VN_STATUS_DELETING)
return TSDB_CODE_RPC_NOT_READY;
if (pHead->version == 0) { // from client or CQ
if (pVnode->status != TAOS_VN_STATUS_READY) {
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册