未验证 提交 05613886 编写于 作者: S Shengliang Guan 提交者: GitHub

Merge pull request #8431 from taosdata/feature/dnode3

Feature/dnode3
...@@ -14,12 +14,12 @@ ...@@ -14,12 +14,12 @@
*/ */
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "tthread.h"
#include "dnodeEps.h"
#include "dnodeMsg.h" #include "dnodeMsg.h"
#include "dnodeEps.h"
#include "mnode.h" #include "mnode.h"
#include "vnode.h" #include "tthread.h"
#include "ttime.h" #include "ttime.h"
#include "vnode.h"
static struct { static struct {
pthread_t *threadId; pthread_t *threadId;
......
...@@ -16,6 +16,7 @@ target_link_libraries( ...@@ -16,6 +16,7 @@ target_link_libraries(
PUBLIC tq PUBLIC tq
PUBLIC tsdb PUBLIC tsdb
PUBLIC wal PUBLIC wal
PUBLIC sync
PUBLIC cjson PUBLIC cjson
) )
......
...@@ -16,18 +16,19 @@ ...@@ -16,18 +16,19 @@
#ifndef _TD_VNODE_INT_H_ #ifndef _TD_VNODE_INT_H_
#define _TD_VNODE_INT_H_ #define _TD_VNODE_INT_H_
#include "os.h"
#include "amalloc.h" #include "amalloc.h"
#include "meta.h" #include "meta.h"
#include "os.h"
#include "sync.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "tlog.h"
#include "tq.h" #include "tq.h"
#include "tqueue.h"
#include "trpc.h" #include "trpc.h"
#include "tsdb.h" #include "tsdb.h"
#include "tworker.h"
#include "vnode.h" #include "vnode.h"
#include "tlog.h"
#include "tqueue.h"
#include "wal.h" #include "wal.h"
#include "tworker.h"
#ifdef __cplusplus #ifdef __cplusplus
extern "C" { extern "C" {
...@@ -43,51 +44,28 @@ extern int32_t vDebugFlag; ...@@ -43,51 +44,28 @@ extern int32_t vDebugFlag;
#define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }} #define vTrace(...) { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", vDebugFlag, __VA_ARGS__); }}
typedef struct { typedef struct {
SMeta * pMeta; int32_t vgId; // global vnode group ID
STsdb * pTsdb; int32_t refCount; // reference count
STQ * pTQ; SMemAllocator *allocator;
SMemAllocator *allocator; SMeta *pMeta;
STsdb *pTsdb;
int32_t vgId; // global vnode group ID STQ *pTQ;
int32_t refCount; // reference count twalh pWal;
int64_t queuedWMsgSize; SyncNodeId syncNode;
int32_t queuedWMsg; taos_queue pWriteQ; // write queue
int32_t queuedRMsg; taos_queue pQueryQ; // read query queue
int32_t numOfExistQHandle; // current initialized and existed query handle in current dnode taos_queue pFetchQ; // read fetch/cancel queue
int32_t flowctrlLevel; SWalCfg walCfg;
int8_t preClose; // drop and close switch SSyncCluster syncCfg;
int8_t reserved[3]; char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
int64_t sequence; // for topic int64_t queuedWMsgSize;
int8_t status; int32_t queuedWMsg;
int8_t role; int32_t queuedRMsg;
int8_t accessState; int32_t numOfQHandle; // current initialized and existed query handle in current dnode
int8_t isFull; int8_t status;
int8_t isCommiting; int8_t role;
int8_t dbReplica; int8_t accessState;
int8_t dropped; int8_t dropped;
int8_t dbType;
uint64_t version; // current version
uint64_t cversion; // version while commit start
uint64_t fversion; // version on saved data file
void * wqueue; // write queue
void * qqueue; // read query queue
void * fqueue; // read fetch/cancel queue
void * wal;
void * tsdb;
int64_t sync;
void * events;
void * cq; // continuous query
int32_t dbCfgVersion;
int32_t vgCfgVersion;
// STsdbCfg tsdbCfg;
#if 0
SSyncCfg syncCfg;
#endif
SWalCfg walCfg;
void * qMgmt;
char * rootDir;
tsem_t sem;
char db[TSDB_ACCT_ID_LEN + TSDB_DB_NAME_LEN];
pthread_mutex_t statusMutex; pthread_mutex_t statusMutex;
} SVnode; } SVnode;
...@@ -97,8 +75,26 @@ typedef struct { ...@@ -97,8 +75,26 @@ typedef struct {
void * qhandle; // used by query and retrieve msg void * qhandle; // used by query and retrieve msg
} SVnRsp; } SVnRsp;
void vnodeSendMsgToDnode(struct SRpcEpSet *epSet, struct SRpcMsg *rpcMsg);
void vnodeSendMsgToMnode(struct SRpcMsg *rpcMsg);
void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port); void vnodeGetDnodeEp(int32_t dnodeId, char *ep, char *fqdn, uint16_t *port);
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId);
int32_t vnodeAlter(SVnode *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeSync(int32_t vgId);
int32_t vnodeClose(int32_t vgId);
void vnodeCleanUp(SVnode *pVnode);
void vnodeDestroy(SVnode *pVnode);
int32_t vnodeCompact(int32_t vgId);
void vnodeBackup(int32_t vgId);
void vnodeGetStatus(struct SStatusMsg *status);
SVnode *vnodeAcquire(int32_t vgId);
SVnode *vnodeAcquireNotClose(int32_t vgId);
void vnodeRelease(SVnode *pVnode);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_MAIN_H_
#define _TD_VNODE_MAIN_H_
#include "vnodeInt.h"
#ifdef __cplusplus
extern "C" {
#endif
int32_t vnodeInitMain();
void vnodeCleanupMain();
int32_t vnodeCreate(SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeDrop(int32_t vgId);
int32_t vnodeOpen(int32_t vgId);
int32_t vnodeAlter(SVnode *pVnode, SCreateVnodeMsg *pVnodeCfg);
int32_t vnodeSync(int32_t vgId);
int32_t vnodeClose(int32_t vgId);
void vnodeCleanUp(SVnode *pVnode);
void vnodeDestroy(SVnode *pVnode);
int32_t vnodeCompact(int32_t vgId);
void vnodeBackup(int32_t vgId);
void vnodeGetStatus(struct SStatusMsg *status);
SVnode *vnodeAcquire(int32_t vgId);
SVnode *vnodeAcquireNotClose(int32_t vgId);
void vnodeRelease(SVnode *pVnode);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_MAIN_H_*/
此差异已折叠。
此差异已折叠。
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "vnodeMain.h"
#include "vnodeMgmt.h" #include "vnodeMgmt.h"
#include "vnodeMgmtMsg.h" #include "vnodeMgmtMsg.h"
......
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "vnodeMain.h"
#include "vnodeMgmtMsg.h" #include "vnodeMgmtMsg.h"
static SCreateVnodeMsg* vnodeParseVnodeMsg(SRpcMsg *rpcMsg) { static SCreateVnodeMsg* vnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
......
...@@ -18,7 +18,7 @@ ...@@ -18,7 +18,7 @@
#include "taosmsg.h" #include "taosmsg.h"
#include "tglobal.h" #include "tglobal.h"
// #include "query.h" // #include "query.h"
#include "vnodeMain.h"
#include "vnodeRead.h" #include "vnodeRead.h"
#include "vnodeReadMsg.h" #include "vnodeReadMsg.h"
#include "vnodeStatus.h" #include "vnodeStatus.h"
...@@ -81,9 +81,9 @@ static int32_t vnodeWriteToRQueue(SVnode *pVnode, void *pCont, int32_t contLen, ...@@ -81,9 +81,9 @@ static int32_t vnodeWriteToRQueue(SVnode *pVnode, void *pCont, int32_t contLen,
atomic_add_fetch_32(&pVnode->queuedRMsg, 1); atomic_add_fetch_32(&pVnode->queuedRMsg, 1);
if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) { if (pRead->code == TSDB_CODE_RPC_NETWORK_UNAVAIL || pRead->msgType == TSDB_MSG_TYPE_FETCH) {
return taosWriteQitem(pVnode->fqueue, qtype, pRead); return taosWriteQitem(pVnode->pFetchQ, qtype, pRead);
} else { } else {
return taosWriteQitem(pVnode->qqueue, qtype, pRead); return taosWriteQitem(pVnode->pQueryQ, qtype, pRead);
} }
} }
......
...@@ -158,7 +158,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { ...@@ -158,7 +158,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
} }
} }
int32_t remain = atomic_add_fetch_32(&pVnode->numOfExistQHandle, 1); int32_t remain = atomic_add_fetch_32(&pVnode->numOfQHandle, 1);
vTrace("vgId:%d, new qhandle created, total qhandle:%d", pVnode->vgId, remain); vTrace("vgId:%d, new qhandle created, total qhandle:%d", pVnode->vgId, remain);
} else { } else {
assert(pCont != NULL); assert(pCont != NULL);
...@@ -203,7 +203,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) { ...@@ -203,7 +203,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SReadMsg *pRead) {
// If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle // If the building of result is not required, simply free it. Otherwise, mandatorily free the qhandle
if (freehandle || (!buildRes)) { if (freehandle || (!buildRes)) {
if (freehandle) { if (freehandle) {
int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1); int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1);
vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *qhandle, remain); vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *qhandle, remain);
} }
...@@ -283,7 +283,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) { ...@@ -283,7 +283,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) {
// kill current query and free corresponding resources. // kill current query and free corresponding resources.
if (pRetrieve->free == 1) { if (pRetrieve->free == 1) {
int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1); int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1);
vWarn("vgId:%d, QInfo:%" PRIx64 "-%p, retrieve msg received to kill query and free qhandle, remain qhandle:%d", vWarn("vgId:%d, QInfo:%" PRIx64 "-%p, retrieve msg received to kill query and free qhandle, remain qhandle:%d",
pVnode->vgId, pRetrieve->qId, *handle, remain); pVnode->vgId, pRetrieve->qId, *handle, remain);
...@@ -297,7 +297,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) { ...@@ -297,7 +297,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) {
// register the qhandle to connect to quit query immediate if connection is broken // register the qhandle to connect to quit query immediate if connection is broken
if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) { if (vnodeNotifyCurrentQhandle(pRead->rpcHandle, pRetrieve->qId, *handle, pVnode->vgId) != TSDB_CODE_SUCCESS) {
int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1); int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1);
vError("vgId:%d, QInfo:%" PRIu64 "-%p, retrieve discarded since link is broken, conn:%p, remain qhandle:%d", vError("vgId:%d, QInfo:%" PRIu64 "-%p, retrieve discarded since link is broken, conn:%p, remain qhandle:%d",
pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle, remain); pVnode->vgId, pRetrieve->qhandle, *handle, pRead->rpcHandle, remain);
...@@ -334,7 +334,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) { ...@@ -334,7 +334,7 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SReadMsg *pRead) {
// If qhandle is not added into vread queue, the query should be completed already or paused with error. // If qhandle is not added into vread queue, the query should be completed already or paused with error.
// Here free qhandle immediately // Here free qhandle immediately
if (freeHandle) { if (freeHandle) {
int32_t remain = atomic_sub_fetch_32(&pVnode->numOfExistQHandle, 1); int32_t remain = atomic_sub_fetch_32(&pVnode->numOfQHandle, 1);
vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *handle, remain); vTrace("vgId:%d, QInfo:%p, start to free qhandle, remain qhandle:%d", pVnode->vgId, *handle, remain);
qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true); qReleaseQInfo(pVnode->qMgmt, (void **)&handle, true);
} }
......
...@@ -58,10 +58,12 @@ int32_t vnodeReadVersion(SVnode *pVnode) { ...@@ -58,10 +58,12 @@ int32_t vnodeReadVersion(SVnode *pVnode) {
vError("vgId:%d, failed to read %s, version not found", pVnode->vgId, file); vError("vgId:%d, failed to read %s, version not found", pVnode->vgId, file);
goto PARSE_VER_ERROR; goto PARSE_VER_ERROR;
} }
#if 0
pVnode->version = (uint64_t)ver->valueint; pVnode->version = (uint64_t)ver->valueint;
terrno = TSDB_CODE_SUCCESS; terrno = TSDB_CODE_SUCCESS;
vInfo("vgId:%d, read %s successfully, fver:%" PRIu64, pVnode->vgId, file, pVnode->version); vInfo("vgId:%d, read %s successfully, fver:%" PRIu64, pVnode->vgId, file, pVnode->version);
#endif
PARSE_VER_ERROR: PARSE_VER_ERROR:
if (content != NULL) free(content); if (content != NULL) free(content);
...@@ -85,16 +87,17 @@ int32_t vnodeSaveVersion(SVnode *pVnode) { ...@@ -85,16 +87,17 @@ int32_t vnodeSaveVersion(SVnode *pVnode) {
int32_t maxLen = 100; int32_t maxLen = 100;
char * content = calloc(1, maxLen + 1); char * content = calloc(1, maxLen + 1);
#if 0
len += snprintf(content + len, maxLen - len, "{\n"); len += snprintf(content + len, maxLen - len, "{\n");
len += snprintf(content + len, maxLen - len, " \"version\": %" PRIu64 "\n", pVnode->fversion); len += snprintf(content + len, maxLen - len, " \"version\": %" PRIu64 "\n", pVnode->fversion);
len += snprintf(content + len, maxLen - len, "}\n"); len += snprintf(content + len, maxLen - len, "}\n");
#endif
fwrite(content, 1, len, fp); fwrite(content, 1, len, fp);
taosFsyncFile(fileno(fp)); taosFsyncFile(fileno(fp));
fclose(fp); fclose(fp);
free(content); free(content);
terrno = 0; terrno = 0;
vInfo("vgId:%d, successed to write %s, fver:%" PRIu64, pVnode->vgId, file, pVnode->fversion); // vInfo("vgId:%d, successed to write %s, fver:%" PRIu64, pVnode->vgId, file, pVnode->fversion);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
\ No newline at end of file
...@@ -15,7 +15,7 @@ ...@@ -15,7 +15,7 @@
#define _DEFAULT_SOURCE #define _DEFAULT_SOURCE
#include "os.h" #include "os.h"
#include "vnodeMain.h"
#include "vnodeWorker.h" #include "vnodeWorker.h"
enum { CLEANUP_TASK = 0, DESTROY_TASK = 1, BACKUP_TASK = 2 }; enum { CLEANUP_TASK = 0, DESTROY_TASK = 1, BACKUP_TASK = 2 };
......
...@@ -19,7 +19,7 @@ ...@@ -19,7 +19,7 @@
#include "tqueue.h" #include "tqueue.h"
#include "tworker.h" #include "tworker.h"
#include "taosmsg.h" #include "taosmsg.h"
#include "vnodeMain.h"
#include "vnodeStatus.h" #include "vnodeStatus.h"
#include "vnodeWrite.h" #include "vnodeWrite.h"
#include "vnodeWriteMsg.h" #include "vnodeWriteMsg.h"
...@@ -96,7 +96,7 @@ static int32_t vnodeWriteToWQueue(SVnode *pVnode, SWalHead *pHead, int32_t qtype ...@@ -96,7 +96,7 @@ static int32_t vnodeWriteToWQueue(SVnode *pVnode, SWalHead *pHead, int32_t qtype
atomic_add_fetch_32(&tsVwrite.queuedMsgs, 1); atomic_add_fetch_32(&tsVwrite.queuedMsgs, 1);
atomic_add_fetch_32(&pVnode->refCount, 1); atomic_add_fetch_32(&pVnode->refCount, 1);
atomic_add_fetch_32(&pVnode->queuedWMsg, 1); atomic_add_fetch_32(&pVnode->queuedWMsg, 1);
taosWriteQitem(pVnode->wqueue, pWrite->qtype, pWrite); taosWriteQitem(pVnode->pWriteQ, pWrite->qtype, pWrite);
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
...@@ -153,10 +153,10 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t ...@@ -153,10 +153,10 @@ static bool vnodeProcessWriteStart(SVnode *pVnode, SVnWriteMsg *pWrite, int32_t
#if 0 #if 0
pWrite->code = walWrite(pVnode->wal, pHead); pWrite->code = walWrite(pVnode->wal, pHead);
if (pWrite->code < 0) return false; if (pWrite->code < 0) return false;
#endif
pVnode->version = pHead->version;
pVnode->version = pHead->version;
#endif
// write data locally // write data locally
switch (msgType) { switch (msgType) {
case TSDB_MSG_TYPE_SUBMIT: case TSDB_MSG_TYPE_SUBMIT:
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册