提交 0e7ba80d 编写于 作者: S Shengliang Guan

TD-10432 vnodeint.h

上级 ad3dbbd5
......@@ -16,6 +16,7 @@ target_link_libraries(
PUBLIC tq
PUBLIC tsdb
PUBLIC wal
PUBLIC sync
PUBLIC cjson
)
......
......@@ -16,18 +16,19 @@
#ifndef _TD_VNODE_INT_H_
#define _TD_VNODE_INT_H_
#include "os.h"
#include "amalloc.h"
#include "meta.h"
#include "os.h"
#include "sync.h"
#include "taosmsg.h"
#include "tlog.h"
#include "tq.h"
#include "tqueue.h"
#include "trpc.h"
#include "tsdb.h"
#include "tworker.h"
#include "vnode.h"
#include "tlog.h"
#include "tqueue.h"
#include "wal.h"
#include "tworker.h"
#ifdef __cplusplus
extern "C" {
......@@ -43,51 +44,28 @@ extern int32_t vDebugFlag;
#define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
typedef struct {
SMeta * pMeta;
STsdb * pTsdb;
STQ * pTQ;
SMemAllocator *allocator;
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
int64_t queuedWMsgSize;
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t numOfExistQHandle; // current initialized and existed query handle in current dnode
int32_t flowctrlLevel;
int8_t preClose; // drop and close switch
int8_t reserved[3];
int64_t sequence; // for topic
int8_t status;
int8_t role;
int8_t accessState;
int8_t isFull;
int8_t isCommiting;
int8_t dbReplica;
int8_t dropped;
int8_t dbType;
uint64_t version; // current version
uint64_t cversion; // version while commit start
uint64_t fversion; // version on saved data file
void * wqueue; // write queue
void * qqueue; // read query queue
void * fqueue; // read fetch/cancel queue
void * wal;
void * tsdb;
int64_t sync;
void * events;
void * cq; // continuous query
int32_t dbCfgVersion;
int32_t vgCfgVersion;
// STsdbCfg tsdbCfg;
#if 0
SSyncCfg syncCfg;
#endif
SWalCfg walCfg;
void * qMgmt;
char * rootDir;
tsem_t sem;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int32_t vgId; // global vnode group ID
int32_t refCount; // reference count
SMemAllocator *allocator;
SMeta *pMeta;
STsdb *pTsdb;
STQ *pTQ;
twalh pWal;
SyncNodeId syncNode;
taos_queue pWriteQ; // write queue
taos_queue pQueryQ; // read query queue
taos_queue pFetchQ; // read fetch/cancel queue
SWalCfg walCfg;
SSyncCluster syncCfg;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int64_t queuedWMsgSize;
int32_t queuedWMsg;
int32_t queuedRMsg;
int32_t numOfQHandle; // current initialized and existed query handle in current dnode
int8_t status;
int8_t role;
int8_t accessState;
int8_t dropped;
pthread_mutex_t statusMutex;
} SVnode;
......@@ -97,6 +75,8 @@ typedef struct {
void * qhandle; // used by query and retrieve msg
} SVnRsp;
void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
#ifdef __cplusplus
......
......@@ -23,8 +23,8 @@
#include "vnodeWrite.h"
static struct {
struct SSteps *steps;
SVnodeFp fp;
SSteps *steps;
SVnodeFp fp;
} tsVint;
int32_t vnodeInit(SVnodePara para) {
......@@ -34,11 +34,10 @@ int32_t vnodeInit(SVnodePara para) {
if (steps == NULL) return -1;
taosStepAdd(steps, "vnode-main", vnodeInitMain, vnodeCleanupMain);
taosStepAdd(steps, "vnode-worker",vnodeInitWorker, vnodeCleanupWorker);
taosStepAdd(steps, "vnode-worker", vnodeInitWorker, vnodeCleanupWorker);
taosStepAdd(steps, "vnode-read", vnodeInitRead, vnodeCleanupRead);
taosStepAdd(steps, "vnode-mgmt", vnodeInitMgmt, vnodeCleanupMgmt);
taosStepAdd(steps, "vnode-write", vnodeInitWrite, vnodeCleanupWrite);
// taosStepAdd(steps, "vnode-queue", tsdbInitCommitQueue, tsdbDestroyCommitQueue);
tsVint.steps = steps;
return taosStepExec(tsVint.steps);
......@@ -48,4 +47,10 @@ void vnodeCleanup() { taosStepCleanup(tsVint.steps); }
void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port) {
return (*tsVint.fp.GetDnodeEp)(dnodeId, ep, fqdn, port);
}
\ No newline at end of file
}
void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg) {
(*tsVint.fp.SendMsgToDnode)(epSet, rpcMsg);
}
void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg) { return (*tsVint.fp.SendMsgToMnode)(rpcMsg); }
......@@ -75,15 +75,16 @@ SVnode *vnodeAcquire(int32_t vgId) {
}
SVnode *vnodeAcquireNotClose(int32_t vgId) {
SVnode *pVnode = vnodeAcquire(vgId);
if (pVnode != NULL && pVnode->preClose == 1) {
vnodeRelease(pVnode);
terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
vDebug("vgId:%d, not exist, pre closing", vgId);
return NULL;
}
return pVnode;
// SVnode *pVnode = vnodeAcquire(vgId);
// if (pVnode != NULL && pVnode->preClose == 1) {
// vnodeRelease(pVnode);
// terrno = TSDB_CODE_VND_INVALID_VGROUP_ID;
// vDebug("vgId:%d, not exist, pre closing", vgId);
// return NULL;
// }
// return pVnode;
return NULL;
}
void vnodeRelease(SVnode *pVnode) {
......@@ -287,6 +288,7 @@ static int32_t vnodeAlterImp(SVnode *pVnode, SCreateVnodeMsg *pVnodeCfg) {
}
int32_t vnodeAlter(SVnode *pVnode, SCreateVnodeMsg *pVnodeCfg) {
#if 0
vDebug("vgId:%d, current dbCfgVersion:%d vgCfgVersion:%d, input dbCfgVersion:%d vgCfgVersion:%d", pVnode->vgId,
pVnode->dbCfgVersion, pVnode->vgCfgVersion, pVnodeCfg->cfg.dbCfgVersion, pVnodeCfg->cfg.vgCfgVersion);
......@@ -304,6 +306,8 @@ int32_t vnodeAlter(SVnode *pVnode, SCreateVnodeMsg *pVnodeCfg) {
}
return code;
#endif
return 0;
}
static void vnodeFindWalRootDir(int32_t vgId, char *walRootDir) {
......@@ -371,10 +375,10 @@ int32_t vnodeOpen(int32_t vgId) {
pVnode->fversion = pVnode->version;
pVnode->wqueue = vnodeAllocWriteQueue(pVnode);
pVnode->qqueue = vnodeAllocQueryQueue(pVnode);
pVnode->fqueue = vnodeAllocFetchQueue(pVnode);
if (pVnode->wqueue == NULL || pVnode->qqueue == NULL || pVnode->fqueue == NULL) {
pVnode->pWriteQ = vnodeAllocWriteQueue(pVnode);
pVnode->pQueryQ = vnodeAllocQueryQueue(pVnode);
pVnode->pFetchQ = vnodeAllocFetchQueue(pVnode);
if (pVnode->pWriteQ == NULL || pVnode->pQueryQ == NULL || pVnode->pFetchQ == NULL) {
vnodeCleanUp(pVnode);
return terrno;
}
......@@ -467,7 +471,7 @@ int32_t vnodeClose(int32_t vgId) {
return 0;
}
pVnode->preClose = 1;
// pVnode->preClose = 1;
vDebug("vgId:%d, vnode will be closed, pVnode:%p", pVnode->vgId, pVnode);
vnodeRelease(pVnode);
......@@ -516,19 +520,19 @@ void vnodeDestroy(SVnode *pVnode) {
pVnode->wal = NULL;
}
if (pVnode->wqueue) {
vnodeFreeWriteQueue(pVnode->wqueue);
pVnode->wqueue = NULL;
if (pVnode->pWriteQ) {
vnodeFreeWriteQueue(pVnode->pWriteQ);
pVnode->pWriteQ = NULL;
}
if (pVnode->qqueue) {
vnodeFreeQueryQueue(pVnode->qqueue);
pVnode->qqueue = NULL;
if (pVnode->pQueryQ) {
vnodeFreeQueryQueue(pVnode->pQueryQ);
pVnode->pQueryQ = NULL;
}
if (pVnode->fqueue) {
vnodeFreeFetchQueue(pVnode->fqueue);
pVnode->fqueue = NULL;
if (pVnode->pFetchQ) {
vnodeFreeFetchQueue(pVnode->pFetchQ);
pVnode->pFetchQ = NULL;
}
tfree(pVnode->rootDir);
......
......@@ -81,9 +81,9 @@ static int32_t vnodeWriteToRQueue(SVnode *pVnode, void *pCont, int32_t contLen,
atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) {
return taosWriteQitem(pVnode->fqueue, qtype, pRead);
return taosWriteQitem(pVnode->pFetchQ, qtype, pRead);
} else {
return taosWriteQitem(pVnode->qqueue, qtype, pRead);
return taosWriteQitem(pVnode->pQueryQ, qtype, pRead);
}
}
......
......@@ -158,7 +158,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
}
}
int32_t remain = atomic_add_fetch_32(&pVnode->numOfExistQHandle, 1);
int32_t remain = atomic_add_fetch_32(&pVnode->numOfQHandle, 1);
vTrace("vgId:%d, new qhandle created, total qhandle:%d", pVnode->vgId, remain);
} else {
assert(pCont != NULL);
......@@ -203,7 +203,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
// If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle
if (freehandle || (!buildRes)) {
if (freehandle) {
int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1);
int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1);
vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *qhandle, remain);
}
......@@ -282,7 +282,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) {
// kill current query and free corresponding resources.
if (pRetrieve->free == 1) {
int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1);
int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1);
vWarn("vgId:%d, QInfo:%" PRIx64 "-%p, retrieve msg received to kill query and free qhandle, remain qhandle:%d",
pVnode->vgId, pRetrieve->qId, *handle, remain);
......@@ -296,7 +296,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) {
// register the qhandle to connect to quit query immediate if connection is broken
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1);
int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1);
vError("vgId:%d, QInfo:%" PRIu64 "-%p, retrieve discarded since link is broken, conn:%p, remain qhandle:%d",
pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle, remain);
......@@ -333,7 +333,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) {
// If qhandle is not added into vread queue, the query should be completed already or paused with error.
// Here free qhandle immediately
if (freeHandle) {
int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1);
int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1);
vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *handle, remain);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
}
......
......@@ -58,10 +58,12 @@ int32_t vnodeReadVersion(SVnode *pVnode) {
vError("vgId:%d, failed to read %s, version not found", pVnode->vgId, file);
goto PARSE_VER_ERROR;
}
#if 0
pVnode->version = (uint64_t)ver->valueint;
terrno = TSDB_CODE_SUCCESS;
vInfo("vgId:%d, read %s successfully, fver:%" PRIu64, pVnode->vgId, file, pVnode->version);
#endif
PARSE_VER_ERROR:
if (content != NULL) free(content);
......@@ -85,16 +87,17 @@ int32_t vnodeSaveVersion(SVnode *pVnode) {
int32_t maxLen = 100;
char * content = calloc(1, maxLen + 1);
#if 0
len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"version\": %" PRIu64 "\n", pVnode->fversion);
len += snprintf(content + len, maxLen - len, "}\n");
#endif
fwrite(content, 1, len, fp);
taosFsyncFile(fileno(fp));
fclose(fp);
free(content);
terrno = 0;
vInfo("vgId:%d, successed to write %s, fver:%" PRIu64, pVnode->vgId, file, pVnode->fversion);
// vInfo("vgId:%d, successed to write %s, fver:%" PRIu64, pVnode->vgId, file, pVnode->fversion);
return TSDB_CODE_SUCCESS;
}
\ No newline at end of file
......@@ -96,7 +96,7 @@ static int32_t vnodeWriteToWQueue(SVnode *pVnode, SWalHead *pHead, int32_t qtype
atomic_add_fetch_32(&tsVwrite.queuedMsgs, 1);
atomic_add_fetch_32(&pVnode->refCount, 1);
atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite);
taosWriteQitem(pVnode->pWriteQ, pWrite->qtype, pWrite);
return TSDB_CODE_SUCCESS;
}
......@@ -153,10 +153,10 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t
#if 0
pWrite->code = walWrite(pVnode->wal, pHead);
if (pWrite->code < 0) return false;
#endif
pVnode->version = pHead->version;
pVnode->version = pHead->version;
#endif
// write data locally
switch (msgType) {
case TSDB_MSG_TYPE_SUBMIT:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册