Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
19ec7bca
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
19ec7bca
编写于
4月 27, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
enh: more info in perf schema
上级
e8acef45
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
184 addition
and
29 deletion
+184
-29
include/util/tarray.h
include/util/tarray.h
+2
-1
include/util/tdef.h
include/util/tdef.h
+1
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+0
-1
source/client/src/tmq.c
source/client/src/tmq.c
+5
-2
source/dnode/mnode/impl/inc/mndConsumer.h
source/dnode/mnode/impl/inc/mndConsumer.h
+0
-2
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+9
-0
source/dnode/mnode/impl/src/mndConsumer.c
source/dnode/mnode/impl/src/mndConsumer.c
+122
-2
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+15
-0
source/dnode/mnode/impl/src/mndInfoSchema.c
source/dnode/mnode/impl/src/mndInfoSchema.c
+0
-19
source/dnode/mnode/impl/src/mndPerfSchema.c
source/dnode/mnode/impl/src/mndPerfSchema.c
+11
-2
source/util/src/tarray.c
source/util/src/tarray.c
+19
-0
未找到文件。
include/util/tarray.h
浏览文件 @
19ec7bca
...
...
@@ -205,7 +205,6 @@ SArray* taosArrayDup(const SArray* pSrc);
*/
SArray
*
taosArrayDeepCopy
(
const
SArray
*
pSrc
,
FCopy
deepCopy
);
/**
* clear the array (remove all element)
* @param pArray
...
...
@@ -272,6 +271,8 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par
int32_t
taosEncodeArray
(
void
**
buf
,
const
SArray
*
pArray
,
FEncode
encode
);
void
*
taosDecodeArray
(
const
void
*
buf
,
SArray
**
pArray
,
FDecode
decode
,
int32_t
dataSz
);
char
*
taosShowStrArray
(
const
SArray
*
pArray
);
#ifdef __cplusplus
}
#endif
...
...
include/util/tdef.h
浏览文件 @
19ec7bca
...
...
@@ -131,6 +131,7 @@ extern const int32_t TYPE_BYTES[15];
#define TSDB_PERFS_TABLE_TOPICS "topics"
#define TSDB_PERFS_TABLE_CONSUMERS "consumers"
#define TSDB_PERFS_TABLE_SUBSCRIPTIONS "subscriptions"
#define TSDB_PERFS_TABLE_OFFSETS "offsets"
#define TSDB_INDEX_TYPE_SMA "SMA"
#define TSDB_INDEX_TYPE_FULLTEXT "FULLTEXT"
...
...
source/client/src/clientImpl.c
浏览文件 @
19ec7bca
...
...
@@ -245,7 +245,6 @@ void setResSchemaInfo(SReqResultInfo* pResInfo, const SSchema* pSchema, int32_t
ASSERT
(
pSchema
!=
NULL
&&
numOfCols
>
0
);
pResInfo
->
numOfCols
=
numOfCols
;
// TODO handle memory leak
if
(
pResInfo
->
fields
!=
NULL
)
{
taosMemoryFree
(
pResInfo
->
fields
);
}
...
...
source/client/src/tmq.c
浏览文件 @
19ec7bca
...
...
@@ -666,7 +666,6 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
code
=
param
.
rspErr
;
if
(
code
!=
0
)
goto
FAIL
;
// TODO: add max retry cnt
while
(
TSDB_CODE_MND_CONSUMER_NOT_READY
==
tmqAskEp
(
tmq
,
false
))
{
tscDebug
(
"not ready, retry"
);
taosMsleep
(
500
);
...
...
@@ -683,7 +682,7 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, const tmq_list_t* topic_list) {
code
=
0
;
FAIL:
if
(
req
.
topicNames
!=
NULL
)
taosArrayDestroyP
(
req
.
topicNames
,
taosMemoryFree
);
if
(
code
!=
0
)
{
if
(
code
!=
0
&&
buf
)
{
taosMemoryFree
(
buf
);
}
return
code
;
...
...
@@ -1265,6 +1264,7 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
return
(
TAOS_RES
*
)
rspObj
;
}
// in no topic status also need process delayed task
if
(
atomic_load_8
(
&
tmq
->
status
)
==
TMQ_CONSUMER_STATUS__INIT
)
{
return
NULL
;
}
...
...
@@ -1285,6 +1285,9 @@ TAOS_RES* tmq_consumer_poll(tmq_t* tmq, int64_t wait_time) {
return
NULL
;
}
tsem_timewait
(
&
tmq
->
rspSem
,
leftTime
*
1000
);
}
else
{
// use tsem_timewait instead of tsem_wait to avoid unexpected stuck
tsem_timewait
(
&
tmq
->
rspSem
,
500
*
1000
);
}
}
}
...
...
source/dnode/mnode/impl/inc/mndConsumer.h
浏览文件 @
19ec7bca
...
...
@@ -23,10 +23,8 @@ extern "C" {
#endif
enum
{
// MQ_CONSUMER_STATUS__INIT = 1,
MQ_CONSUMER_STATUS__MODIFY
=
1
,
MQ_CONSUMER_STATUS__MODIFY_IN_REB
,
// MQ_CONSUMER_STATUS__IDLE,
MQ_CONSUMER_STATUS__READY
,
MQ_CONSUMER_STATUS__LOST
,
MQ_CONSUMER_STATUS__LOST_IN_REB
,
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
19ec7bca
...
...
@@ -469,6 +469,7 @@ enum {
typedef
struct
{
int64_t
consumerId
;
char
cgroup
[
TSDB_CGROUP_LEN
];
char
appId
[
TSDB_CGROUP_LEN
];
int8_t
updateType
;
// used only for update
int32_t
epoch
;
int32_t
status
;
...
...
@@ -479,6 +480,14 @@ typedef struct {
SArray
*
currentTopics
;
// SArray<char*>
SArray
*
rebNewTopics
;
// SArray<char*>
SArray
*
rebRemovedTopics
;
// SArray<char*>
// data for display
int32_t
pid
;
SEpSet
ep
;
int64_t
upTime
;
int64_t
subscribeTime
;
int64_t
rebalanceTime
;
}
SMqConsumerObj
;
SMqConsumerObj
*
tNewSMqConsumerObj
(
int64_t
consumerId
,
char
cgroup
[
TSDB_CGROUP_LEN
]);
...
...
source/dnode/mnode/impl/src/mndConsumer.c
浏览文件 @
19ec7bca
...
...
@@ -37,6 +37,8 @@
static
int8_t
mqInRebFlag
=
0
;
static
const
char
*
mndConsumerStatusName
(
int
status
);
static
int32_t
mndConsumerActionInsert
(
SSdb
*
pSdb
,
SMqConsumerObj
*
pConsumer
);
static
int32_t
mndConsumerActionDelete
(
SSdb
*
pSdb
,
SMqConsumerObj
*
pConsumer
);
static
int32_t
mndConsumerActionUpdate
(
SSdb
*
pSdb
,
SMqConsumerObj
*
pConsumer
,
SMqConsumerObj
*
pNewConsumer
);
...
...
@@ -62,6 +64,10 @@ int32_t mndInitConsumer(SMnode *pMnode) {
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_ASK_EP
,
mndProcessAskEpReq
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_TIMER
,
mndProcessMqTimerMsg
);
mndSetMsgHandle
(
pMnode
,
TDMT_MND_MQ_CONSUMER_LOST
,
mndProcessConsumerLostMsg
);
mndAddShowRetrieveHandle
(
pMnode
,
TSDB_MGMT_TABLE_CONSUMERS
,
mndRetrieveConsumer
);
mndAddShowFreeIterHandle
(
pMnode
,
TSDB_MGMT_TABLE_CONSUMERS
,
mndCancelGetNextConsumer
);
return
sdbSetTable
(
pMnode
->
pSdb
,
table
);
}
...
...
@@ -366,7 +372,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
if
(
pConsumerOld
==
NULL
)
{
pConsumerNew
=
tNewSMqConsumerObj
(
consumerId
,
cgroup
);
pConsumerNew
->
updateType
=
CONSUMER_UPDATE__MODIFY
;
/*pConsumerNew->waitingRebTopics = newSub;*/
pConsumerNew
->
rebNewTopics
=
newSub
;
subscribe
.
topicNames
=
NULL
;
...
...
@@ -389,7 +394,6 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
goto
SUBSCRIBE_OVER
;
}
pConsumerNew
->
updateType
=
CONSUMER_UPDATE__MODIFY
;
/*pConsumerOld->waitingRebTopics = newSub;*/
int32_t
oldTopicNum
=
0
;
if
(
pConsumerOld
->
currentTopics
)
{
...
...
@@ -532,6 +536,7 @@ CM_DECODE_OVER:
static
int32_t
mndConsumerActionInsert
(
SSdb
*
pSdb
,
SMqConsumerObj
*
pConsumer
)
{
mTrace
(
"consumer:%"
PRId64
", perform insert action"
,
pConsumer
->
consumerId
);
pConsumer
->
subscribeTime
=
pConsumer
->
upTime
;
return
0
;
}
...
...
@@ -557,6 +562,8 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer
->
rebRemovedTopics
=
pNewConsumer
->
rebRemovedTopics
;
pNewConsumer
->
rebRemovedTopics
=
tmp
;
pOldConsumer
->
subscribeTime
=
pNewConsumer
->
upTime
;
pOldConsumer
->
status
=
MQ_CONSUMER_STATUS__MODIFY
;
}
else
if
(
pNewConsumer
->
updateType
==
CONSUMER_UPDATE__LOST
)
{
int32_t
sz
=
taosArrayGetSize
(
pOldConsumer
->
currentTopics
);
...
...
@@ -565,9 +572,15 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
char
*
topic
=
strdup
(
taosArrayGetP
(
pOldConsumer
->
currentTopics
,
i
));
taosArrayPush
(
pNewConsumer
->
rebRemovedTopics
,
&
topic
);
}
pOldConsumer
->
rebalanceTime
=
pNewConsumer
->
upTime
;
pOldConsumer
->
status
=
MQ_CONSUMER_STATUS__LOST
;
}
else
if
(
pNewConsumer
->
updateType
==
CONSUMER_UPDATE__TOUCH
)
{
atomic_add_fetch_32
(
&
pOldConsumer
->
epoch
,
1
);
pOldConsumer
->
rebalanceTime
=
pNewConsumer
->
upTime
;
}
else
if
(
pNewConsumer
->
updateType
==
CONSUMER_UPDATE__ADD
)
{
ASSERT
(
taosArrayGetSize
(
pNewConsumer
->
rebNewTopics
)
==
1
);
ASSERT
(
taosArrayGetSize
(
pNewConsumer
->
rebRemovedTopics
)
==
0
);
...
...
@@ -612,6 +625,9 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer
->
status
=
MQ_CONSUMER_STATUS__LOST_IN_REB
;
}
}
pOldConsumer
->
rebalanceTime
=
pNewConsumer
->
upTime
;
atomic_add_fetch_32
(
&
pOldConsumer
->
epoch
,
1
);
}
else
if
(
pNewConsumer
->
updateType
==
CONSUMER_UPDATE__REMOVE
)
{
ASSERT
(
taosArrayGetSize
(
pNewConsumer
->
rebNewTopics
)
==
0
);
...
...
@@ -668,6 +684,9 @@ static int32_t mndConsumerActionUpdate(SSdb *pSdb, SMqConsumerObj *pOldConsumer,
pOldConsumer
->
status
=
MQ_CONSUMER_STATUS__LOST_IN_REB
;
}
}
pOldConsumer
->
rebalanceTime
=
pNewConsumer
->
upTime
;
atomic_add_fetch_32
(
&
pOldConsumer
->
epoch
,
1
);
}
...
...
@@ -688,3 +707,104 @@ void mndReleaseConsumer(SMnode *pMnode, SMqConsumerObj *pConsumer) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbRelease
(
pSdb
,
pConsumer
);
}
static
int32_t
mndRetrieveConsumer
(
SNodeMsg
*
pReq
,
SShowObj
*
pShow
,
SSDataBlock
*
pBlock
,
int32_t
rowsCapacity
)
{
SMnode
*
pMnode
=
pReq
->
pNode
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
int32_t
numOfRows
=
0
;
SMqConsumerObj
*
pConsumer
=
NULL
;
while
(
numOfRows
<
rowsCapacity
)
{
pShow
->
pIter
=
sdbFetch
(
pSdb
,
SDB_CONSUMER
,
pShow
->
pIter
,
(
void
**
)
&
pConsumer
);
if
(
pShow
->
pIter
==
NULL
)
break
;
SColumnInfoData
*
pColInfo
;
int32_t
cols
=
0
;
taosRLockLatch
(
&
pConsumer
->
lock
);
// consumer id
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
consumerId
,
false
);
// group id
char
groupId
[
TSDB_CGROUP_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
tstrncpy
(
varDataVal
(
groupId
),
pConsumer
->
cgroup
,
TSDB_CGROUP_LEN
);
varDataSetLen
(
groupId
,
strlen
(
varDataVal
(
groupId
)));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
groupId
,
false
);
// app id
char
appId
[
TSDB_CGROUP_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
tstrncpy
(
varDataVal
(
appId
),
pConsumer
->
appId
,
TSDB_CGROUP_LEN
);
varDataSetLen
(
appId
,
strlen
(
varDataVal
(
appId
)));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
appId
,
false
);
// status
char
status
[
20
+
VARSTR_HEADER_SIZE
]
=
{
0
};
tstrncpy
(
varDataVal
(
status
),
mndConsumerStatusName
(
pConsumer
->
status
),
20
);
varDataSetLen
(
status
,
strlen
(
varDataVal
(
status
)));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
status
,
false
);
// subscribed topics
char
topics
[
TSDB_SHOW_LIST_LEN
+
VARSTR_HEADER_SIZE
]
=
{
0
};
char
*
showStr
=
taosShowStrArray
(
pConsumer
->
currentTopics
);
tstrncpy
(
varDataVal
(
topics
),
showStr
,
TSDB_SHOW_LIST_LEN
);
taosMemoryFree
(
showStr
);
varDataSetLen
(
topics
,
strlen
(
varDataVal
(
topics
)));
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
topics
,
false
);
// pid
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
pid
,
true
);
// end point
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
ep
,
true
);
// up time
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
upTime
,
false
);
// subscribe time
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
subscribeTime
,
false
);
// rebalance time
pColInfo
=
taosArrayGet
(
pBlock
->
pDataBlock
,
cols
++
);
colDataAppend
(
pColInfo
,
numOfRows
,
(
const
char
*
)
&
pConsumer
->
rebalanceTime
,
pConsumer
->
rebalanceTime
==
0
);
taosRUnLockLatch
(
&
pConsumer
->
lock
);
sdbRelease
(
pSdb
,
pConsumer
);
numOfRows
++
;
}
pShow
->
numOfRows
+=
numOfRows
;
return
numOfRows
;
}
static
void
mndCancelGetNextConsumer
(
SMnode
*
pMnode
,
void
*
pIter
)
{
SSdb
*
pSdb
=
pMnode
->
pSdb
;
sdbCancelFetch
(
pSdb
,
pIter
);
}
static
const
char
*
mndConsumerStatusName
(
int
status
)
{
switch
(
status
)
{
case
MQ_CONSUMER_STATUS__READY
:
return
"ready"
;
case
MQ_CONSUMER_STATUS__LOST
:
case
MQ_CONSUMER_STATUS__LOST_REBD
:
case
MQ_CONSUMER_STATUS__LOST_IN_REB
:
return
"lost"
;
case
MQ_CONSUMER_STATUS__MODIFY
:
case
MQ_CONSUMER_STATUS__MODIFY_IN_REB
:
return
"rebalancing"
;
default:
return
"unknown"
;
}
}
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
19ec7bca
...
...
@@ -43,6 +43,8 @@ SMqConsumerObj *tNewSMqConsumerObj(int64_t consumerId, char cgroup[TSDB_CGROUP_L
return
NULL
;
}
pConsumer
->
upTime
=
taosGetTimestampMs
();
return
pConsumer
;
}
...
...
@@ -67,6 +69,12 @@ int32_t tEncodeSMqConsumerObj(void **buf, const SMqConsumerObj *pConsumer) {
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumer
->
epoch
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumer
->
status
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pConsumer
->
pid
);
tlen
+=
taosEncodeSEpSet
(
buf
,
&
pConsumer
->
ep
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
upTime
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
subscribeTime
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
rebalanceTime
);
// current topics
if
(
pConsumer
->
currentTopics
)
{
sz
=
taosArrayGetSize
(
pConsumer
->
currentTopics
);
...
...
@@ -114,6 +122,12 @@ void *tDecodeSMqConsumerObj(const void *buf, SMqConsumerObj *pConsumer) {
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumer
->
epoch
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumer
->
status
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pConsumer
->
pid
);
buf
=
taosDecodeSEpSet
(
buf
,
&
pConsumer
->
ep
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
upTime
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
subscribeTime
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
rebalanceTime
);
// current topics
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
pConsumer
->
currentTopics
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
...
...
@@ -329,6 +343,7 @@ int32_t tEncodeSMqSubActionLogEntry(void **buf, const SMqSubActionLogEntry *pEnt
tlen
+=
taosEncodeArray
(
buf
,
pEntry
->
consumers
,
(
FEncode
)
tEncodeSMqSubActionLogEntry
);
return
tlen
;
}
void
*
tDecodeSMqSubActionLogEntry
(
const
void
*
buf
,
SMqSubActionLogEntry
*
pEntry
)
{
buf
=
taosDecodeFixedI32
(
buf
,
&
pEntry
->
epoch
);
buf
=
taosDecodeArray
(
buf
,
&
pEntry
->
consumers
,
(
FDecode
)
tDecodeSMqSubActionLogEntry
,
sizeof
(
SMqSubActionLogEntry
));
...
...
source/dnode/mnode/impl/src/mndInfoSchema.c
浏览文件 @
19ec7bca
...
...
@@ -199,23 +199,6 @@ static const SInfosTableSchema vgroupsSchema[] = {
{.
name
=
"file_size"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
};
static
const
SInfosTableSchema
consumerSchema
[]
=
{
{.
name
=
"client_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"group_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"pid"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"status"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
// ep
// up time
// topics
};
static
const
SInfosTableSchema
subscribeSchema
[]
=
{
{.
name
=
"topic_name"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"group_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"vgroup_id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"client_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
};
static
const
SInfosTableSchema
smaSchema
[]
=
{
{.
name
=
"sma_name"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_VARCHAR
},
{.
name
=
"create_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
...
...
@@ -282,8 +265,6 @@ static const SInfosTableMeta infosMeta[] = {
{
TSDB_INS_TABLE_USER_USERS
,
userUsersSchema
,
tListLen
(
userUsersSchema
)},
{
TSDB_INS_TABLE_LICENCES
,
grantsSchema
,
tListLen
(
grantsSchema
)},
{
TSDB_INS_TABLE_VGROUPS
,
vgroupsSchema
,
tListLen
(
vgroupsSchema
)},
{
TSDB_INS_TABLE_CONSUMERS
,
consumerSchema
,
tListLen
(
consumerSchema
)},
{
TSDB_INS_TABLE_SUBSCRIBES
,
subscribeSchema
,
tListLen
(
subscribeSchema
)},
{
TSDB_INS_TABLE_TRANS
,
transSchema
,
tListLen
(
transSchema
)},
{
TSDB_INS_TABLE_SMAS
,
smaSchema
,
tListLen
(
smaSchema
)},
{
TSDB_INS_TABLE_CONFIGS
,
configSchema
,
tListLen
(
configSchema
)},
...
...
source/dnode/mnode/impl/src/mndPerfSchema.c
浏览文件 @
19ec7bca
...
...
@@ -49,13 +49,15 @@ static const SPerfsTableSchema topicSchema[] = {
static
const
SPerfsTableSchema
consumerSchema
[]
=
{
{.
name
=
"consumer_id"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
},
{.
name
=
"app_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"group_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"status"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"app_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"status"
,
.
bytes
=
20
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"topics"
,
.
bytes
=
TSDB_SHOW_LIST_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"pid"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"end_point"
,
.
bytes
=
TSDB_EP_LEN
+
VARSTR_HEADER_SIZE
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"up_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"subscribe_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
{.
name
=
"rebalance_time"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_TIMESTAMP
},
};
static
const
SPerfsTableSchema
subscriptionSchema
[]
=
{
...
...
@@ -63,6 +65,12 @@ static const SPerfsTableSchema subscriptionSchema[] = {
{.
name
=
"group_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"vgroup_id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"consumer_id"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
},
};
static
const
SPerfsTableSchema
offsetSchema
[]
=
{
{.
name
=
"topic_name"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"group_id"
,
.
bytes
=
SYSTABLE_SCH_TABLE_NAME_LEN
,
.
type
=
TSDB_DATA_TYPE_BINARY
},
{.
name
=
"vgroup_id"
,
.
bytes
=
4
,
.
type
=
TSDB_DATA_TYPE_INT
},
{.
name
=
"committed_offset"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
},
{.
name
=
"current_offset"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
},
{.
name
=
"skip_log_cnt"
,
.
bytes
=
8
,
.
type
=
TSDB_DATA_TYPE_BIGINT
},
...
...
@@ -74,6 +82,7 @@ static const SPerfsTableMeta perfsMeta[] = {
{
TSDB_PERFS_TABLE_TOPICS
,
topicSchema
,
tListLen
(
topicSchema
)},
{
TSDB_PERFS_TABLE_CONSUMERS
,
consumerSchema
,
tListLen
(
consumerSchema
)},
{
TSDB_PERFS_TABLE_SUBSCRIPTIONS
,
subscriptionSchema
,
tListLen
(
subscriptionSchema
)},
{
TSDB_PERFS_TABLE_OFFSETS
,
offsetSchema
,
tListLen
(
offsetSchema
)},
};
// connection/application/
...
...
source/util/src/tarray.c
浏览文件 @
19ec7bca
...
...
@@ -482,3 +482,22 @@ void taosArraySortPWithExt(SArray* pArray, __ext_compar_fn_t fn, const void* par
taosArrayGetSize
(
pArray
)
>
8
?
taosArrayQuickSort
(
pArray
,
fn
,
param
)
:
taosArrayInsertSort
(
pArray
,
fn
,
param
);
}
// TODO(yihaoDeng) add order array<type>
//
char
*
taosShowStrArray
(
const
SArray
*
pArray
)
{
int32_t
sz
=
pArray
->
size
;
int32_t
tlen
=
0
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
tlen
+=
strlen
(
taosArrayGetP
(
pArray
,
i
))
+
1
;
}
char
*
res
=
taosMemoryCalloc
(
1
,
tlen
);
char
*
buf
=
res
;
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
str
=
taosArrayGetP
(
pArray
,
i
);
int32_t
len
=
strlen
(
str
);
memcpy
(
buf
,
str
,
len
);
buf
+=
len
;
if
(
i
!=
sz
-
1
)
*
buf
=
','
;
}
return
res
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录