提交 7ad83106 编写于 作者: S Shengliang Guan

TD-2289

上级 90c133a1
...@@ -84,7 +84,7 @@ int32_t dnodeInitMgmt() { ...@@ -84,7 +84,7 @@ int32_t dnodeInitMgmt() {
dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp); dnodeAddClientRspHandle(TSDB_MSG_TYPE_DM_STATUS_RSP, dnodeProcessStatusRsp);
tsRebootTime = taosGetTimestampSec(); tsRebootTime = taosGetTimestampSec();
int32_t code = vnodeInitResources(); int32_t code = vnodeInitMgmt();
if (code != TSDB_CODE_SUCCESS) { if (code != TSDB_CODE_SUCCESS) {
dnodeCleanupMgmt(); dnodeCleanupMgmt();
return -1; return -1;
...@@ -174,7 +174,7 @@ void dnodeCleanupMgmt() { ...@@ -174,7 +174,7 @@ void dnodeCleanupMgmt() {
tsMgmtQset = NULL; tsMgmtQset = NULL;
tsMgmtQueue = NULL; tsMgmtQueue = NULL;
vnodeCleanupResources(); vnodeCleanupMgmt();
} }
static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) { static int32_t dnodeWriteToMgmtQueue(SRpcMsg *pMsg) {
......
...@@ -321,7 +321,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle); ...@@ -321,7 +321,7 @@ void tsdbCleanupQueryHandle(TsdbQueryHandleT queryHandle);
*/ */
void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage); void tsdbReportStat(void *repo, int64_t *totalPoints, int64_t *totalStorage, int64_t *compStorage);
int tsdbInitCommitQueue(int nthreads); int tsdbInitCommitQueue();
void tsdbDestroyCommitQueue(); void tsdbDestroyCommitQueue();
int tsdbSyncCommit(TSDB_REPO_T *repo); int tsdbSyncCommit(TSDB_REPO_T *repo);
void tsdbIncCommitRef(int vgId); void tsdbIncCommitRef(int vgId);
......
...@@ -19,11 +19,9 @@ ...@@ -19,11 +19,9 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "trpc.h" #include "trpc.h"
#include "twal.h" #include "twal.h"
typedef struct { typedef struct {
int32_t len; int32_t len;
void * rsp; void * rsp;
...@@ -53,29 +51,35 @@ typedef struct { ...@@ -53,29 +51,35 @@ typedef struct {
SWalHead pHead[]; SWalHead pHead[];
} SVWriteMsg; } SVWriteMsg;
// vnodeStatus
extern char *vnodeStatus[]; extern char *vnodeStatus[];
// vnodeMain
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg); int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId); int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId, char *rootDir); int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg); int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId); int32_t vnodeClose(int32_t vgId);
void* vnodeAcquire(int32_t vgId); // add refcount // vnodeMgmt
void vnodeRelease(void *pVnode); // dec refCount int32_t vnodeInitMgmt();
void vnodeCleanupMgmt();
void* vnodeAcquire(int32_t vgId);
void vnodeRelease(void *pVnode);
void* vnodeGetWal(void *pVnode); void* vnodeGetWal(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *pStatus);
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
// vnodeWrite
int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg); int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg);
void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite); void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet); int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *pStatus);
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
int32_t vnodeInitResources(); // vnodeSync
void vnodeCleanupResources(); void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
// vnodeRead
int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam); int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead); void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead); int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
......
...@@ -14,6 +14,7 @@ ...@@ -14,6 +14,7 @@
*/ */
#include "os.h" #include "os.h"
#include "tglobal.h"
#include "tlist.h" #include "tlist.h"
#include "tref.h" #include "tref.h"
#include "tsdbMain.h" #include "tsdbMain.h"
...@@ -36,7 +37,8 @@ static void *tsdbLoopCommit(void *arg); ...@@ -36,7 +37,8 @@ static void *tsdbLoopCommit(void *arg);
SCommitQueue tsCommitQueue = {0}; SCommitQueue tsCommitQueue = {0};
int tsdbInitCommitQueue(int nthreads) { int tsdbInitCommitQueue() {
int nthreads = tsNumOfCommitThreads;
SCommitQueue *pQueue = &tsCommitQueue; SCommitQueue *pQueue = &tsCommitQueue;
if (nthreads < 1) nthreads = 1; if (nthreads < 1) nthreads = 1;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_TSTEP_H
#define TDENGINE_TSTEP_H
#ifdef __cplusplus
extern "C" {
#endif
typedef struct {
const char *const name;
int32_t (*initFp)();
void (*cleanupFp)();
int32_t step;
} SStep;
int32_t taosStepInit(SStep *pSteps, int32_t stepSize);
void taosStepCleanup(SStep *pSteps, int32_t stepSize);
#ifdef __cplusplus
}
#endif
#endif // TDENGINE_TUTIL_H
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tstep.h"
#include "tulog.h"
#include "taoserror.h"
void taosStepCleanupImp(SStep *pSteps, int32_t stepId) {
for (int32_t step = stepId; step >= 0; step--) {
SStep *pStep = pSteps + step;
uDebug("step:%s will cleanup", pStep->name);
if (pStep->cleanupFp != NULL) {
(*pStep->cleanupFp)();
}
}
}
int32_t taosStepInit(SStep *pSteps, int32_t stepSize) {
for (int32_t step = 0; step < stepSize; step++) {
SStep *pStep = pSteps + step;
if (pStep->initFp == NULL) continue;
int32_t code = (*pStep->initFp)();
if (code != 0) {
uDebug("step:%s will init", pStep->name);
taosStepCleanupImp(pSteps, step);
return code;
}
}
return 0;
}
void taosStepCleanup(SStep *pSteps, int32_t stepSize) {
return taosStepCleanupImp(pSteps, stepSize - 1);
}
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "vnodeInt.h"
int32_t vnodeReadCfg(SVnodeObj *pVnode); int32_t vnodeReadCfg(SVnodeObj *pVnode);
int32_t vnodeWriteCfg(SCreateVnodeMsg *pVnodeCfg); int32_t vnodeWriteCfg(SCreateVnodeMsg *pVnodeCfg);
......
...@@ -19,12 +19,11 @@ ...@@ -19,12 +19,11 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "tlog.h" #include "tlog.h"
#include "tsync.h" #include "tsync.h"
#include "twal.h"
#include "tcq.h" #include "tcq.h"
#include "tsdb.h" #include "tsdb.h"
#include "vnode.h"
extern int32_t vDebugFlag; extern int32_t vDebugFlag;
...@@ -36,40 +35,37 @@ extern int32_t vDebugFlag; ...@@ -36,40 +35,37 @@ extern int32_t vDebugFlag;
#define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }} #define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
typedef struct { typedef struct {
int32_t vgId; // global vnode group ID int32_t vgId; // global vnode group ID
int32_t refCount; // reference count int32_t refCount; // reference count
int32_t queuedWMsg; int32_t queuedWMsg;
int32_t queuedRMsg; int32_t queuedRMsg;
int32_t flowctrlLevel; int32_t flowctrlLevel;
int8_t status; int8_t status;
int8_t role; int8_t role;
int8_t accessState; int8_t accessState;
int8_t isFull; int8_t isFull;
int8_t isCommiting; int8_t isCommiting;
uint64_t version; // current version uint64_t version; // current version
uint64_t fversion; // version on saved data file uint64_t fversion; // version on saved data file
void *wqueue; void * wqueue;
void *rqueue; void * rqueue;
void *wal; void * wal;
void *tsdb; void * tsdb;
int64_t sync; int64_t sync;
void *events; void * events;
void *cq; // continuous query void * cq; // continuous query
int32_t cfgVersion; int32_t cfgVersion;
STsdbCfg tsdbCfg; STsdbCfg tsdbCfg;
SSyncCfg syncCfg; SSyncCfg syncCfg;
SWalCfg walCfg; SWalCfg walCfg;
void *qMgmt; void * qMgmt;
char *rootDir; char * rootDir;
tsem_t sem; tsem_t sem;
int8_t dropped; int8_t dropped;
char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN]; char db[TSDB_ACCT_LEN + TSDB_DB_NAME_LEN];
pthread_mutex_t statusMutex; pthread_mutex_t statusMutex;
} SVnodeObj; } SVnodeObj;
void vnodeInitWriteFp(void);
void vnodeInitReadFp(void);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_MAIN_H
#define TDENGINE_VNODE_MAIN_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId, char *rootDir);
int32_t vnodeAlter(void *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeClose(int32_t vgId);
int32_t vnodeReset(SVnodeObj *pVnode);
void vnodeDestroy(SVnodeObj *pVnode);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_MGMT_H
#define TDENGINE_VNODE_MGMT_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeInitMgmt();
void vnodeCleanupMgmt();
void* vnodeAcquire(int32_t vgId);
void vnodeRelease(void *pVnode);
void* vnodeGetWal(void *pVnode);
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes);
void vnodeBuildStatusMsg(void *pStatus);
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes);
void vnodeAddIntoHash(SVnodeObj* pVnode);
void vnodeRemoveFromHash(SVnodeObj * pVnode);
#ifdef __cplusplus
}
#endif
#endif
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_READ_H
#define TDENGINE_VNODE_READ_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeInitRead(void);
void vnodeCleanupRead(void);
int32_t vnodeWriteToRQueue(void *pVnode, void *pCont, int32_t contLen, int8_t qtype, void *rparam);
void vnodeFreeFromRQueue(void *pVnode, SVReadMsg *pRead);
int32_t vnodeProcessRead(void *pVnode, SVReadMsg *pRead);
#ifdef __cplusplus
}
#endif
#endif
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "vnodeInt.h"
typedef enum _VN_STATUS { typedef enum _VN_STATUS {
TAOS_VN_STATUS_INIT = 0, TAOS_VN_STATUS_INIT = 0,
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "vnodeInt.h"
uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver); uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver);
int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId); int32_t vnodeGetWalInfo(int32_t vgId, char *fileName, int64_t *fileId);
...@@ -29,6 +30,8 @@ void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code); ...@@ -29,6 +30,8 @@ void vnodeConfirmForard(int32_t vgId, void *wparam, int32_t code);
int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam); int32_t vnodeWriteToCache(int32_t vgId, void *wparam, int32_t qtype, void *rparam);
int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver); int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver);
void vnodeConfirmForward(void *pVnode, uint64_t version, int32_t code);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -19,6 +19,7 @@ ...@@ -19,6 +19,7 @@
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
#endif #endif
#include "vnodeInt.h"
int32_t vnodeReadVersion(SVnodeObj *pVnode); int32_t vnodeReadVersion(SVnodeObj *pVnode);
int32_t vnodeSaveVersion(SVnodeObj *pVnode); int32_t vnodeSaveVersion(SVnodeObj *pVnode);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef TDENGINE_VNODE_WRITE_H
#define TDENGINE_VNODE_WRITE_H
#ifdef __cplusplus
extern "C" {
#endif
#include "vnodeInt.h"
int32_t vnodeInitWrite(void);
void vnodeCleanupWrite(void);
int32_t vnodeWriteToWQueue(void *pVnode, void *pHead, int32_t qtype, void *pRpcMsg);
void vnodeFreeFromWQueue(void *pVnode, SVWriteMsg *pWrite);
int32_t vnodeProcessWrite(void *pVnode, void *pHead, int32_t qtype, void *pRspRet);
#ifdef __cplusplus
}
#endif
#endif
...@@ -15,13 +15,9 @@ ...@@ -15,13 +15,9 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosmsg.h"
#include "taoserror.h"
#include "cJSON.h" #include "cJSON.h"
#include "tglobal.h" #include "tglobal.h"
#include "tsdb.h"
#include "dnode.h" #include "dnode.h"
#include "vnodeInt.h"
#include "vnodeCfg.h" #include "vnodeCfg.h"
static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) { static void vnodeLoadCfg(SVnodeObj *pVnode, SCreateVnodeMsg* vnodeMsg) {
......
...@@ -18,56 +18,17 @@ ...@@ -18,56 +18,17 @@
#include "taoserror.h" #include "taoserror.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tglobal.h" #include "tglobal.h"
#include "trpc.h" #include "tfs.h"
#include "tutil.h" #include "query.h"
#include "vnode.h" #include "dnode.h"
#include "vnodeInt.h"
#include "vnodeCfg.h" #include "vnodeCfg.h"
#include "vnodeStatus.h" #include "vnodeStatus.h"
#include "vnodeSync.h" #include "vnodeSync.h"
#include "vnodeVersion.h" #include "vnodeVersion.h"
#include "query.h" #include "vnodeMgmt.h"
#include "dnode.h"
#include "dnodeVWrite.h"
#include "dnodeVRead.h"
#include "tfs.h"
static SHashObj*tsVnodesHash;
static void vnodeCleanUp(SVnodeObj *pVnode);
static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
int32_t vnodeInitResources() {
int32_t code = syncInit();
if (code != 0) return code;
vnodeInitWriteFp();
vnodeInitReadFp();
tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK); static void vnodeCleanUp(SVnodeObj *pVnode);
if (tsVnodesHash == NULL) { static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno);
vError("failed to init vnode list");
return TSDB_CODE_VND_OUT_OF_MEMORY;
}
if (tsdbInitCommitQueue(tsNumOfCommitThreads) < 0) {
vError("failed to init vnode commit queue");
return terrno;
}
return TSDB_CODE_SUCCESS;
}
void vnodeCleanupResources() {
tsdbDestroyCommitQueue();
if (tsVnodesHash != NULL) {
vDebug("vnode list is cleanup");
taosHashCleanup(tsVnodesHash);
tsVnodesHash = NULL;
}
syncCleanUp();
}
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) { int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg) {
int32_t code; int32_t code;
...@@ -313,8 +274,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) { ...@@ -313,8 +274,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode); vDebug("vgId:%d, vnode is opened in %s, pVnode:%p", pVnode->vgId, rootDir, pVnode);
tsdbIncCommitRef(pVnode->vgId); tsdbIncCommitRef(pVnode->vgId);
taosHashPut(tsVnodesHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *)); vnodeAddIntoHash(pVnode);
SSyncInfo syncInfo; SSyncInfo syncInfo;
syncInfo.vgId = pVnode->vgId; syncInfo.vgId = pVnode->vgId;
syncInfo.version = pVnode->version; syncInfo.version = pVnode->version;
...@@ -353,25 +314,10 @@ int32_t vnodeClose(int32_t vgId) { ...@@ -353,25 +314,10 @@ int32_t vnodeClose(int32_t vgId) {
return 0; return 0;
} }
void vnodeRelease(void *vparam) { void vnodeDestroy(SVnodeObj *pVnode) {
if (vparam == NULL) return; int32_t code = 0;
SVnodeObj *pVnode = vparam; int32_t vgId = pVnode->vgId;
int32_t code = 0;
int32_t vgId = pVnode->vgId;
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p", vgId, refCount, pVnode);
assert(refCount >= 0);
if (refCount > 0) {
if (vnodeInResetStatus(pVnode) && refCount <= 3) {
tsem_post(&pVnode->sem);
}
return;
}
vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", vgId, refCount, pVnode);
if (pVnode->qMgmt) { if (pVnode->qMgmt) {
qCleanupQueryMgmt(pVnode->qMgmt); qCleanupQueryMgmt(pVnode->qMgmt);
pVnode->qMgmt = NULL; pVnode->qMgmt = NULL;
...@@ -436,113 +382,11 @@ void vnodeRelease(void *vparam) { ...@@ -436,113 +382,11 @@ void vnodeRelease(void *vparam) {
pthread_mutex_destroy(&pVnode->statusMutex); pthread_mutex_destroy(&pVnode->statusMutex);
free(pVnode); free(pVnode);
tsdbDecCommitRef(vgId); tsdbDecCommitRef(vgId);
int32_t count = taosHashGetSize(tsVnodesHash);
vDebug("vgId:%d, vnode is destroyed, vnodes:%d", vgId, count);
}
static void vnodeIncRef(void *ptNode) {
assert(ptNode != NULL);
SVnodeObj **ppVnode = (SVnodeObj **)ptNode;
assert(ppVnode);
assert(*ppVnode);
SVnodeObj *pVnode = *ppVnode;
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
}
void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *));
if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
vDebug("vgId:%d, not exist", vgId);
return NULL;
}
return *ppVnode;
}
void *vnodeGetWal(void *pVnode) {
return ((SVnodeObj *)pVnode)->wal;
}
static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
int64_t totalStorage = 0;
int64_t compStorage = 0;
int64_t pointsWritten = 0;
if (!vnodeInReadyStatus(pVnode)) return;
if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;
if (pVnode->tsdb) {
tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage);
}
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
pLoad->vgId = htonl(pVnode->vgId);
pLoad->cfgVersion = htonl(pVnode->cfgVersion);
pLoad->totalStorage = htobe64(totalStorage);
pLoad->compStorage = htobe64(compStorage);
pLoad->pointsWritten = htobe64(pointsWritten);
pLoad->status = pVnode->status;
pLoad->role = pVnode->role;
pLoad->replica = pVnode->syncCfg.replica;
}
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
void *pIter = taosHashIterate(tsVnodesHash, NULL);
while (pIter) {
SVnodeObj **pVnode = pIter;
if (*pVnode) {
(*numOfVnodes)++;
if (*numOfVnodes >= TSDB_MAX_VNODES) {
vError("vgId:%d, too many open vnodes, exist:%d max:%d", (*pVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES);
continue;
} else {
vnodeList[*numOfVnodes - 1] = (*pVnode)->vgId;
}
}
pIter = taosHashIterate(tsVnodesHash, pIter);
}
return TSDB_CODE_SUCCESS;
}
void vnodeBuildStatusMsg(void *param) {
SStatusMsg *pStatus = param;
void *pIter = taosHashIterate(tsVnodesHash, NULL);
while (pIter) {
SVnodeObj **pVnode = pIter;
if (*pVnode) {
vnodeBuildVloadMsg(*pVnode, pStatus);
}
pIter = taosHashIterate(tsVnodesHash, pIter);
}
}
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
for (int32_t i = 0; i < numOfVnodes; ++i) {
pAccess[i].vgId = htonl(pAccess[i].vgId);
SVnodeObj *pVnode = vnodeAcquire(pAccess[i].vgId);
if (pVnode != NULL) {
pVnode->accessState = pAccess[i].accessState;
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState);
}
vnodeRelease(pVnode);
}
}
} }
static void vnodeCleanUp(SVnodeObj *pVnode) { static void vnodeCleanUp(SVnodeObj *pVnode) {
// remove from hash, so new messages wont be consumed // remove from hash, so new messages wont be consumed
taosHashRemove(tsVnodesHash, &pVnode->vgId, sizeof(int32_t)); vnodeRemoveFromHash(pVnode);
if (!vnodeInInitStatus(pVnode)) { if (!vnodeInInitStatus(pVnode)) {
// it may be in updateing or reset state, then it shall wait // it may be in updateing or reset state, then it shall wait
...@@ -602,8 +446,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) { ...@@ -602,8 +446,7 @@ static int32_t vnodeProcessTsdbStatus(void *arg, int32_t status, int32_t eno) {
return 0; return 0;
} }
int32_t vnodeReset(SVnodeObj *pVnode) {
int32_t vnodeResetTsdb(SVnodeObj *pVnode) {
char rootDir[128] = "\0"; char rootDir[128] = "\0";
sprintf(rootDir, "vnode/vnode%d/tsdb", pVnode->vgId); sprintf(rootDir, "vnode/vnode%d/tsdb", pVnode->vgId);
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#define _DEFAULT_SOURCE
#include "os.h"
#include "tstep.h"
#include "vnodeStatus.h"
#include "vnodeRead.h"
#include "vnodeWrite.h"
#include "vnodeMain.h"
static SHashObj *tsVnodesHash = NULL;
static int32_t vnodeInitHash(void);
static void vnodeCleanupHash(void);
static void vnodeIncRef(void *ptNode);
static SStep tsVnodeSteps[] = {
{"vsync", syncInit, syncCleanUp, 0},
{"vwrite", vnodeInitWrite, vnodeCleanupWrite, 1},
{"vread", vnodeInitRead, vnodeCleanupRead, 2},
{"vhash", vnodeInitHash, vnodeCleanupHash, 3},
{"vqueue", tsdbInitCommitQueue, tsdbDestroyCommitQueue, 4}
};
int32_t vnodeInitMgmt() {
int32_t stepSize = sizeof(tsVnodeSteps) / sizeof(SStep);
return taosStepInit(tsVnodeSteps, stepSize);
}
void vnodeCleanupMgmt() {
int32_t stepSize = sizeof(tsVnodeSteps) / sizeof(SStep);
taosStepCleanup(tsVnodeSteps, stepSize);
}
static int32_t vnodeInitHash() {
tsVnodesHash = taosHashInit(TSDB_MIN_VNODES, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_ENTRY_LOCK);
if (tsVnodesHash == NULL) {
vError("failed to init vnode mgmt");
return -1;
}
return 0;
}
static void vnodeCleanupHash() {
if (tsVnodesHash != NULL) {
vDebug("vnode mgmt is cleanup");
taosHashCleanup(tsVnodesHash);
tsVnodesHash = NULL;
}
}
void *vnodeGetWal(void *pVnode) {
return ((SVnodeObj *)pVnode)->wal;
}
void vnodeAddIntoHash(SVnodeObj *pVnode) {
taosHashPut(tsVnodesHash, &pVnode->vgId, sizeof(int32_t), &pVnode, sizeof(SVnodeObj *));
}
void vnodeRemoveFromHash(SVnodeObj *pVnode) {
taosHashRemove(tsVnodesHash, &pVnode->vgId, sizeof(int32_t));
}
static void vnodeIncRef(void *ptNode) {
assert(ptNode != NULL);
SVnodeObj **ppVnode = (SVnodeObj **)ptNode;
assert(ppVnode);
assert(*ppVnode);
SVnodeObj *pVnode = *ppVnode;
atomic_add_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, get vnode, refCount:%d pVnode:%p", pVnode->vgId, pVnode->refCount, pVnode);
}
void *vnodeAcquire(int32_t vgId) {
SVnodeObj **ppVnode = taosHashGetCB(tsVnodesHash, &vgId, sizeof(int32_t), vnodeIncRef, NULL, sizeof(void *));
if (ppVnode == NULL || *ppVnode == NULL) {
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
vDebug("vgId:%d, not exist", vgId);
return NULL;
}
return *ppVnode;
}
void vnodeRelease(void *vparam) {
SVnodeObj *pVnode = vparam;
if (vparam == NULL) return;
int32_t refCount = atomic_sub_fetch_32(&pVnode->refCount, 1);
vTrace("vgId:%d, release vnode, refCount:%d pVnode:%p", pVnode->vgId, refCount, pVnode);
assert(refCount >= 0);
if (refCount > 0) {
if (vnodeInResetStatus(pVnode) && refCount <= 3) {
tsem_post(&pVnode->sem);
}
} else {
vDebug("vgId:%d, vnode will be destroyed, refCount:%d pVnode:%p", pVnode->vgId, refCount, pVnode);
vnodeDestroy(pVnode);
int32_t count = taosHashGetSize(tsVnodesHash);
vDebug("vgId:%d, vnode is destroyed, vnodes:%d", pVnode->vgId, count);
}
}
static void vnodeBuildVloadMsg(SVnodeObj *pVnode, SStatusMsg *pStatus) {
int64_t totalStorage = 0;
int64_t compStorage = 0;
int64_t pointsWritten = 0;
if (!vnodeInReadyStatus(pVnode)) return;
if (pStatus->openVnodes >= TSDB_MAX_VNODES) return;
if (pVnode->tsdb) {
tsdbReportStat(pVnode->tsdb, &pointsWritten, &totalStorage, &compStorage);
}
SVnodeLoad *pLoad = &pStatus->load[pStatus->openVnodes++];
pLoad->vgId = htonl(pVnode->vgId);
pLoad->cfgVersion = htonl(pVnode->cfgVersion);
pLoad->totalStorage = htobe64(totalStorage);
pLoad->compStorage = htobe64(compStorage);
pLoad->pointsWritten = htobe64(pointsWritten);
pLoad->status = pVnode->status;
pLoad->role = pVnode->role;
pLoad->replica = pVnode->syncCfg.replica;
}
int32_t vnodeGetVnodeList(int32_t vnodeList[], int32_t *numOfVnodes) {
void *pIter = taosHashIterate(tsVnodesHash, NULL);
while (pIter) {
SVnodeObj **pVnode = pIter;
if (*pVnode) {
(*numOfVnodes)++;
if (*numOfVnodes >= TSDB_MAX_VNODES) {
vError("vgId:%d, too many open vnodes, exist:%d max:%d", (*pVnode)->vgId, *numOfVnodes, TSDB_MAX_VNODES);
continue;
} else {
vnodeList[*numOfVnodes - 1] = (*pVnode)->vgId;
}
}
pIter = taosHashIterate(tsVnodesHash, pIter);
}
return TSDB_CODE_SUCCESS;
}
void vnodeBuildStatusMsg(void *param) {
SStatusMsg *pStatus = param;
void *pIter = taosHashIterate(tsVnodesHash, NULL);
while (pIter) {
SVnodeObj **pVnode = pIter;
if (*pVnode) {
vnodeBuildVloadMsg(*pVnode, pStatus);
}
pIter = taosHashIterate(tsVnodesHash, pIter);
}
}
void vnodeSetAccess(SVgroupAccess *pAccess, int32_t numOfVnodes) {
for (int32_t i = 0; i < numOfVnodes; ++i) {
pAccess[i].vgId = htonl(pAccess[i].vgId);
SVnodeObj *pVnode = vnodeAcquire(pAccess[i].vgId);
if (pVnode != NULL) {
pVnode->accessState = pAccess[i].accessState;
if (pVnode->accessState != TSDB_VN_ALL_ACCCESS) {
vDebug("vgId:%d, access state is set to %d", pAccess[i].vgId, pVnode->accessState);
}
vnodeRelease(pVnode);
}
}
}
...@@ -16,27 +16,25 @@ ...@@ -16,27 +16,25 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#define _NON_BLOCKING_RETRIEVE 0 #define _NON_BLOCKING_RETRIEVE 0
#include "os.h" #include "os.h"
#include "tglobal.h"
#include "taoserror.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tqueue.h"
#include "query.h" #include "query.h"
#include "trpc.h"
#include "tsdb.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "vnodeStatus.h" #include "vnodeStatus.h"
#include "tqueue.h"
static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t (*vnodeProcessReadMsgFp[TSDB_MSG_TYPE_MAX])(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead); static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SVReadMsg *pRead);
static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId); static int32_t vnodeNotifyCurrentQhandle(void* handle, void* qhandle, int32_t vgId);
void vnodeInitReadFp(void) { int32_t vnodeInitRead(void) {
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_QUERY] = vnodeProcessQueryMsg;
vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg; vnodeProcessReadMsgFp[TSDB_MSG_TYPE_FETCH] = vnodeProcessFetchMsg;
return 0;
} }
void vnodeCleanupRead() {}
// //
// After the fetch request enters the vnode queue, if the vnode cannot provide services, the process function are // After the fetch request enters the vnode queue, if the vnode cannot provide services, the process function are
// still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the // still required, or there will be a deadlock, so we don’t do any check here, but put the check codes before the
......
...@@ -15,9 +15,6 @@ ...@@ -15,9 +15,6 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taosmsg.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "vnodeStatus.h" #include "vnodeStatus.h"
char* vnodeStatus[] = { char* vnodeStatus[] = {
......
...@@ -15,21 +15,11 @@ ...@@ -15,21 +15,11 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tglobal.h"
#include "trpc.h"
#include "tutil.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "vnodeCfg.h"
#include "vnodeStatus.h"
#include "vnodeVersion.h"
#include "query.h" #include "query.h"
#include "dnode.h" #include "dnode.h"
#include "dnodeVWrite.h" #include "vnodeVersion.h"
#include "dnodeVRead.h" #include "vnodeMain.h"
#include "tfs.h"
uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver) { uint32_t vnodeGetFileInfo(int32_t vgId, char *name, uint32_t *index, uint32_t eindex, int64_t *size, uint64_t *fver) {
SVnodeObj *pVnode = vnodeAcquire(vgId); SVnodeObj *pVnode = vnodeAcquire(vgId);
...@@ -105,7 +95,7 @@ int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) { ...@@ -105,7 +95,7 @@ int32_t vnodeNotifyFileSynced(int32_t vgId, uint64_t fversion) {
vnodeSaveVersion(pVnode); vnodeSaveVersion(pVnode);
vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion); vDebug("vgId:%d, data file is synced, fver:%" PRIu64 " vver:%" PRIu64, vgId, fversion, fversion);
int32_t code = vnodeResetTsdb(pVnode); int32_t code = vnodeReset(pVnode);
vnodeRelease(pVnode); vnodeRelease(pVnode);
return code; return code;
...@@ -154,3 +144,8 @@ int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) { ...@@ -154,3 +144,8 @@ int32_t vnodeGetVersion(int32_t vgId, uint64_t *fver, uint64_t *wver) {
vnodeRelease(pVnode); vnodeRelease(pVnode);
return code; return code;
} }
void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) {
SVnodeObj *pVnode = vparam;
syncConfirmForward(pVnode->sync, version, code);
}
...@@ -15,11 +15,8 @@ ...@@ -15,11 +15,8 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "taoserror.h"
#include "cJSON.h" #include "cJSON.h"
#include "tglobal.h" #include "tglobal.h"
#include "tsdb.h"
#include "vnodeInt.h"
#include "vnodeVersion.h" #include "vnodeVersion.h"
int32_t vnodeReadVersion(SVnodeObj *pVnode) { int32_t vnodeReadVersion(SVnodeObj *pVnode) {
......
...@@ -19,18 +19,9 @@ ...@@ -19,18 +19,9 @@
#include "taoserror.h" #include "taoserror.h"
#include "tglobal.h" #include "tglobal.h"
#include "tqueue.h" #include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "twal.h"
#include "tsync.h"
#include "ttimer.h" #include "ttimer.h"
#include "tdataformat.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "vnodeStatus.h"
#include "syncInt.h"
#include "tcq.h"
#include "dnode.h" #include "dnode.h"
#include "vnodeStatus.h"
#define MAX_QUEUED_MSG_NUM 10000 #define MAX_QUEUED_MSG_NUM 10000
...@@ -44,15 +35,19 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet ...@@ -44,15 +35,19 @@ static int32_t vnodeProcessDropStableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *); static int32_t vnodeProcessUpdateTagValMsg(SVnodeObj *pVnode, void *pCont, SRspRet *);
static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite); static int32_t vnodePerformFlowCtrl(SVWriteMsg *pWrite);
void vnodeInitWriteFp(void) { int32_t vnodeInitWrite(void) {
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_SUBMIT] = vnodeProcessSubmitMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_CREATE_TABLE] = vnodeProcessCreateTableMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessDropTableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_TABLE] = vnodeProcessDropTableMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessAlterTableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_ALTER_TABLE] = vnodeProcessAlterTableMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessDropStableMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_MD_DROP_STABLE] = vnodeProcessDropStableMsg;
vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg; vnodeProcessWriteMsgFp[TSDB_MSG_TYPE_UPDATE_TAG_VAL] = vnodeProcessUpdateTagValMsg;
return 0;
} }
void vnodeCleanupWrite() {}
int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) { int32_t vnodeProcessWrite(void *vparam, void *wparam, int32_t qtype, void *rparam) {
int32_t code = 0; int32_t code = 0;
SVnodeObj *pVnode = vparam; SVnodeObj *pVnode = vparam;
...@@ -133,11 +128,6 @@ static int32_t vnodeCheckWrite(void *vparam) { ...@@ -133,11 +128,6 @@ static int32_t vnodeCheckWrite(void *vparam) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
void vnodeConfirmForward(void *vparam, uint64_t version, int32_t code) {
SVnodeObj *pVnode = vparam;
syncConfirmForward(pVnode->sync, version, code);
}
static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) { static int32_t vnodeProcessSubmitMsg(SVnodeObj *pVnode, void *pCont, SRspRet *pRet) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册