Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
00e09623
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
00e09623
编写于
4月 05, 2020
作者:
J
Jeff Tao
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
head file is messed up
vnodeWrite.c does not include dnode.h
上级
e10186e8
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
69 addition
and
76 deletion
+69
-76
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+6
-56
src/dnode/src/dnodeRead.c
src/dnode/src/dnodeRead.c
+3
-1
src/dnode/src/dnodeWrite.c
src/dnode/src/dnodeWrite.c
+2
-2
src/inc/dnode.h
src/inc/dnode.h
+0
-1
src/inc/vnode.h
src/inc/vnode.h
+4
-0
src/util/src/tqueue.c
src/util/src/tqueue.c
+2
-2
src/vnode/main/inc/vnodeInt.h
src/vnode/main/inc/vnodeInt.h
+8
-7
src/vnode/main/src/vnodeMain.c
src/vnode/main/src/vnodeMain.c
+42
-5
src/vnode/main/src/vnodeWrite.c
src/vnode/main/src/vnodeWrite.c
+1
-1
src/vnode/wal/src/walMain.c
src/vnode/wal/src/walMain.c
+1
-1
未找到文件。
src/dnode/src/dnodeMgmt.c
浏览文件 @
00e09623
...
...
@@ -31,22 +31,7 @@
#include "dnodeWrite.h"
#include "vnode.h"
typedef
struct
{
int32_t
vgId
;
// global vnode group ID
int32_t
refCount
;
// reference count
EVnodeStatus
status
;
// status: master, slave, notready, deleting
int64_t
version
;
void
*
wworker
;
void
*
rworker
;
void
*
wal
;
void
*
tsdb
;
void
*
replica
;
void
*
events
;
void
*
cq
;
// continuous query
}
SVnodeObj
;
static
int32_t
dnodeOpenVnodes
();
static
void
dnodeCleanupVnodes
();
static
int32_t
dnodeProcessCreateVnodeMsg
(
SRpcMsg
*
pMsg
);
static
int32_t
dnodeProcessDropVnodeMsg
(
SRpcMsg
*
pMsg
);
static
int32_t
dnodeProcessAlterVnodeMsg
(
SRpcMsg
*
pMsg
);
...
...
@@ -56,7 +41,6 @@ static int32_t (*dnodeProcessMgmtMsgFp[TSDB_MSG_TYPE_MAX])(SRpcMsg *pMsg);
static
void
dnodeSendStatusMsg
(
void
*
handle
,
void
*
tmrId
);
static
void
dnodeReadDnodeId
();
void
*
tsDnodeVnodesHash
=
NULL
;
static
void
*
tsDnodeTmr
=
NULL
;
static
void
*
tsStatusTimer
=
NULL
;
static
uint32_t
tsRebootTime
;
...
...
@@ -72,12 +56,6 @@ int32_t dnodeInitMgmt() {
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_ALTER_STREAM
]
=
dnodeProcessAlterStreamMsg
;
dnodeProcessMgmtMsgFp
[
TSDB_MSG_TYPE_MD_CONFIG_DNODE
]
=
dnodeProcessConfigDnodeMsg
;
tsDnodeVnodesHash
=
taosInitIntHash
(
TSDB_MAX_VNODES
,
sizeof
(
SVnodeObj
),
taosHashInt
);
if
(
tsDnodeVnodesHash
==
NULL
)
{
dError
(
"failed to init vnode list"
);
return
-
1
;
}
tsRebootTime
=
taosGetTimestampSec
();
tsDnodeTmr
=
taosTmrInit
(
100
,
200
,
60000
,
"DND-DM"
);
...
...
@@ -86,6 +64,10 @@ int32_t dnodeInitMgmt() {
return
-
1
;
}
if
(
vnodeInitModule
()
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
}
int32_t
code
=
dnodeOpenVnodes
();
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
-
1
;
...
...
@@ -106,11 +88,7 @@ void dnodeCleanupMgmt() {
tsDnodeTmr
=
NULL
;
}
dnodeCleanupVnodes
();
if
(
tsDnodeVnodesHash
==
NULL
)
{
taosCleanUpIntHash
(
tsDnodeVnodesHash
);
tsDnodeVnodesHash
=
NULL
;
}
vnodeCleanupModule
();
}
void
dnodeMgmt
(
SRpcMsg
*
pMsg
)
{
...
...
@@ -129,14 +107,6 @@ void dnodeMgmt(SRpcMsg *pMsg) {
rpcFreeCont
(
pMsg
->
pCont
);
}
void
*
dnodeGetVnodeWworker
(
void
*
pVnode
)
{
return
((
SVnodeObj
*
)
pVnode
)
->
wworker
;
}
void
*
dnodeGetVnodeRworker
(
void
*
pVnode
)
{
return
((
SVnodeObj
*
)
pVnode
)
->
rworker
;
}
static
int32_t
dnodeOpenVnodes
()
{
DIR
*
dir
=
opendir
(
tsVnodeDir
);
if
(
dir
==
NULL
)
{
...
...
@@ -166,13 +136,6 @@ static int32_t dnodeOpenVnodes() {
return
TSDB_CODE_SUCCESS
;
}
typedef
void
(
*
CleanupFp
)(
char
*
);
static
void
dnodeCleanupVnodes
()
{
int32_t
num
=
taosGetIntHashSize
(
tsDnodeVnodesHash
);
taosCleanUpIntHashWithFp
(
tsDnodeVnodesHash
,
(
CleanupFp
)
vnodeClose
);
dPrint
(
"dnode mgmt is closed, vnodes:%d"
,
num
);
}
static
int32_t
dnodeProcessCreateVnodeMsg
(
SRpcMsg
*
rpcMsg
)
{
SMDCreateVnodeMsg
*
pCreate
=
rpcMsg
->
pCont
;
...
...
@@ -219,19 +182,6 @@ static int32_t dnodeProcessConfigDnodeMsg(SRpcMsg *pMsg) {
return
tsCfgDynamicOptions
(
pCfg
->
config
);
}
static
void
dnodeBuildVloadMsg
(
char
*
pNode
,
void
*
param
)
{
SVnodeObj
*
pVnode
=
(
SVnodeObj
*
)
pNode
;
if
(
pVnode
->
status
==
TSDB_VN_STATUS_DELETING
)
return
;
SDMStatusMsg
*
pStatus
=
param
;
if
(
pStatus
->
openVnodes
>=
TSDB_MAX_VNODES
)
return
;
SVnodeLoad
*
pLoad
=
&
pStatus
->
load
[
pStatus
->
openVnodes
++
];
pLoad
->
vgId
=
htonl
(
pVnode
->
vgId
);
pLoad
->
vnode
=
htonl
(
pVnode
->
vgId
);
pLoad
->
status
=
pVnode
->
status
;
}
static
void
dnodeSendStatusMsg
(
void
*
handle
,
void
*
tmrId
)
{
if
(
tsDnodeTmr
==
NULL
)
{
dError
(
"dnode timer is already released"
);
...
...
@@ -263,7 +213,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
pStatus
->
diskAvailable
=
tsAvailDataDirGB
;
pStatus
->
alternativeRole
=
(
uint8_t
)
tsAlternativeRole
;
taosVisitIntHashWithFp
(
tsDnodeVnodesHash
,
dnodeBuildVloadMsg
,
pStatus
);
vnodeBuildStatusMsg
(
pStatus
);
contLen
=
sizeof
(
SDMStatusMsg
)
+
pStatus
->
openVnodes
*
sizeof
(
SVnodeLoad
);
pStatus
->
openVnodes
=
htons
(
pStatus
->
openVnodes
);
...
...
src/dnode/src/dnodeRead.c
浏览文件 @
00e09623
...
...
@@ -127,7 +127,7 @@ void dnodeRead(SRpcMsg *pMsg) {
}
void
*
dnodeAllocateRqueue
(
void
*
pVnode
)
{
taos_queue
*
queue
=
taosOpenQueue
(
sizeof
(
SReadMsg
)
);
taos_queue
queue
=
taosOpenQueue
(
);
if
(
queue
==
NULL
)
return
NULL
;
taosAddIntoQset
(
readQset
,
queue
,
pVnode
);
...
...
@@ -144,6 +144,8 @@ void *dnodeAllocateRqueue(void *pVnode) {
}
}
dTrace
(
"pVnode:%p, queue:%p is allocated"
,
pVnode
,
queue
);
return
queue
;
}
...
...
src/dnode/src/dnodeWrite.c
浏览文件 @
00e09623
...
...
@@ -106,7 +106,7 @@ void dnodeWrite(SRpcMsg *pMsg) {
void
*
dnodeAllocateWqueue
(
void
*
pVnode
)
{
SWriteWorker
*
pWorker
=
wWorkerPool
.
writeWorker
+
wWorkerPool
.
nextId
;
taos_queue
*
queue
=
taosOpenQueue
();
void
*
queue
=
taosOpenQueue
();
if
(
queue
==
NULL
)
return
NULL
;
if
(
pWorker
->
qset
==
NULL
)
{
...
...
@@ -129,7 +129,7 @@ void *dnodeAllocateWqueue(void *pVnode) {
wWorkerPool
.
nextId
=
(
wWorkerPool
.
nextId
+
1
)
%
wWorkerPool
.
max
;
}
dTrace
(
"
queue:%p is allocated for pVnode:%p"
,
queue
,
pVnod
e
);
dTrace
(
"
pVnode:%p, queue:%p is allocated"
,
pVnode
,
queu
e
);
return
queue
;
}
...
...
src/inc/dnode.h
浏览文件 @
00e09623
...
...
@@ -44,7 +44,6 @@ void *dnodeAllocateRqueue(void *pVnode);
void
dnodeFreeRqueue
(
void
*
rqueue
);
void
dnodeSendWriteResponse
(
void
*
pVnode
,
void
*
param
,
int32_t
code
);
#ifdef __cplusplus
}
#endif
...
...
src/inc/vnode.h
浏览文件 @
00e09623
...
...
@@ -25,6 +25,9 @@ typedef struct {
void
*
rsp
;
}
SRspRet
;
int32_t
vnodeInitModule
();
void
vnodeCleanupModule
();
int32_t
vnodeCreate
(
SMDCreateVnodeMsg
*
pVnodeCfg
);
int32_t
vnodeDrop
(
int32_t
vgId
);
int32_t
vnodeOpen
(
int32_t
vnode
,
char
*
rootDir
);
...
...
@@ -39,6 +42,7 @@ void* vnodeGetWal(void *pVnode);
void
*
vnodeGetTsdb
(
void
*
pVnode
);
int32_t
vnodeProcessWrite
(
void
*
pVnode
,
int
qtype
,
SWalHead
*
pHead
,
void
*
item
);
void
vnodeBuildStatusMsg
(
void
*
param
);
#ifdef __cplusplus
}
...
...
src/util/src/tqueue.c
浏览文件 @
00e09623
...
...
@@ -117,7 +117,7 @@ int taosWriteQitem(taos_queue param, int type, void *item) {
queue
->
numOfItems
++
;
if
(
queue
->
qset
)
atomic_add_fetch_32
(
&
queue
->
qset
->
numOfItems
,
1
);
//pTrace("item:%p is put into queue, items:%d", item
, queue->numOfItems);
pTrace
(
"item:%p is put into queue, type:%d items:%d"
,
item
,
type
,
queue
->
numOfItems
);
pthread_mutex_unlock
(
&
queue
->
mutex
);
...
...
@@ -197,7 +197,7 @@ int taosGetQitem(taos_qall param, int *type, void **pitem) {
*
pitem
=
pNode
->
item
;
*
type
=
pNode
->
type
;
num
=
1
;
//pTrace("item:%p is fetched", *pitem
);
pTrace
(
"item:%p is fetched, type:%d"
,
*
pitem
,
*
type
);
}
return
num
;
...
...
src/vnode/main/inc/vnodeInt.h
浏览文件 @
00e09623
...
...
@@ -34,13 +34,14 @@ typedef struct {
EVnStatus
status
;
int
role
;
int64_t
version
;
void
*
wqueue
;
void
*
rqueue
;
void
*
wal
;
void
*
tsdb
;
void
*
sync
;
void
*
events
;
void
*
cq
;
// continuous query
int
temp
;
void
*
wqueue
;
void
*
rqueue
;
void
*
wal
;
void
*
tsdb
;
void
*
sync
;
void
*
events
;
void
*
cq
;
// continuous query
}
SVnodeObj
;
int
vnodeWriteToQueue
(
void
*
param
,
SWalHead
*
pHead
,
int
type
);
...
...
src/vnode/main/src/vnodeMain.c
浏览文件 @
00e09623
...
...
@@ -25,10 +25,32 @@
#include "ttime.h"
#include "ttimer.h"
#include "twal.h"
#include "dnode.h"
#include "vnode.h"
#include "vnodeInt.h"
extern
void
*
tsDnodeVnodesHash
;
static
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
);
static
void
*
tsDnodeVnodesHash
;
static
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
);
static
void
vnodeBuildVloadMsg
(
char
*
pNode
,
void
*
param
);
int32_t
vnodeInitModule
()
{
vnodeInitWriteFp
();
tsDnodeVnodesHash
=
taosInitIntHash
(
TSDB_MAX_VNODES
,
sizeof
(
SVnodeObj
),
taosHashInt
);
if
(
tsDnodeVnodesHash
==
NULL
)
{
dError
(
"failed to init vnode list"
);
return
-
1
;
}
return
0
;
}
typedef
void
(
*
CleanupFp
)(
char
*
);
void
vnodeCleanupModule
()
{
taosCleanUpIntHashWithFp
(
tsDnodeVnodesHash
,
(
CleanupFp
)
vnodeClose
);
taosCleanUpIntHash
(
tsDnodeVnodesHash
);
}
int32_t
vnodeCreate
(
SMDCreateVnodeMsg
*
pVnodeCfg
)
{
int32_t
code
;
...
...
@@ -95,9 +117,6 @@ int32_t vnodeDrop(int32_t vgId) {
int32_t
vnodeOpen
(
int32_t
vnode
,
char
*
rootDir
)
{
char
temp
[
TSDB_FILENAME_LEN
];
static
pthread_once_t
vnodeInitWrite
=
PTHREAD_ONCE_INIT
;
pthread_once
(
&
vnodeInitWrite
,
vnodeInitWriteFp
);
SVnodeObj
vnodeObj
=
{
0
};
vnodeObj
.
vgId
=
vnode
;
vnodeObj
.
status
=
VN_STATUS_INIT
;
...
...
@@ -194,6 +213,24 @@ void *vnodeGetTsdb(void *pVnode) {
return
((
SVnodeObj
*
)
pVnode
)
->
tsdb
;
}
void
vnodeBuildStatusMsg
(
void
*
param
)
{
SDMStatusMsg
*
pStatus
=
param
;
taosVisitIntHashWithFp
(
tsDnodeVnodesHash
,
vnodeBuildVloadMsg
,
pStatus
);
}
static
void
vnodeBuildVloadMsg
(
char
*
pNode
,
void
*
param
)
{
SVnodeObj
*
pVnode
=
(
SVnodeObj
*
)
pNode
;
if
(
pVnode
->
status
==
VN_STATUS_DELETING
)
return
;
SDMStatusMsg
*
pStatus
=
param
;
if
(
pStatus
->
openVnodes
>=
TSDB_MAX_VNODES
)
return
;
SVnodeLoad
*
pLoad
=
&
pStatus
->
load
[
pStatus
->
openVnodes
++
];
pLoad
->
vgId
=
htonl
(
pVnode
->
vgId
);
pLoad
->
vnode
=
htonl
(
pVnode
->
vgId
);
pLoad
->
status
=
pVnode
->
status
;
}
static
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
)
{
taosDeleteIntHash
(
tsDnodeVnodesHash
,
pVnode
->
vgId
);
...
...
src/vnode/main/src/vnodeWrite.c
浏览文件 @
00e09623
...
...
@@ -26,7 +26,7 @@
#include "vnode.h"
#include "vnodeInt.h"
static
int32_t
(
*
vnodeProcessWriteMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SVnodeObj
*
,
void
*
,
void
*
);
static
int32_t
(
*
vnodeProcessWriteMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SVnodeObj
*
,
void
*
,
SRspRet
*
);
static
int32_t
vnodeProcessSubmitMsg
(
SVnodeObj
*
pVnode
,
void
*
pMsg
,
SRspRet
*
);
static
int32_t
vnodeProcessCreateTableMsg
(
SVnodeObj
*
pVnode
,
void
*
pMsg
,
SRspRet
*
);
static
int32_t
vnodeProcessDropTableMsg
(
SVnodeObj
*
pVnode
,
void
*
pMsg
,
SRspRet
*
);
...
...
src/vnode/wal/src/walMain.c
浏览文件 @
00e09623
...
...
@@ -284,7 +284,7 @@ static int walRestoreWalFile(char *name, void *pVnode, int (*writeFp)(void *, SW
}
// write into queue
(
*
writeFp
)(
pVnode
,
buffer
,
TAOS_QTYPE_WAL
);
(
*
writeFp
)(
pVnode
,
pHead
,
TAOS_QTYPE_WAL
);
}
return
code
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录