Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f8175117
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
f8175117
编写于
4月 14, 2022
作者:
H
Hongze Cheng
提交者:
GitHub
4月 14, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11480 from taosdata/feature/vnode_refact
refactor: vnode
上级
1e265d76
17ecc24e
变更
18
展开全部
隐藏空白更改
内联
并排
Showing
18 changed file
with
509 addition
and
576 deletion
+509
-576
.devcontainer/Dockerfile
.devcontainer/Dockerfile
+1
-1
.devcontainer/devcontainer.json
.devcontainer/devcontainer.json
+1
-1
source/dnode/mgmt/vm/vmInt.c
source/dnode/mgmt/vm/vmInt.c
+1
-1
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+2
-6
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+6
-80
source/dnode/vnode/src/inc/meta.h
source/dnode/vnode/src/inc/meta.h
+41
-18
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+59
-42
source/dnode/vnode/src/inc/tsdb.h
source/dnode/vnode/src/inc/tsdb.h
+139
-117
source/dnode/vnode/src/inc/vnd.h
source/dnode/vnode/src/inc/vnd.h
+92
-0
source/dnode/vnode/src/inc/vnodeInt.h
source/dnode/vnode/src/inc/vnodeInt.h
+7
-131
source/dnode/vnode/src/meta/metaCache.c
source/dnode/vnode/src/meta/metaCache.c
+0
-40
source/dnode/vnode/src/meta/metaCfg.c
source/dnode/vnode/src/meta/metaCfg.c
+0
-29
source/dnode/vnode/src/meta/metaMain.c
source/dnode/vnode/src/meta/metaMain.c
+0
-22
source/dnode/vnode/src/meta/metaQuery.c
source/dnode/vnode/src/meta/metaQuery.c
+0
-16
source/dnode/vnode/src/meta/metaTbCfg.c
source/dnode/vnode/src/meta/metaTbCfg.c
+0
-48
source/dnode/vnode/src/meta/metaTbTag.c
source/dnode/vnode/src/meta/metaTbTag.c
+0
-16
source/dnode/vnode/src/vnd/vnodeCommit.c
source/dnode/vnode/src/vnd/vnodeCommit.c
+2
-8
source/dnode/vnode/src/vnd/vnodeModule.c
source/dnode/vnode/src/vnd/vnodeModule.c
+158
-0
未找到文件。
.devcontainer/Dockerfile
浏览文件 @
f8175117
...
...
@@ -7,4 +7,4 @@ FROM mcr.microsoft.com/vscode/devcontainers/cpp:0-${VARIANT}
# [Optional] Uncomment this section to install additional packages.
# RUN apt-get update && export DEBIAN_FRONTEND=noninteractive \
# && apt-get -y install --no-install-recommends <your-package-list-here>
RUN
apt-get update
&&
apt-get
-y
install
tree vim
RUN
apt-get update
&&
apt-get
-y
install
tree vim
tmux
.devcontainer/devcontainer.json
浏览文件 @
f8175117
...
...
@@ -32,7 +32,7 @@
//
Use
'forwardPorts'
to
make
a
list
of
ports
inside
the
container
available
locally.
//
"forwardPorts"
:
[],
//
Use
'postCreateCommand'
to
run
commands
after
the
container
is
created.
//
"postCreateCommand"
:
"gcc -v
"
,
"postCreateCommand"
:
"wget https://raw.githubusercontent.com/hzcheng/config/master/.tmux.conf -P /root
"
,
//
Comment
out
connect
as
root
instead.
More
info:
https://aka.ms/vscode-remote/containers/non-root.
"remoteUser"
:
"root"
}
\ No newline at end of file
source/dnode/mgmt/vm/vmInt.c
浏览文件 @
f8175117
...
...
@@ -299,7 +299,7 @@ static int32_t vmInit(SMgmtWrapper *pWrapper) {
goto
_OVER
;
}
if
(
vnodeInit
()
!=
0
)
{
if
(
vnodeInit
(
tsNumOfCommitThreads
)
!=
0
)
{
dError
(
"failed to init vnode since %s"
,
terrstr
());
goto
_OVER
;
}
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
f8175117
...
...
@@ -10,21 +10,17 @@ target_sources(
"src/vnd/vnodeCommit.c"
"src/vnd/vnodeInt.c"
"src/vnd/vnodeMain.c"
"src/vnd/vnodeMgr.c"
"src/vnd/vnodeQuery.c"
"src/vnd/vnodeStateMgr.c"
"src/vnd/vnodeWrite.c"
"src/vnd/vnodeModule.c"
# "src/vnd/vnodeMgr.c"
# meta
# "src/meta/metaBDBImpl.c"
"src/meta/metaCache.c"
"src/meta/metaCfg.c"
"src/meta/metaIdx.c"
"src/meta/metaMain.c"
"src/meta/metaQuery.c"
"src/meta/metaTable.c"
"src/meta/metaTbCfg.c"
"src/meta/metaTbTag.c"
"src/meta/metaTbUid.c"
"src/meta/metaTDBImpl.c"
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
f8175117
...
...
@@ -60,34 +60,15 @@ int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad);
int
vnodeValidateTableHash
(
SVnodeCfg
*
pVnodeOptions
,
char
*
tableFName
);
// meta
typedef
struct
SMeta
SMeta
;
// todo: remove
typedef
struct
SMTbCursor
SMTbCursor
;
// todo: remove
typedef
struct
SMCtbCursor
SMCtbCursor
;
// todo: remove
typedef
struct
SMSmaCursor
SMSmaCursor
;
// todo: remove
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
typedef
struct
SMeta
SMeta
;
// todo: remove
typedef
struct
SMTbCursor
SMTbCursor
;
typedef
SVCreateTbReq
STbCfg
;
typedef
SVCreateTSmaReq
SSmaCfg
;
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
);
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
);
void
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
,
bool
isDecode
);
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SArray
*
metaGetSmaTbUids
(
SMeta
*
pMeta
,
bool
isDup
);
int
metaGetTbNum
(
SMeta
*
pMeta
);
SMTbCursor
*
metaOpenTbCursor
(
SMeta
*
pMeta
);
void
metaCloseTbCursor
(
SMTbCursor
*
pTbCur
);
char
*
metaTbCursorNext
(
SMTbCursor
*
pTbCur
);
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseCtbCurosr
(
SMCtbCursor
*
pCtbCur
);
tb_uid_t
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
);
SMSmaCursor
*
metaOpenSmaCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseSmaCursor
(
SMSmaCursor
*
pSmaCur
);
int64_t
metaSmaCursorNext
(
SMSmaCursor
*
pSmaCur
);
SMTbCursor
*
metaOpenTbCursor
(
SMeta
*
pMeta
);
void
metaCloseTbCursor
(
SMTbCursor
*
pTbCur
);
char
*
metaTbCursorNext
(
SMTbCursor
*
pTbCur
);
// tsdb
typedef
struct
STsdb
STsdb
;
...
...
@@ -98,18 +79,7 @@ typedef void *tsdbReaderT;
#define BLOCK_LOAD_OFFSET_SEQ_ORDER 1
#define BLOCK_LOAD_TABLE_SEQ_ORDER 2
#define BLOCK_LOAD_TABLE_RR_ORDER 3
#define TABLE_TID(t) (t)->tid
#define TABLE_UID(t) (t)->uid
STsdb
*
tsdbOpen
(
const
char
*
path
,
int32_t
vgId
,
const
STsdbCfg
*
pTsdbCfg
,
SMemAllocatorFactory
*
pMAF
,
SMeta
*
pMeta
,
STfs
*
pTfs
);
void
tsdbClose
(
STsdb
*
);
void
tsdbRemove
(
const
char
*
path
);
int
tsdbInsertData
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
SSubmitRsp
*
pRsp
);
int
tsdbPrepareCommit
(
STsdb
*
pTsdb
);
int
tsdbCommit
(
STsdb
*
pTsdb
);
int32_t
tsdbInitSma
(
STsdb
*
pTsdb
);
int32_t
tsdbCreateTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
int32_t
tsdbDropTSma
(
STsdb
*
pTsdb
,
char
*
pMsg
);
tsdbReaderT
*
tsdbQueryTables
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
tableInfoGroup
,
uint64_t
qId
,
uint64_t
taskId
);
tsdbReaderT
tsdbQueryCacheLast
(
STsdb
*
tsdb
,
STsdbQueryCond
*
pCond
,
STableGroupInfo
*
groupList
,
uint64_t
qId
,
...
...
@@ -127,18 +97,8 @@ SArray *tsdbRetrieveDataBlock(tsdbReaderT *pTsdbReadHandle, SArray *pColumn
void
tsdbDestroyTableGroup
(
STableGroupInfo
*
pGroupList
);
int32_t
tsdbGetOneTableGroup
(
void
*
pMeta
,
uint64_t
uid
,
TSKEY
startKey
,
STableGroupInfo
*
pGroupInfo
);
int32_t
tsdbGetTableGroupFromIdList
(
STsdb
*
tsdb
,
SArray
*
pTableIdList
,
STableGroupInfo
*
pGroupInfo
);
void
tsdbCleanupReadHandle
(
tsdbReaderT
queryHandle
);
int32_t
tsdbUpdateSmaWindow
(
STsdb
*
pTsdb
,
SSubmitReq
*
pMsg
,
int64_t
version
);
int32_t
tsdbInsertTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
,
const
char
*
msg
);
int32_t
tsdbDropTSmaData
(
STsdb
*
pTsdb
,
int64_t
indexUid
);
int32_t
tsdbInsertRSmaData
(
STsdb
*
pTsdb
,
char
*
msg
);
// tq
enum
{
TQ_STREAM_TOKEN__DATA
=
1
,
TQ_STREAM_TOKEN__WATERMARK
,
TQ_STREAM_TOKEN__CHECKPOINT
,
};
typedef
struct
STqReadHandle
STqReadHandle
;
...
...
@@ -153,9 +113,6 @@ int tqRetrieveDataBlockInfo(STqReadHandle *pHandle, SDataBlockInfo *pBlockIn
SArray
*
tqRetrieveDataBlock
(
STqReadHandle
*
pHandle
);
// need to reposition
typedef
struct
SMgmtWrapper
SMgmtWrapper
;
int32_t
tdScanAndConvertSubmitMsg
(
SSubmitReq
*
pMsg
);
// structs
struct
SMetaCfg
{
...
...
@@ -202,21 +159,6 @@ struct SVnodeCfg {
int8_t
hashMethod
;
};
struct
STqReadHandle
{
int64_t
ver
;
int64_t
tbUid
;
SHashObj
*
tbIdHash
;
const
SSubmitReq
*
pMsg
;
SSubmitBlk
*
pBlock
;
SSubmitMsgIter
msgIter
;
SSubmitBlkIter
blkIter
;
SMeta
*
pVnodeMeta
;
SArray
*
pColIdList
;
// SArray<int32_t>
int32_t
sver
;
SSchemaWrapper
*
pSchemaWrapper
;
STSchema
*
pSchema
;
};
struct
SDataStatis
{
int16_t
colId
;
int16_t
maxIndex
;
...
...
@@ -241,22 +183,6 @@ typedef struct {
uint64_t
uid
;
}
STableKeyInfo
;
typedef
struct
STable
{
uint64_t
tid
;
uint64_t
uid
;
STSchema
*
pSchema
;
}
STable
;
typedef
struct
{
int8_t
type
;
int8_t
reserved
[
7
];
union
{
void
*
data
;
int64_t
wmTs
;
int64_t
checkpointId
;
};
}
STqStreamToken
;
#ifdef __cplusplus
}
#endif
...
...
source/dnode/vnode/src/inc/meta.h
浏览文件 @
f8175117
...
...
@@ -20,20 +20,48 @@
extern
"C"
{
#endif
typedef
struct
SMetaCache
SMetaCache
;
typedef
struct
SMetaIdx
SMetaIdx
;
typedef
struct
SMetaDB
SMetaDB
;
typedef
struct
SMetaCache
SMetaCache
;
typedef
struct
SMetaIdx
SMetaIdx
;
typedef
struct
SMetaDB
SMetaDB
;
typedef
struct
SMCtbCursor
SMCtbCursor
;
typedef
struct
SMSmaCursor
SMSmaCursor
;
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
pMetaCfg
,
SMemAllocatorFactory
*
pMAF
);
void
metaClose
(
SMeta
*
pMeta
);
void
metaRemove
(
const
char
*
path
);
int
metaCreateTable
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
);
int
metaDropTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
int
metaCommit
(
SMeta
*
pMeta
);
int32_t
metaCreateTSma
(
SMeta
*
pMeta
,
SSmaCfg
*
pCfg
);
int32_t
metaDropTSma
(
SMeta
*
pMeta
,
int64_t
indexUid
);
STbCfg
*
metaGetTbInfoByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
STbCfg
*
metaGetTbInfoByName
(
SMeta
*
pMeta
,
char
*
tbname
,
tb_uid_t
*
uid
);
// metaDebug ==================
// clang-format off
#define metaFatal(...) do { if (metaDebugFlag & DEBUG_FATAL) { taosPrintLog("META FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define metaError(...) do { if (metaDebugFlag & DEBUG_ERROR) { taosPrintLog("META ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define metaWarn(...) do { if (metaDebugFlag & DEBUG_WARN) { taosPrintLog("META WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define metaInfo(...) do { if (metaDebugFlag & DEBUG_INFO) { taosPrintLog("META ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define metaDebug(...) do { if (metaDebugFlag & DEBUG_DEBUG) { taosPrintLog("META ", DEBUG_DEBUG, metaDebugFlag, __VA_ARGS__); }} while(0)
#define metaTrace(...) do { if (metaDebugFlag & DEBUG_TRACE) { taosPrintLog("META ", DEBUG_TRACE, metaDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
#define META_SUPER_TABLE TD_SUPER_TABLE
#define META_CHILD_TABLE TD_CHILD_TABLE
#define META_NORMAL_TABLE TD_NORMAL_TABLE
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
pMetaCfg
,
SMemAllocatorFactory
*
pMAF
);
void
metaClose
(
SMeta
*
pMeta
);
void
metaRemove
(
const
char
*
path
);
int
metaCreateTable
(
SMeta
*
pMeta
,
STbCfg
*
pTbCfg
);
int
metaDropTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
int
metaCommit
(
SMeta
*
pMeta
);
int32_t
metaCreateTSma
(
SMeta
*
pMeta
,
SSmaCfg
*
pCfg
);
int32_t
metaDropTSma
(
SMeta
*
pMeta
,
int64_t
indexUid
);
STbCfg
*
metaGetTbInfoByUid
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
STbCfg
*
metaGetTbInfoByName
(
SMeta
*
pMeta
,
char
*
tbname
,
tb_uid_t
*
uid
);
SSchemaWrapper
*
metaGetTableSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
,
bool
isinline
);
STSchema
*
metaGetTbTSchema
(
SMeta
*
pMeta
,
tb_uid_t
uid
,
int32_t
sver
);
void
*
metaGetSmaInfoByIndex
(
SMeta
*
pMeta
,
int64_t
indexUid
,
bool
isDecode
);
STSmaWrapper
*
metaGetSmaInfoByTable
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
SArray
*
metaGetSmaTbUids
(
SMeta
*
pMeta
,
bool
isDup
);
int
metaGetTbNum
(
SMeta
*
pMeta
);
SMSmaCursor
*
metaOpenSmaCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseSmaCursor
(
SMSmaCursor
*
pSmaCur
);
int64_t
metaSmaCursorNext
(
SMSmaCursor
*
pSmaCur
);
SMCtbCursor
*
metaOpenCtbCursor
(
SMeta
*
pMeta
,
tb_uid_t
uid
);
void
metaCloseCtbCurosr
(
SMCtbCursor
*
pCtbCur
);
tb_uid_t
metaCtbCursorNext
(
SMCtbCursor
*
pCtbCur
);
// SMetaDB
int
metaOpenDB
(
SMeta
*
pMeta
);
...
...
@@ -47,11 +75,6 @@ int metaRemoveSmaFromDb(SMeta* pMeta, int64_t indexUid);
int
metaOpenCache
(
SMeta
*
pMeta
);
void
metaCloseCache
(
SMeta
*
pMeta
);
// SMetaCfg
extern
const
SMetaCfg
defaultMetaOptions
;
// int metaValidateOptions(const SMetaCfg*);
void
metaOptionsCopy
(
SMetaCfg
*
pDest
,
const
SMetaCfg
*
pSrc
);
// SMetaIdx
int
metaOpenIdx
(
SMeta
*
pMeta
);
void
metaCloseIdx
(
SMeta
*
pMeta
);
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
f8175117
...
...
@@ -20,48 +20,21 @@
extern
"C"
{
#endif
// tqInt.h
#define tqFatal(...) \
{ \
if (tqDebugFlag & DEBUG_FATAL) { \
taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
} \
}
#define tqError(...) \
{ \
if (tqDebugFlag & DEBUG_ERROR) { \
taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
} \
}
#define tqWarn(...) \
{ \
if (tqDebugFlag & DEBUG_WARN) { \
taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \
}
#define tqInfo(...) \
{ \
if (tqDebugFlag & DEBUG_INFO) { \
taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); \
} \
}
#define tqDebug(...) \
{ \
if (tqDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); \
} \
}
#define tqTrace(...) \
{ \
if (tqDebugFlag & DEBUG_TRACE) { \
taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); \
} \
}
// tqDebug ===================
// clang-format off
#define tqFatal(...) do { if (tqDebugFlag & DEBUG_FATAL) { taosPrintLog("TQ FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define tqError(...) do { if (tqDebugFlag & DEBUG_ERROR) { taosPrintLog("TQ ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define tqWarn(...) do { if (tqDebugFlag & DEBUG_WARN) { taosPrintLog("TQ WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define tqInfo(...) do { if (tqDebugFlag & DEBUG_INFO) { taosPrintLog("TQ ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define tqDebug(...) do { if (tqDebugFlag & DEBUG_DEBUG) { taosPrintLog("TQ ", DEBUG_DEBUG, tqDebugFlag, __VA_ARGS__); }} while(0)
#define tqTrace(...) do { if (tqDebugFlag & DEBUG_TRACE) { taosPrintLog("TQ ", DEBUG_TRACE, tqDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
enum
{
TQ_STREAM_TOKEN__DATA
=
1
,
TQ_STREAM_TOKEN__WATERMARK
,
TQ_STREAM_TOKEN__CHECKPOINT
,
};
#define TQ_BUFFER_SIZE 4
...
...
@@ -105,6 +78,31 @@ typedef enum { TQ_ITEM_READY, TQ_ITEM_PROCESS, TQ_ITEM_EMPTY } STqItemStatus;
typedef
struct
STqOffsetCfg
STqOffsetCfg
;
typedef
struct
STqOffsetStore
STqOffsetStore
;
struct
STqReadHandle
{
int64_t
ver
;
int64_t
tbUid
;
SHashObj
*
tbIdHash
;
const
SSubmitReq
*
pMsg
;
SSubmitBlk
*
pBlock
;
SSubmitMsgIter
msgIter
;
SSubmitBlkIter
blkIter
;
SMeta
*
pVnodeMeta
;
SArray
*
pColIdList
;
// SArray<int32_t>
int32_t
sver
;
SSchemaWrapper
*
pSchemaWrapper
;
STSchema
*
pSchema
;
};
typedef
struct
{
int8_t
type
;
int8_t
reserved
[
7
];
union
{
void
*
data
;
int64_t
wmTs
;
int64_t
checkpointId
;
};
}
STqStreamToken
;
typedef
struct
{
int16_t
ver
;
int16_t
action
;
...
...
@@ -248,6 +246,25 @@ typedef struct {
static
STqPushMgmt
tqPushMgmt
;
// init once
int
tqInit
();
void
tqCleanUp
();
// open in each vnode
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
,
SMeta
*
pMeta
,
STqCfg
*
tqConfig
,
SMemAllocatorFactory
*
allocFac
);
void
tqClose
(
STQ
*
);
// required by vnode
int
tqPushMsg
(
STQ
*
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
version
);
int
tqCommit
(
STQ
*
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
);
int32_t
tqProcessSetConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessRebReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessCancelConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
,
int32_t
workerId
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
,
int32_t
workerId
);
int32_t
tqSerializeConsumer
(
const
STqConsumer
*
,
STqSerializedHead
**
);
int32_t
tqDeserializeConsumer
(
STQ
*
,
const
STqSerializedHead
*
,
STqConsumer
**
);
...
...
source/dnode/vnode/src/inc/tsdb.h
浏览文件 @
f8175117
此差异已折叠。
点击以展开。
source/dnode/vnode/src/inc/vnd.h
0 → 100644
浏览文件 @
f8175117
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VND_H_
#define _TD_VND_H_
#ifdef __cplusplus
extern
"C"
{
#endif
// vnodeDebug ====================
// clang-format off
#define vFatal(...) do { if (vDebugFlag & DEBUG_FATAL) { taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); }} while(0)
#define vError(...) do { if (vDebugFlag & DEBUG_ERROR) { taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); }} while(0)
#define vWarn(...) do { if (vDebugFlag & DEBUG_WARN) { taosPrintLog("VND WARN ", DEBUG_WARN, 255, __VA_ARGS__); }} while(0)
#define vInfo(...) do { if (vDebugFlag & DEBUG_INFO) { taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); }} while(0)
#define vDebug(...) do { if (vDebugFlag & DEBUG_DEBUG) { taosPrintLog("VND ", DEBUG_DEBUG, vDebugFlag, __VA_ARGS__); }} while(0)
#define vTrace(...) do { if (vDebugFlag & DEBUG_TRACE) { taosPrintLog("VND ", DEBUG_TRACE, vDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
// vnodeModule ====================
int
vnodeScheduleTask
(
int
(
*
execute
)(
void
*
),
void
*
arg
);
// vnodeQuery ====================
int
vnodeQueryOpen
(
SVnode
*
pVnode
);
void
vnodeQueryClose
(
SVnode
*
pVnode
);
#if 1
// SVBufPool
int
vnodeOpenBufPool
(
SVnode
*
pVnode
);
void
vnodeCloseBufPool
(
SVnode
*
pVnode
);
int
vnodeBufPoolSwitch
(
SVnode
*
pVnode
);
int
vnodeBufPoolRecycle
(
SVnode
*
pVnode
);
void
*
vnodeMalloc
(
SVnode
*
pVnode
,
uint64_t
size
);
bool
vnodeBufPoolIsFull
(
SVnode
*
pVnode
);
SMemAllocatorFactory
*
vBufPoolGetMAF
(
SVnode
*
pVnode
);
// SVMemAllocator
typedef
struct
SVArenaNode
{
TD_SLIST_NODE
(
SVArenaNode
);
uint64_t
size
;
// current node size
void
*
ptr
;
char
data
[];
}
SVArenaNode
;
typedef
struct
SVMemAllocator
{
T_REF_DECLARE
()
TD_DLIST_NODE
(
SVMemAllocator
);
uint64_t
capacity
;
uint64_t
ssize
;
uint64_t
lsize
;
SVArenaNode
*
pNode
;
TD_SLIST
(
SVArenaNode
)
nlist
;
}
SVMemAllocator
;
SVMemAllocator
*
vmaCreate
(
uint64_t
capacity
,
uint64_t
ssize
,
uint64_t
lsize
);
void
vmaDestroy
(
SVMemAllocator
*
pVMA
);
void
vmaReset
(
SVMemAllocator
*
pVMA
);
void
*
vmaMalloc
(
SVMemAllocator
*
pVMA
,
uint64_t
size
);
void
vmaFree
(
SVMemAllocator
*
pVMA
,
void
*
ptr
);
bool
vmaIsFull
(
SVMemAllocator
*
pVMA
);
// vnodeCfg.h
extern
const
SVnodeCfg
defaultVnodeOptions
;
int
vnodeValidateOptions
(
const
SVnodeCfg
*
);
void
vnodeOptionsCopy
(
SVnodeCfg
*
pDest
,
const
SVnodeCfg
*
pSrc
);
// For commit
#define vnodeShouldCommit vnodeBufPoolIsFull
int
vnodeSyncCommit
(
SVnode
*
pVnode
);
int
vnodeAsyncCommit
(
SVnode
*
pVnode
);
#endif
#ifdef __cplusplus
}
#endif
#endif
/*_TD_VND_H_*/
\ No newline at end of file
source/dnode/vnode/src/inc/vnodeInt.h
浏览文件 @
f8175117
...
...
@@ -35,36 +35,21 @@
#include "tstream.h"
#include "ttime.h"
#include "ttimer.h"
#include "vnode.h"
#include "wal.h"
#include "vnode.h"
#ifdef __cplusplus
extern
"C"
{
#endif
typedef
struct
STQ
STQ
;
typedef
struct
SMeta
SMeta
;
typedef
struct
STsdb
STsdb
;
typedef
struct
STQ
STQ
;
typedef
struct
SVState
SVState
;
typedef
struct
SVBufPool
SVBufPool
;
typedef
struct
SQWorkerMgmt
SQHandle
;
typedef
struct
SVnodeTask
{
TD_DLIST_NODE
(
SVnodeTask
);
void
*
arg
;
int
(
*
execute
)(
void
*
);
}
SVnodeTask
;
typedef
struct
SVnodeMgr
{
td_mode_flag_t
vnodeInitFlag
;
// For commit
bool
stop
;
uint16_t
nthreads
;
TdThread
*
threads
;
TdThreadMutex
mutex
;
TdThreadCond
hasTask
;
TD_DLIST
(
SVnodeTask
)
queue
;
}
SVnodeMgr
;
typedef
struct
{
int8_t
streamType
;
// sma or other
int8_t
dstType
;
...
...
@@ -80,8 +65,6 @@ typedef struct {
SHashObj
*
pHash
;
// streamId -> SStreamSinkInfo
}
SSink
;
extern
SVnodeMgr
vnodeMgr
;
// SVState
struct
SVState
{
int64_t
processed
;
...
...
@@ -106,118 +89,11 @@ struct SVnode {
STfs
*
pTfs
;
};
int
vnodeScheduleTask
(
SVnodeTask
*
task
);
int
vnodeQueryOpen
(
SVnode
*
pVnode
);
void
vnodeQueryClose
(
SVnode
*
pVnode
);
#define vFatal(...) \
do { \
if (vDebugFlag & DEBUG_FATAL) { \
taosPrintLog("VND FATAL ", DEBUG_FATAL, 255, __VA_ARGS__); \
} \
} while (0)
#define vError(...) \
do { \
if (vDebugFlag & DEBUG_ERROR) { \
taosPrintLog("VND ERROR ", DEBUG_ERROR, 255, __VA_ARGS__); \
} \
} while (0)
#define vWarn(...) \
do { \
if (vDebugFlag & DEBUG_WARN) { \
taosPrintLog("VND WARN ", DEBUG_WARN, 255, __VA_ARGS__); \
} \
} while (0)
#define vInfo(...) \
do { \
if (vDebugFlag & DEBUG_INFO) { \
taosPrintLog("VND ", DEBUG_INFO, 255, __VA_ARGS__); \
} \
} while (0)
#define vDebug(...) \
do { \
if (vDebugFlag & DEBUG_DEBUG) { \
taosPrintLog("VND ", DEBUG_DEBUG, tsdbDebugFlag, __VA_ARGS__); \
} \
} while (0)
#define vTrace(...) \
do { \
if (vDebugFlag & DEBUG_TRACE) { \
taosPrintLog("VND ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); \
} \
} while (0)
// vnodeCfg.h
extern
const
SVnodeCfg
defaultVnodeOptions
;
int
vnodeValidateOptions
(
const
SVnodeCfg
*
);
void
vnodeOptionsCopy
(
SVnodeCfg
*
pDest
,
const
SVnodeCfg
*
pSrc
);
// For commit
#define vnodeShouldCommit vnodeBufPoolIsFull
int
vnodeSyncCommit
(
SVnode
*
pVnode
);
int
vnodeAsyncCommit
(
SVnode
*
pVnode
);
// SVBufPool
int
vnodeOpenBufPool
(
SVnode
*
pVnode
);
void
vnodeCloseBufPool
(
SVnode
*
pVnode
);
int
vnodeBufPoolSwitch
(
SVnode
*
pVnode
);
int
vnodeBufPoolRecycle
(
SVnode
*
pVnode
);
void
*
vnodeMalloc
(
SVnode
*
pVnode
,
uint64_t
size
);
bool
vnodeBufPoolIsFull
(
SVnode
*
pVnode
);
SMemAllocatorFactory
*
vBufPoolGetMAF
(
SVnode
*
pVnode
);
// SVMemAllocator
typedef
struct
SVArenaNode
{
TD_SLIST_NODE
(
SVArenaNode
);
uint64_t
size
;
// current node size
void
*
ptr
;
char
data
[];
}
SVArenaNode
;
typedef
struct
SVMemAllocator
{
T_REF_DECLARE
()
TD_DLIST_NODE
(
SVMemAllocator
);
uint64_t
capacity
;
uint64_t
ssize
;
uint64_t
lsize
;
SVArenaNode
*
pNode
;
TD_SLIST
(
SVArenaNode
)
nlist
;
}
SVMemAllocator
;
SVMemAllocator
*
vmaCreate
(
uint64_t
capacity
,
uint64_t
ssize
,
uint64_t
lsize
);
void
vmaDestroy
(
SVMemAllocator
*
pVMA
);
void
vmaReset
(
SVMemAllocator
*
pVMA
);
void
*
vmaMalloc
(
SVMemAllocator
*
pVMA
,
uint64_t
size
);
void
vmaFree
(
SVMemAllocator
*
pVMA
,
void
*
ptr
);
bool
vmaIsFull
(
SVMemAllocator
*
pVMA
);
// init once
int
tqInit
();
void
tqCleanUp
();
// open in each vnode
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
,
SMeta
*
pMeta
,
STqCfg
*
tqConfig
,
SMemAllocatorFactory
*
allocFac
);
void
tqClose
(
STQ
*
);
// required by vnode
int
tqPushMsg
(
STQ
*
,
void
*
msg
,
int32_t
msgLen
,
tmsg_t
msgType
,
int64_t
version
);
int
tqCommit
(
STQ
*
);
int32_t
tqProcessPollReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
,
int32_t
workerId
);
int32_t
tqProcessSetConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessRebReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessCancelConnReq
(
STQ
*
pTq
,
char
*
msg
);
int32_t
tqProcessTaskExec
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
,
int32_t
workerId
);
int32_t
tqProcessTaskDeploy
(
STQ
*
pTq
,
char
*
msg
,
int32_t
msgLen
);
int32_t
tqProcessStreamTrigger
(
STQ
*
pTq
,
void
*
data
,
int32_t
dataLen
,
int32_t
workerId
);
// sma
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
);
#include "vnd.h"
#include "meta.h"
#include "tsdb.h"
...
...
source/dnode/vnode/src/meta/metaCache.c
已删除
100644 → 0
浏览文件 @
1e265d76
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
struct
SMetaCache
{
// TODO
};
int
metaOpenCache
(
SMeta
*
pMeta
)
{
// TODO
// if (pMeta->options.lruSize) {
// pMeta->pCache = rocksdb_cache_create_lru(pMeta->options.lruSize);
// if (pMeta->pCache == NULL) {
// // TODO: handle error
// return -1;
// }
// }
return
0
;
}
void
metaCloseCache
(
SMeta
*
pMeta
)
{
// if (pMeta->pCache) {
// rocksdb_cache_destroy(pMeta->pCache);
// pMeta->pCache = NULL;
// }
}
\ No newline at end of file
source/dnode/vnode/src/meta/metaCfg.c
已删除
100644 → 0
浏览文件 @
1e265d76
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
const
SMetaCfg
defaultMetaOptions
=
{.
lruSize
=
0
};
/* ------------------------ EXPOSED METHODS ------------------------ */
int
metaValidateOptions
(
const
SMetaCfg
*
pMetaOptions
)
{
// TODO
return
0
;
}
void
metaOptionsCopy
(
SMetaCfg
*
pDest
,
const
SMetaCfg
*
pSrc
)
{
memcpy
(
pDest
,
pSrc
,
sizeof
(
*
pSrc
));
}
/* ------------------------ STATIC METHODS ------------------------ */
\ No newline at end of file
source/dnode/vnode/src/meta/metaMain.c
浏览文件 @
f8175117
...
...
@@ -25,17 +25,6 @@ static void metaCloseImpl(SMeta *pMeta);
SMeta
*
metaOpen
(
const
char
*
path
,
const
SMetaCfg
*
pMetaCfg
,
SMemAllocatorFactory
*
pMAF
)
{
SMeta
*
pMeta
=
NULL
;
// Set default options
if
(
pMetaCfg
==
NULL
)
{
pMetaCfg
=
&
defaultMetaOptions
;
}
// // Validate the options
// if (metaValidateOptions(pMetaCfg) < 0) {
// // TODO: deal with error
// return NULL;
// }
// Allocate handle
pMeta
=
metaNew
(
path
,
pMetaCfg
,
pMAF
);
if
(
pMeta
==
NULL
)
{
...
...
@@ -80,9 +69,6 @@ static SMeta *metaNew(const char *path, const SMetaCfg *pMetaCfg, SMemAllocatorF
return
NULL
;
}
metaOptionsCopy
(
&
(
pMeta
->
options
),
pMetaCfg
);
pMeta
->
pmaf
=
pMAF
;
return
pMeta
;
};
...
...
@@ -94,13 +80,6 @@ static void metaFree(SMeta *pMeta) {
}
static
int
metaOpenImpl
(
SMeta
*
pMeta
)
{
// Open meta cache
if
(
metaOpenCache
(
pMeta
)
<
0
)
{
// TODO: handle error
metaCloseImpl
(
pMeta
);
return
-
1
;
}
// Open meta db
if
(
metaOpenDB
(
pMeta
)
<
0
)
{
// TODO: handle error
...
...
@@ -129,5 +108,4 @@ static void metaCloseImpl(SMeta *pMeta) {
metaCloseUidGnrt
(
pMeta
);
metaCloseIdx
(
pMeta
);
metaCloseDB
(
pMeta
);
metaCloseCache
(
pMeta
);
}
\ No newline at end of file
source/dnode/vnode/src/meta/metaQuery.c
已删除
100644 → 0
浏览文件 @
1e265d76
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
\ No newline at end of file
source/dnode/vnode/src/meta/metaTbCfg.c
已删除
100644 → 0
浏览文件 @
1e265d76
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
int
metaValidateTbCfg
(
SMeta
*
pMeta
,
const
STbCfg
*
pTbOptions
)
{
// TODO
return
0
;
}
size_t
metaEncodeTbObjFromTbOptions
(
const
STbCfg
*
pTbOptions
,
void
*
pBuf
,
size_t
bsize
)
{
void
**
ppBuf
=
&
pBuf
;
int
tlen
=
0
;
tlen
+=
taosEncodeFixedU8
(
ppBuf
,
pTbOptions
->
type
);
tlen
+=
taosEncodeString
(
ppBuf
,
pTbOptions
->
name
);
tlen
+=
taosEncodeFixedU32
(
ppBuf
,
pTbOptions
->
ttl
);
switch
(
pTbOptions
->
type
)
{
case
META_SUPER_TABLE
:
tlen
+=
taosEncodeFixedU64
(
ppBuf
,
pTbOptions
->
stbCfg
.
suid
);
tlen
+=
tdEncodeSchema
(
ppBuf
,
(
STSchema
*
)
pTbOptions
->
stbCfg
.
pTagSchema
);
// TODO: encode schema version array
break
;
case
META_CHILD_TABLE
:
tlen
+=
taosEncodeFixedU64
(
ppBuf
,
pTbOptions
->
ctbCfg
.
suid
);
break
;
case
META_NORMAL_TABLE
:
// TODO: encode schema version array
break
;
default:
break
;
}
return
tlen
;
}
\ No newline at end of file
source/dnode/vnode/src/meta/metaTbTag.c
已删除
100644 → 0
浏览文件 @
1e265d76
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnodeInt.h"
\ No newline at end of file
source/dnode/vnode/src/vnd/vnodeCommit.c
浏览文件 @
f8175117
...
...
@@ -24,16 +24,10 @@ int vnodeAsyncCommit(SVnode *pVnode) {
vnodeWaitCommit
(
pVnode
);
vnodeBufPoolSwitch
(
pVnode
);
SVnodeTask
*
pTask
=
(
SVnodeTask
*
)
taosMemoryMalloc
(
sizeof
(
*
pTask
));
pTask
->
execute
=
vnodeCommit
;
// TODO
pTask
->
arg
=
pVnode
;
// TODO
tsdbPrepareCommit
(
pVnode
->
pTsdb
);
// metaPrepareCommit(pVnode->pMeta);
// walPreapareCommit(pVnode->pWal);
vnodeScheduleTask
(
pTask
);
vnodeScheduleTask
(
vnodeCommit
,
pVnode
);
return
0
;
}
...
...
source/dnode/vnode/src/vnd/vnodeM
gr
.c
→
source/dnode/vnode/src/vnd/vnodeM
odule
.c
浏览文件 @
f8175117
...
...
@@ -15,31 +15,55 @@
#include "vnodeInt.h"
SVnodeMgr
vnodeMgr
=
{.
vnodeInitFlag
=
TD_MOD_UNINITIALIZED
};
typedef
struct
SVnodeTask
SVnodeTask
;
struct
SVnodeTask
{
SVnodeTask
*
next
;
SVnodeTask
*
prev
;
int
(
*
execute
)(
void
*
);
void
*
arg
;
};
struct
SVnodeGlobal
{
int8_t
init
;
int8_t
stop
;
int
nthreads
;
TdThread
*
threads
;
TdThreadMutex
mutex
;
TdThreadCond
hasTask
;
SVnodeTask
queue
;
};
struct
SVnodeGlobal
vnodeGlobal
;
static
void
*
loop
(
void
*
arg
);
int
vnodeInit
()
{
if
(
TD_CHECK_AND_SET_MODE_INIT
(
&
(
vnodeMgr
.
vnodeInitFlag
))
==
TD_MOD_INITIALIZED
)
{
int
vnodeInit
(
int
nthreads
)
{
int8_t
init
;
int
ret
;
init
=
atomic_val_compare_exchange_8
(
&
(
vnodeGlobal
.
init
),
0
,
1
);
if
(
init
)
{
return
0
;
}
vnodeMgr
.
stop
=
false
;
vnodeGlobal
.
stop
=
0
;
vnodeGlobal
.
queue
.
next
=
&
vnodeGlobal
.
queue
;
vnodeGlobal
.
queue
.
prev
=
&
vnodeGlobal
.
queue
;
// Start commit handers
vnodeMgr
.
nthreads
=
tsNumOfCommitThreads
;
vnodeMgr
.
threads
=
taosMemoryCalloc
(
vnodeMgr
.
nthreads
,
sizeof
(
TdThread
));
if
(
vnodeMgr
.
threads
==
NULL
)
{
vnodeGlobal
.
nthreads
=
nthreads
;
vnodeGlobal
.
threads
=
taosMemoryCalloc
(
nthreads
,
sizeof
(
TdThread
));
if
(
vnodeGlobal
.
threads
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
vError
(
"failed to init vnode module since: %s"
,
tstrerror
(
terrno
));
return
-
1
;
}
taosThreadMutexInit
(
&
(
vnodeMgr
.
mutex
),
NULL
);
taosThreadCondInit
(
&
(
vnodeMgr
.
hasTask
),
NULL
);
TD_DLIST_INIT
(
&
(
vnodeMgr
.
queue
));
taosThreadMutexInit
(
&
vnodeGlobal
.
mutex
,
NULL
);
taosThreadCondInit
(
&
vnodeGlobal
.
hasTask
,
NULL
);
for
(
uint16_t
i
=
0
;
i
<
vnodeMgr
.
nthreads
;
i
++
)
{
taosThreadCreate
(
&
(
vnodeMgr
.
threads
[
i
]),
NULL
,
loop
,
NULL
);
// pthread_setname_np(vnodeMgr.threads[i], "VND Commit Thread");
for
(
int
i
=
0
;
i
<
nthreads
;
i
++
)
{
taosThreadCreate
(
&
(
vnodeGlobal
.
threads
[
i
]),
NULL
,
loop
,
NULL
);
}
if
(
walInit
()
<
0
)
{
...
...
@@ -50,62 +74,83 @@ int vnodeInit() {
}
void
vnodeCleanup
()
{
if
(
TD_CHECK_AND_SET_MOD_CLEAR
(
&
(
vnodeMgr
.
vnodeInitFlag
))
==
TD_MOD_UNINITIALIZED
)
{
return
;
}
int8_t
init
;
// Stop commit handler
taosThreadMutexLock
(
&
(
vnodeMgr
.
mutex
));
vnodeMgr
.
stop
=
true
;
taosThreadCondBroadcast
(
&
(
vnodeMgr
.
hasTask
));
taosThreadMutexUnlock
(
&
(
vnodeMgr
.
mutex
));
init
=
atomic_val_compare_exchange_8
(
&
(
vnodeGlobal
.
init
),
1
,
0
);
if
(
init
==
0
)
return
;
for
(
uint16_t
i
=
0
;
i
<
vnodeMgr
.
nthreads
;
i
++
)
{
taosThreadJoin
(
vnodeMgr
.
threads
[
i
],
NULL
);
// set stop
taosThreadMutexLock
(
&
(
vnodeGlobal
.
mutex
));
vnodeGlobal
.
stop
=
1
;
taosThreadCondBroadcast
(
&
(
vnodeGlobal
.
hasTask
));
taosThreadMutexUnlock
(
&
(
vnodeGlobal
.
mutex
));
// wait for threads
for
(
int
i
=
0
;
i
<
vnodeGlobal
.
nthreads
;
i
++
)
{
taosThreadJoin
(
vnodeGlobal
.
threads
[
i
],
NULL
);
}
taosMemoryFreeClear
(
vnodeMgr
.
threads
);
taosThreadCondDestroy
(
&
(
vnodeMgr
.
hasTask
));
taosThreadMutexDestroy
(
&
(
vnodeMgr
.
mutex
));
// clear source
taosMemoryFreeClear
(
vnodeGlobal
.
threads
);
taosThreadCondDestroy
(
&
(
vnodeGlobal
.
hasTask
));
taosThreadMutexDestroy
(
&
(
vnodeGlobal
.
mutex
));
}
int
vnodeScheduleTask
(
SVnodeTask
*
pTask
)
{
taosThreadMutexLock
(
&
(
vnodeMgr
.
mutex
))
;
int
vnodeScheduleTask
(
int
(
*
execute
)(
void
*
),
void
*
arg
)
{
SVnodeTask
*
pTask
;
TD_DLIST_APPEND
(
&
(
vnodeMgr
.
queue
),
pTask
);
ASSERT
(
!
vnodeGlobal
.
stop
);
taosThreadCondSignal
(
&
(
vnodeMgr
.
hasTask
));
pTask
=
taosMemoryMalloc
(
sizeof
(
*
pTask
));
if
(
pTask
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pTask
->
execute
=
execute
;
pTask
->
arg
=
arg
;
taosThreadMutexUnlock
(
&
(
vnodeMgr
.
mutex
));
taosThreadMutexLock
(
&
(
vnodeGlobal
.
mutex
));
pTask
->
next
=
&
vnodeGlobal
.
queue
;
pTask
->
prev
=
vnodeGlobal
.
queue
.
prev
;
vnodeGlobal
.
queue
.
prev
->
next
=
pTask
;
vnodeGlobal
.
queue
.
prev
=
pTask
;
taosThreadCondSignal
(
&
(
vnodeGlobal
.
hasTask
));
taosThreadMutexUnlock
(
&
(
vnodeGlobal
.
mutex
));
return
0
;
}
/* ------------------------ STATIC METHODS ------------------------ */
static
void
*
loop
(
void
*
arg
)
{
SVnodeTask
*
pTask
;
int
ret
;
setThreadName
(
"vnode-commit"
);
SVnodeTask
*
pTask
;
for
(;;)
{
taosThreadMutexLock
(
&
(
vnode
Mgr
.
mutex
));
taosThreadMutexLock
(
&
(
vnode
Global
.
mutex
));
for
(;;)
{
pTask
=
TD_DLIST_HEAD
(
&
(
vnodeMgr
.
queue
));
if
(
pTask
==
NULL
)
{
if
(
vnodeMgr
.
stop
)
{
taosThreadMutexUnlock
(
&
(
vnodeMgr
.
mutex
));
pTask
=
vnodeGlobal
.
queue
.
next
;
if
(
pTask
==
&
vnodeGlobal
.
queue
)
{
// no task
if
(
vnodeGlobal
.
stop
)
{
taosThreadMutexUnlock
(
&
(
vnodeGlobal
.
mutex
));
return
NULL
;
}
else
{
taosThreadCondWait
(
&
(
vnode
Mgr
.
hasTask
),
&
(
vnodeMgr
.
mutex
));
taosThreadCondWait
(
&
(
vnode
Global
.
hasTask
),
&
(
vnodeGlobal
.
mutex
));
}
}
else
{
TD_DLIST_POP
(
&
(
vnodeMgr
.
queue
),
pTask
);
// has task
pTask
->
prev
->
next
=
pTask
->
next
;
pTask
->
next
->
prev
=
pTask
->
prev
;
break
;
}
}
taosThreadMutexUnlock
(
&
(
vnode
Mgr
.
mutex
));
taosThreadMutexUnlock
(
&
(
vnode
Global
.
mutex
));
(
*
(
pTask
->
execute
))
(
pTask
->
arg
);
pTask
->
execute
(
pTask
->
arg
);
taosMemoryFree
(
pTask
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录