Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
7c8b2867
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
7c8b2867
编写于
11月 14, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh(stream): add stream task info into information schema
上级
aa9e9358
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
127 addition
and
4 deletion
+127
-4
include/common/systable.h
include/common/systable.h
+1
-0
include/common/tmsg.h
include/common/tmsg.h
+1
-0
source/common/src/systable.c
source/common/src/systable.c
+11
-1
source/dnode/mnode/impl/src/mndShow.c
source/dnode/mnode/impl/src/mndShow.c
+2
-0
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+107
-1
source/libs/wal/src/walMeta.c
source/libs/wal/src/walMeta.c
+4
-2
source/libs/wal/src/walWrite.c
source/libs/wal/src/walWrite.c
+1
-0
未找到文件。
include/common/systable.h
浏览文件 @
7c8b2867
...
...
@@ -46,6 +46,7 @@ extern "C" {
#define TSDB_INS_TABLE_SUBSCRIPTIONS "ins_subscriptions"
#define TSDB_INS_TABLE_TOPICS "ins_topics"
#define TSDB_INS_TABLE_STREAMS "ins_streams"
#define TSDB_INS_TABLE_STREAM_TASKS "ins_stream_tasks"
#define TSDB_PERFORMANCE_SCHEMA_DB "performance_schema"
#define TSDB_PERFS_TABLE_SMAS "perf_smas"
...
...
include/common/tmsg.h
浏览文件 @
7c8b2867
...
...
@@ -119,6 +119,7 @@ typedef enum _mgmt_table {
TSDB_MGMT_TABLE_QUERIES
,
TSDB_MGMT_TABLE_VNODES
,
TSDB_MGMT_TABLE_APPS
,
TSDB_MGMT_TABLE_STREAM_TASKS
,
TSDB_MGMT_TABLE_MAX
,
}
EShowType
;
...
...
source/common/src/systable.c
浏览文件 @
7c8b2867
...
...
@@ -134,7 +134,7 @@ static const SSysDbTableSchema userStbsSchema[] = {
};
static
const
SSysDbTableSchema
streamSchema
[]
=
{
{.
name
=
"stream_name"
,
.
bytes
=
SYSTABLE_SCH_
DB
_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
{.
name
=
"stream_name"
,
.
bytes
=
SYSTABLE_SCH_
TABLE
_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
{.
name
=
"create_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
,
.
sysInfo
=
false
},
{.
name
=
"sql"
,
.
bytes
=
TSDB_SHOW_SQL_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
{.
name
=
"status"
,
.
bytes
=
20
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
...
...
@@ -145,6 +145,15 @@ static const SSysDbTableSchema streamSchema[] = {
{.
name
=
"trigger"
,
.
bytes
=
20
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
};
static
const
SSysDbTableSchema
streamTaskSchema
[]
=
{
{.
name
=
"stream_name"
,
.
bytes
=
SYSTABLE_SCH_DB_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
{.
name
=
"task_id"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
false
},
{.
name
=
"node_type"
,
.
bytes
=
SYSTABLE_SCH_DB_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
{.
name
=
"node_id"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_INT
,
.
sysInfo
=
false
},
{.
name
=
"level"
,
.
bytes
=
20
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
{.
name
=
"status"
,
.
bytes
=
20
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
};
static
const
SSysDbTableSchema
userTblsSchema
[]
=
{
{.
name
=
"table_name"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
{.
name
=
"db_name"
,
.
bytes
=
SYSTABLE_SCH_DB_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
,
.
sysInfo
=
false
},
...
...
@@ -287,6 +296,7 @@ static const SSysTableMeta infosMeta[] = {
{
TSDB_INS_TABLE_TOPICS
,
topicSchema
,
tListLen
(
topicSchema
),
false
},
{
TSDB_INS_TABLE_SUBSCRIPTIONS
,
subscriptionSchema
,
tListLen
(
subscriptionSchema
),
false
},
{
TSDB_INS_TABLE_STREAMS
,
streamSchema
,
tListLen
(
streamSchema
),
false
},
{
TSDB_INS_TABLE_STREAM_TASKS
,
streamTaskSchema
,
tListLen
(
streamTaskSchema
),
false
},
{
TSDB_INS_TABLE_VNODES
,
vnodesSchema
,
tListLen
(
vnodesSchema
),
true
},
};
...
...
source/dnode/mnode/impl/src/mndShow.c
浏览文件 @
7c8b2867
...
...
@@ -106,6 +106,8 @@ static int32_t convertToRetrieveType(char *name, int32_t len) {
type
=
TSDB_MGMT_TABLE_STREAMS
;
}
else
if
(
strncasecmp
(
name
,
TSDB_PERFS_TABLE_APPS
,
len
)
==
0
)
{
type
=
TSDB_MGMT_TABLE_APPS
;
}
else
if
(
strncasecmp
(
name
,
TSDB_INS_TABLE_STREAM_TASKS
,
len
)
==
0
)
{
type
=
TSDB_MGMT_TABLE_STREAM_TASKS
;
}
else
{
// ASSERT(0);
}
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
7c8b2867
...
...
@@ -41,6 +41,8 @@ static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
static
int32_t
mndGetStreamMeta
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
STableMetaRsp
*
pMeta
);
static
int32_t
mndRetrieveStream
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextStream
(
SMnode
*
pMnode
,
void
*
pIter
);
static
int32_t
mndRetrieveStreamTask
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rows
);
static
void
mndCancelGetNextStreamTask
(
SMnode
*
pMnode
,
void
*
pIter
);
int32_t
mndInitStream
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{
...
...
@@ -62,6 +64,8 @@ int32_t mndInitStream(SMnode *pMnode) {
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_STREAMS
,
mndRetrieveStream
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_STREAMS
,
mndCancelGetNextStream
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_STREAM_TASKS
,
mndRetrieveStreamTask
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_STREAM_TASKS
,
mndCancelGetNextStreamTask
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
...
...
@@ -891,7 +895,7 @@ static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
SName
n
;
int32_t
cols
=
0
;
char
streamName
[
TSDB_
DB
_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
streamName
[
TSDB_
TABLE
_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STR_WITH_MAXSIZE_TO_VARSTR
(
streamName
,
mndGetDbStr
(
pStream
->
name
),
sizeof
(
streamName
));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
streamName
,
false
);
...
...
@@ -953,3 +957,105 @@ static void mndCancelGetNextStream(SMnode *pMnode, void *pIter) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
static
int32_t
mndRetrieveStreamTask
(
SRpcMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rowsCapacity
)
{
SMnode
*
pMnode
=
pReq
->
info
.
node
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
SStreamObj
*
pStream
=
NULL
;
while
(
numOfRows
<
rowsCapacity
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_STREAM
,
pShow
->
pIter
,
(
void
**
)
&
pStream
);
if
(
pShow
->
pIter
==
NULL
)
break
;
// lock
taosRLockLatch
(
&
pStream
->
lock
);
// count task num
int32_t
sz
=
taosArrayGetSize
(
pStream
->
tasks
);
int32_t
count
=
0
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SArray
*
pLevel
=
taosArrayGetP
(
pStream
->
tasks
,
i
);
count
+=
taosArrayGetSize
(
pLevel
);
}
if
(
numOfRows
+
count
>
rowsCapacity
)
{
blockDataEnsureCapacity
(
pBlock
,
numOfRows
+
count
);
}
// add row for each task
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SArray
*
pLevel
=
taosArrayGetP
(
pStream
->
tasks
,
i
);
int32_t
levelCnt
=
taosArrayGetSize
(
pLevel
);
for
(
int32_t
j
=
0
;
j
<
levelCnt
;
j
++
)
{
SStreamTask
*
pTask
=
taosArrayGetP
(
pLevel
,
j
);
SColumnInfoData
*
pColInfo
;
int32_t
cols
=
0
;
// stream name
char
streamName
[
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
STR_WITH_MAXSIZE_TO_VARSTR
(
streamName
,
mndGetDbStr
(
pStream
->
name
),
sizeof
(
streamName
));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
streamName
,
false
);
// task id
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pTask
->
taskId
,
false
);
// node type
char
nodeType
[
20
+
VARSTR_HEADER_SIZE
]
=
{
0
};
varDataSetLen
(
nodeType
,
5
);
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
if
(
pTask
->
nodeId
>
0
)
{
memcpy
(
varDataVal
(
nodeType
),
"vnode"
,
5
);
}
else
{
memcpy
(
varDataVal
(
nodeType
),
"snode"
,
5
);
}
colDataAppend
(
pColInfo
,
numOfRows
,
nodeType
,
false
);
// node id
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
int32_t
nodeId
=
TMAX
(
pTask
->
nodeId
,
0
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
nodeId
,
false
);
// level
char
level
[
20
+
VARSTR_HEADER_SIZE
]
=
{
0
};
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SOURCE
)
{
memcpy
(
varDataVal
(
level
),
"source"
,
6
);
varDataSetLen
(
level
,
6
);
}
else
if
(
pTask
->
taskLevel
==
TASK_LEVEL__AGG
)
{
memcpy
(
varDataVal
(
level
),
"agg"
,
3
);
varDataSetLen
(
level
,
3
);
}
else
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SINK
)
{
memcpy
(
varDataVal
(
level
),
"sink"
,
4
);
varDataSetLen
(
level
,
4
);
}
else
if
(
pTask
->
taskLevel
==
TASK_LEVEL__SINK
)
{
}
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
level
,
false
);
// status
char
status
[
20
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
status2
[
20
]
=
{
0
};
strcpy
(
status
,
"normal"
);
STR_WITH_MAXSIZE_TO_VARSTR
(
status
,
status2
,
sizeof
(
status
));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
status
,
false
);
numOfRows
++
;
}
}
// unlock
taosRUnLockLatch
(
&
pStream
->
lock
);
sdbRelease
(
pSdb
,
pStream
);
}
pShow
->
numOfRows
+=
numOfRows
;
return
numOfRows
;
}
static
void
mndCancelGetNextStreamTask
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
source/libs/wal/src/walMeta.c
浏览文件 @
7c8b2867
...
...
@@ -124,8 +124,8 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal, int32_t fileIdx) {
goto
_err
;
}
char
*
candidate
=
NULL
;
char
*
haystack
=
buf
;
char
*
candidate
=
NULL
;
char
*
haystack
=
buf
;
int64_t
pos
=
0
;
SWalCkHead
*
logContent
=
NULL
;
...
...
@@ -414,8 +414,10 @@ int walCheckAndRepairMeta(SWal* pWal) {
}
ASSERT
(
pFileInfo
->
fileSize
==
0
);
// remove the empty wal log, and its idx
wInfo
(
"vgId:%d, wal remove empty file %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
);
taosRemoveFile
(
fnameStr
);
walBuildIdxName
(
pWal
,
pFileInfo
->
firstVer
,
fnameStr
);
wInfo
(
"vgId:%d, wal remove empty file %s"
,
pWal
->
cfg
.
vgId
,
fnameStr
);
taosRemoveFile
(
fnameStr
);
// remove its meta entry
taosArrayRemove
(
pWal
->
fileInfoSet
,
fileIdx
);
...
...
source/libs/wal/src/walWrite.c
浏览文件 @
7c8b2867
...
...
@@ -407,6 +407,7 @@ int32_t walRollImpl(SWal *pWal) {
}
walBuildLogName
(
pWal
,
newFileFirstVer
,
fnameStr
);
pLogFile
=
taosOpenFile
(
fnameStr
,
TD_FILE_CREATE
|
TD_FILE_WRITE
|
TD_FILE_APPEND
);
wDebug
(
"vgId:%d, wal create new file for write:%s"
,
pWal
->
cfg
.
vgId
,
fnameStr
);
if
(
pLogFile
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
code
=
-
1
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录