提交 73db12c7 编写于 作者: S Shengliang Guan

TD-1746

上级 08fdc299
......@@ -189,7 +189,6 @@ void dnodeSendRpcReadRsp(void *pVnode, SReadMsg *pRead, int32_t code) {
void dnodeDispatchNonRspMsg(void *pVnode, SReadMsg *pRead, int32_t code) {
rpcFreeCont(pRead->rpcMsg.pCont);
vnodeRelease(pVnode);
return;
}
static void *dnodeProcessReadQueue(void *param) {
......
......@@ -111,7 +111,6 @@ void mnodeReleaseConn(SConnObj *pConn) {
}
SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t port) {
uint64_t expireTime = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mDebug("connId:%d, is already destroyed, user:%s ip:%s:%u", connId, user, taosIpStr(ip), port);
......@@ -126,7 +125,7 @@ SConnObj *mnodeAccquireConn(int32_t connId, char *user, uint32_t ip, uint16_t po
}
// mDebug("connId:%d, is incoming, user:%s ip:%s:%u", connId, pConn->user, taosIpStr(pConn->ip), pConn->port);
pConn->lastAccess = expireTime;
pConn->lastAccess = CONN_KEEP_TIME * 1000 + (uint64_t)taosGetTimestampMs();
return pConn;
}
......@@ -626,7 +625,7 @@ static int32_t mnodeProcessKillConnectionMsg(SMnodeMsg *pMsg) {
SCMKillConnMsg *pKill = pMsg->rpcMsg.pCont;
int32_t connId = atoi(pKill->queryId);
SConnObj * pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
SConnObj *pConn = taosCacheAcquireByKey(tsMnodeConnCache, &connId, sizeof(int32_t));
if (pConn == NULL) {
mError("connId:%s, failed to kill, conn not exist", pKill->queryId);
return TSDB_CODE_MND_INVALID_CONN_ID;
......
......@@ -50,6 +50,7 @@ typedef struct {
void *sync;
void *events;
void *cq; // continuous query
void **ppVnode;
int32_t cfgVersion;
STsdbCfg tsdbCfg;
SSyncCfg syncCfg;
......
......@@ -34,7 +34,7 @@
#define TSDB_VNODE_VERSION_CONTENT_LEN 31
static SHashObj*tsDnodeVnodesHash;
static SCacheObj* tsDnodeVnodesCache;
static void vnodeCleanUp(SVnodeObj *pVnode);
static int32_t vnodeSaveCfg(SMDCreateVnodeMsg *pVnodeCfg);
static int32_t vnodeReadCfg(SVnodeObj *pVnode);
......@@ -46,6 +46,7 @@ static int vnodeGetWalInfo(void *ahandle, char *name, uint32_t *index);
static void vnodeNotifyRole(void *ahandle, int8_t role);
static void vnodeCtrlFlow(void *handle, int32_t mseconds);
static int vnodeNotifyFileSynced(void *ahandle, uint64_t fversion);
static void vnodeFreeVnodeObj(void *data);
#ifndef _SYNC
tsync_h syncStart(const SSyncInfo *info) { return NULL; }
......@@ -63,9 +64,9 @@ int32_t vnodeInitResources() {
vnodeInitWriteFp();
vnodeInitReadFp();
tsDnodeVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, true);
if (tsDnodeVnodesHash == NULL) {
vError("failed to init vnode list");
tsDnodeVnodesCache = taosCacheInit(TSDB_DATA_TYPE_INT, 5, true, vnodeFreeVnodeObj, "vnode");
if (tsDnodeVnodesCache == NULL) {
vError("failed to init vnode cache");
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
......@@ -73,9 +74,10 @@ int32_t vnodeInitResources() {
}
void vnodeCleanupResources() {
if (tsDnodeVnodesHash != NULL) {
taosHashCleanup(tsDnodeVnodesHash);
tsDnodeVnodesHash = NULL;
if (tsDnodeVnodesCache != NULL) {
vDebug("vnode cache is cleanup");
taosCacheCleanup(tsDnodeVnodesCache);
tsDnodeVnodesCache = NULL;
}
syncCleanUp();
......@@ -84,9 +86,10 @@ void vnodeCleanupResources() {
int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
int32_t code;
SVnodeObj *pTemp = (SVnodeObj *)taosHashGet(tsDnodeVnodesHash, (const char *)&pVnodeCfg->cfg.vgId, sizeof(int32_t));
if (pTemp != NULL) {
vInfo("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pTemp);
SVnodeObj *pVnode = vnodeAcquire(pVnodeCfg->cfg.vgId);
if (pVnode != NULL) {
vDebug("vgId:%d, vnode already exist, pVnode:%p", pVnodeCfg->cfg.vgId, pVnode);
vnodeRelease(pVnode);
return TSDB_CODE_SUCCESS;
}
......@@ -143,22 +146,24 @@ int32_t vnodeCreate(SMDCreateVnodeMsg *pVnodeCfg) {
return TSDB_CODE_VND_INIT_FAILED;
}
vInfo("vgId:%d, vnode is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel, pVnodeCfg->cfg.fsyncPeriod);
vInfo("vgId:%d, vnode dir is created, walLevel:%d fsyncPeriod:%d", pVnodeCfg->cfg.vgId, pVnodeCfg->cfg.walLevel, pVnodeCfg->cfg.fsyncPeriod);
code = vnodeOpen(pVnodeCfg->cfg.vgId, rootDir);
return code;
}
int32_t vnodeDrop(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) {
vDebug("vgId:%d, failed to drop, vgId not find", vgId);
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) {
vDebug("vgId:%d, failed to drop, vnode not find", vgId);
return TSDB_CODE_VND_INVALID_VGROUP_ID;
}
SVnodeObj *pVnode = *ppVnode;
vTrace("vgId:%d, vnode will be dropped, refCount:%d", pVnode->vgId, pVnode->refCount);
vInfo("vgId:%d, vnode will be dropped, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode,
pVnode->ppVnode);
pVnode->dropped = 1;
vnodeRelease(pVnode);
vnodeCleanUp(pVnode);
return TSDB_CODE_SUCCESS;
......@@ -231,6 +236,10 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->accessState = TSDB_VN_ALL_ACCCESS;
tsem_init(&pVnode->sem, 0, 0);
pVnode->ppVnode = taosCachePut(tsDnodeVnodesCache, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *), 8);
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p data:%p", pVnode->vgId, rootDir, pVnode, pVnode->ppVnode);
assert(pVnode->ppVnode != NULL);
int32_t code = vnodeReadCfg(pVnode);
if (code != TSDB_CODE_SUCCESS) {
vnodeCleanUp(pVnode);
......@@ -332,19 +341,16 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode->events = NULL;
pVnode->status = TAOS_VN_STATUS_READY;
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
taosHashPut(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t), (char *)(&pVnode), sizeof(SVnodeObj *));
return TSDB_CODE_SUCCESS;
}
int32_t vnodeClose(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) return 0;
SVnodeObj *pVnode = vnodeAcquire(vgId);
if (pVnode == NULL) return 0;
SVnodeObj *pVnode = *ppVnode;
vDebug("vgId:%d, vnode will be closed", pVnode->vgId);
vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode);
vnodeRelease(pVnode);
vnodeCleanUp(pVnode);
return 0;
......@@ -357,19 +363,26 @@ void vnodeRelease(void *pVnodeRaw) {
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
assert(refCount >= 0);
void **ppVnode = pVnode->ppVnode;
taosCacheRelease(tsDnodeVnodesCache, (void **)(&ppVnode), false);
vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p data:%p", vgId, refCount, pVnode, pVnode->ppVnode);
if (refCount > 0) {
vDebug("vgId:%d, release vnode, refCount:%d", vgId, refCount);
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2)
if (pVnode->status == TAOS_VN_STATUS_RESET && refCount == 2) {
tsem_post(&pVnode->sem);
}
return;
}
qCleanupQueryMgmt(pVnode->qMgmt);
pVnode->qMgmt = NULL;
if (pVnode->qMgmt) {
qCleanupQueryMgmt(pVnode->qMgmt);
pVnode->qMgmt = NULL;
}
if (pVnode->tsdb)
if (pVnode->tsdb) {
tsdbCloseRepo(pVnode->tsdb, 1);
pVnode->tsdb = NULL;
pVnode->tsdb = NULL;
}
// stop continuous query
if (pVnode->cq) {
......@@ -378,18 +391,21 @@ void vnodeRelease(void *pVnodeRaw) {
cqClose(cq);
}
if (pVnode->wal)
if (pVnode->wal) {
walClose(pVnode->wal);
pVnode->wal = NULL;
pVnode->wal = NULL;
}
if (pVnode->wqueue)
if (pVnode->wqueue) {
dnodeFreeVnodeWqueue(pVnode->wqueue);
pVnode->wqueue = NULL;
pVnode->wqueue = NULL;
}
if (pVnode->rqueue)
if (pVnode->rqueue) {
dnodeFreeVnodeRqueue(pVnode->rqueue);
pVnode->rqueue = NULL;
pVnode->rqueue = NULL;
}
taosTFree(pVnode->rootDir);
if (pVnode->dropped) {
......@@ -412,21 +428,22 @@ void vnodeRelease(void *pVnodeRaw) {
tsem_destroy(&pVnode->sem);
free(pVnode);
int32_t count = taosHashGetSize(tsDnodeVnodesHash);
int32_t count = taosHashGetSize(tsDnodeVnodesCache->pHashTable);
vDebug("vgId:%d, vnode is released, vnodes:%d", vgId, count);
}
void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = (SVnodeObj **)taosHashGet(tsDnodeVnodesHash, (const char *)&vgId, sizeof(int32_t));
SVnodeObj **ppVnode = taosCacheAcquireByKey(tsDnodeVnodesCache, &vgId, sizeof(int32_t));
if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
vInfo("vgId:%d, not exist", vgId);
vDebug("vgId:%d, not exist", vgId);
return NULL;
}
SVnodeObj *pVnode = *ppVnode;
atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode, refCount:%d", pVnode->vgId, pVnode->refCount);
vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode, pVnode->ppVnode);
return pVnode;
}
......@@ -487,7 +504,7 @@ static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SDMStatusMsg *pStatus) {
}
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash);
SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesCache->pHashTable);
while (taosHashIterNext(pIter)) {
SVnodeObj **pVnode = taosHashIterGet(pIter);
if (pVnode == NULL) continue;
......@@ -508,7 +525,7 @@ int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
void vnodeBuildStatusMsg(void *param) {
SDMStatusMsg *pStatus = param;
SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesHash);
SHashMutableIterator *pIter = taosHashCreateIter(tsDnodeVnodesCache->pHashTable);
while (taosHashIterNext(pIter)) {
SVnodeObj **pVnode = taosHashIterGet(pIter);
......@@ -528,7 +545,7 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
if (pVnode != NULL) {
pVnode->accessState = pAccess[i].accessState;
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState)
vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState);
}
vnodeRelease(pVnode);
}
......@@ -537,12 +554,13 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
static void vnodeCleanUp(SVnodeObj *pVnode) {
// remove from hash, so new messages wont be consumed
taosHashRemove(tsDnodeVnodesHash, (const char *)&pVnode->vgId, sizeof(int32_t));
int i = 0;
// taosCacheRelease(tsDnodeVnodesCache, (void **)&pVnode->ppVnode, true);
if (pVnode->status != TAOS_VN_STATUS_INIT) {
// it may be in updateing or reset state, then it shall wait
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) != TAOS_VN_STATUS_READY) {
int i = 0;
while (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_CLOSING) !=
TAOS_VN_STATUS_READY) {
if (++i % 1000 == 0) {
sched_yield();
}
......@@ -556,7 +574,8 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
syncStop(sync);
}
vTrace("vgId:%d, vnode will cleanup, refCount:%d", pVnode->vgId, pVnode->refCount);
vDebug("vgId:%d, vnode will cleanup, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode,
pVnode->ppVnode);
// release local resources only after cutting off outside connections
qQueryMgmtNotifyClosed(pVnode->qMgmt);
......@@ -613,17 +632,19 @@ static int vnodeResetTsdb(SVnodeObj *pVnode)
char rootDir[128] = "\0";
sprintf(rootDir, "%s/tsdb", pVnode->rootDir);
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY)
if (atomic_val_compare_exchange_8(&pVnode->status, TAOS_VN_STATUS_READY, TAOS_VN_STATUS_RESET) != TAOS_VN_STATUS_READY) {
return -1;
}
void *tsdb = pVnode->tsdb;
pVnode->tsdb = NULL;
// acquire vnode
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
int32_t refCount = atomic_add_fetch_32(&pVnode->refCount, 1);
if (refCount > 2)
if (refCount > 2) {
tsem_wait(&pVnode->sem);
}
// close tsdb, then open tsdb
tsdbCloseRepo(tsdb, 0);
......@@ -1008,3 +1029,9 @@ PARSE_OVER:
if (fp) fclose(fp);
return terrno;
}
static void vnodeFreeVnodeObj(void *data) {
SVnodeObj *pVnode = *(SVnodeObj **)data;
vDebug("vgId:%d, vnode is destroyed, pVnode:%p data:%p", pVnode->vgId, pVnode, pVnode->ppVnode);
//taosTFree(pVnode);
}
\ No newline at end of file
......@@ -74,7 +74,7 @@ static void vnodePutItemIntoReadQueue(SVnodeObj *pVnode, void **qhandle) {
pRead->contLen = 0;
pRead->rpcMsg.handle = NULL;
atomic_add_fetch_32(&pVnode->refCount, 1);
vnodeAcquire(pVnode->vgId);
vDebug("QInfo:%p add to vread queue for exec query, msg:%p", *qhandle, pRead);
taosWriteQitem(pVnode->rqueue, TAOS_QTYPE_QUERY, pRead);
......
......@@ -201,8 +201,9 @@ int vnodeWriteCqMsgToQueue(void *param, void *data, int type) {
SWalHead *pWal = (SWalHead *)(pSync + 1);
memcpy(pWal, pHead, size);
atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("CQ: vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount);
vnodeAcquire(pVnode->vgId);
vTrace("CQ: vgId:%d, get vnode wqueue, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode,
pVnode->ppVnode);
taosWriteQitem(pVnode->wqueue, type, pSync);
......@@ -218,8 +219,9 @@ int vnodeWriteToQueue(void *param, void *data, int type) {
SWalHead *pWal = (SWalHead *)taosAllocateQitem(size);
memcpy(pWal, pHead, size);
atomic_add_fetch_32(&pVnode->refCount, 1);
vDebug("vgId:%d, get vnode wqueue, refCount:%d", pVnode->vgId, pVnode->refCount);
vnodeAcquire(pVnode->vgId);
vTrace("vgId:%d, get vnode wqueue, refCount:%d pVnode:%p data:%p", pVnode->vgId, pVnode->refCount, pVnode,
pVnode->ppVnode);
taosWriteQitem(pVnode->wqueue, type, pWal);
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册