Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
f2048b00
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1184
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
提交
f2048b00
编写于
6月 02, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix(tmq): subscribe stb
上级
e1fd4a66
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
62 addition
and
81 deletion
+62
-81
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+14
-21
source/dnode/mnode/impl/src/mndDef.c
source/dnode/mnode/impl/src/mndDef.c
+3
-9
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+2
-3
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+4
-29
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+9
-8
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+9
-7
source/dnode/vnode/src/tsdb/tsdbRead.c
source/dnode/vnode/src/tsdb/tsdbRead.c
+21
-4
未找到文件。
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
f2048b00
...
...
@@ -403,19 +403,15 @@ int32_t tEncodeSMqOffsetObj(void** buf, const SMqOffsetObj* pOffset);
void
*
tDecodeSMqOffsetObj
(
void
*
buf
,
SMqOffsetObj
*
pOffset
);
typedef
struct
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int64_t
createTime
;
int64_t
updateTime
;
int64_t
uid
;
int64_t
dbUid
;
int32_t
version
;
int8_t
subType
;
// column, db or stable
// int8_t withTbName;
// int8_t withSchema;
// int8_t withTag;
char
name
[
TSDB_TOPIC_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int64_t
createTime
;
int64_t
updateTime
;
int64_t
uid
;
int64_t
dbUid
;
int32_t
version
;
int8_t
subType
;
// column, db or stable
SRWLatch
lock
;
int32_t
consumerCnt
;
int32_t
sqlLen
;
int32_t
astLen
;
char
*
sql
;
...
...
@@ -423,7 +419,6 @@ typedef struct {
char
*
physicalPlan
;
SSchemaWrapper
schema
;
int64_t
stbUid
;
// int32_t refConsumerCnt;
}
SMqTopicObj
;
typedef
struct
{
...
...
@@ -477,14 +472,12 @@ int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
void
*
tDecodeSMqConsumerEp
(
const
void
*
buf
,
SMqConsumerEp
*
pEp
);
typedef
struct
{
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
];
SRWLatch
lock
;
int64_t
dbUid
;
int32_t
vgNum
;
int8_t
subType
;
// int8_t withTbName;
// int8_t withSchema;
// int8_t withTag;
char
key
[
TSDB_SUBSCRIBE_KEY_LEN
];
SRWLatch
lock
;
int64_t
dbUid
;
int32_t
vgNum
;
int8_t
subType
;
int64_t
stbUid
;
SHashObj
*
consumerHash
;
// consumerId -> SMqConsumerEp
SArray
*
unassignedVgs
;
// SArray<SMqVgEp*>
}
SMqSubscribeObj
;
...
...
source/dnode/mnode/impl/src/mndDef.c
浏览文件 @
f2048b00
...
...
@@ -395,10 +395,8 @@ SMqSubscribeObj *tCloneSubscribeObj(const SMqSubscribeObj *pSub) {
taosInitRWLatch
(
&
pSubNew
->
lock
);
pSubNew
->
dbUid
=
pSub
->
dbUid
;
pSubNew
->
stbUid
=
pSub
->
stbUid
;
pSubNew
->
subType
=
pSub
->
subType
;
/*pSubNew->withTbName = pSub->withTbName;*/
/*pSubNew->withSchema = pSub->withSchema;*/
/*pSubNew->withTag = pSub->withTag;*/
pSubNew
->
vgNum
=
pSub
->
vgNum
;
pSubNew
->
consumerHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
...
...
@@ -431,9 +429,7 @@ int32_t tEncodeSubscribeObj(void **buf, const SMqSubscribeObj *pSub) {
tlen
+=
taosEncodeFixedI64
(
buf
,
pSub
->
dbUid
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pSub
->
vgNum
);
tlen
+=
taosEncodeFixedI8
(
buf
,
pSub
->
subType
);
/*tlen += taosEncodeFixedI8(buf, pSub->withTbName);*/
/*tlen += taosEncodeFixedI8(buf, pSub->withSchema);*/
/*tlen += taosEncodeFixedI8(buf, pSub->withTag);*/
tlen
+=
taosEncodeFixedI64
(
buf
,
pSub
->
stbUid
);
void
*
pIter
=
NULL
;
int32_t
sz
=
taosHashGetSize
(
pSub
->
consumerHash
);
...
...
@@ -458,9 +454,7 @@ void *tDecodeSubscribeObj(const void *buf, SMqSubscribeObj *pSub) {
buf
=
taosDecodeFixedI64
(
buf
,
&
pSub
->
dbUid
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pSub
->
vgNum
);
buf
=
taosDecodeFixedI8
(
buf
,
&
pSub
->
subType
);
/*buf = taosDecodeFixedI8(buf, &pSub->withTbName);*/
/*buf = taosDecodeFixedI8(buf, &pSub->withSchema);*/
/*buf = taosDecodeFixedI8(buf, &pSub->withTag);*/
buf
=
taosDecodeFixedI64
(
buf
,
&
pSub
->
stbUid
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
f2048b00
...
...
@@ -93,10 +93,8 @@ static SMqSubscribeObj *mndCreateSub(SMnode *pMnode, const SMqTopicObj *pTopic,
return
NULL
;
}
pSub
->
dbUid
=
pTopic
->
dbUid
;
pSub
->
stbUid
=
pTopic
->
stbUid
;
pSub
->
subType
=
pTopic
->
subType
;
/*pSub->withTbName = pTopic->withTbName;*/
/*pSub->withSchema = pTopic->withSchema;*/
/*pSub->withTag = pTopic->withTag;*/
ASSERT
(
pSub
->
unassignedVgs
->
size
==
0
);
ASSERT
(
taosHashGetSize
(
pSub
->
consumerHash
)
==
0
);
...
...
@@ -121,6 +119,7 @@ static int32_t mndBuildSubChangeReq(void **pBuf, int32_t *pLen, const SMqSubscri
req
.
vgId
=
pRebVg
->
pVgEp
->
vgId
;
req
.
qmsg
=
pRebVg
->
pVgEp
->
qmsg
;
req
.
subType
=
pSub
->
subType
;
req
.
suid
=
pSub
->
stbUid
;
strncpy
(
req
.
subKey
,
pSub
->
key
,
TSDB_SUBSCRIBE_KEY_LEN
);
int32_t
tlen
=
sizeof
(
SMsgHead
)
+
tEncodeSMqRebVgReq
(
NULL
,
&
req
);
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
f2048b00
...
...
@@ -96,11 +96,8 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT64
(
pRaw
,
dataPos
,
pTopic
->
dbUid
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
version
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT8
(
pRaw
,
dataPos
,
pTopic
->
subType
,
TOPIC_ENCODE_OVER
);
/*SDB_SET_INT8(pRaw, dataPos, pTopic->withTbName, TOPIC_ENCODE_OVER);*/
/*SDB_SET_INT8(pRaw, dataPos, pTopic->withSchema, TOPIC_ENCODE_OVER);*/
/*SDB_SET_INT8(pRaw, dataPos, pTopic->withTag, TOPIC_ENCODE_OVER);*/
SDB_SET_INT
32
(
pRaw
,
dataPos
,
pTopic
->
consumerCnt
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT
64
(
pRaw
,
dataPos
,
pTopic
->
stbUid
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
sql
,
pTopic
->
sqlLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
pTopic
->
astLen
,
TOPIC_ENCODE_OVER
);
...
...
@@ -122,8 +119,6 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_BINARY
(
pRaw
,
dataPos
,
swBuf
,
schemaLen
,
TOPIC_ENCODE_OVER
);
}
/*SDB_SET_INT32(pRaw, dataPos, pTopic->refConsumerCnt, TOPIC_ENCODE_OVER);*/
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_ENCODE_OVER
);
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
TOPIC_ENCODE_OVER
);
...
...
@@ -168,12 +163,8 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pTopic
->
dbUid
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTopic
->
version
,
TOPIC_DECODE_OVER
);
SDB_GET_INT8
(
pRaw
,
dataPos
,
&
pTopic
->
subType
,
TOPIC_DECODE_OVER
);
/*SDB_GET_INT8(pRaw, dataPos, &pTopic->withTbName, TOPIC_DECODE_OVER);*/
/*SDB_GET_INT8(pRaw, dataPos, &pTopic->withSchema, TOPIC_DECODE_OVER);*/
/*SDB_GET_INT8(pRaw, dataPos, &pTopic->withTag, TOPIC_DECODE_OVER);*/
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTopic
->
consumerCnt
,
TOPIC_DECODE_OVER
);
SDB_GET_INT64
(
pRaw
,
dataPos
,
&
pTopic
->
stbUid
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
pTopic
->
sqlLen
,
TOPIC_DECODE_OVER
);
pTopic
->
sql
=
taosMemoryCalloc
(
pTopic
->
sqlLen
,
sizeof
(
char
));
if
(
pTopic
->
sql
==
NULL
)
{
...
...
@@ -222,8 +213,6 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
pTopic
->
schema
.
pSchema
=
NULL
;
}
/*SDB_GET_INT32(pRaw, dataPos, &pTopic->refConsumerCnt, TOPIC_DECODE_OVER);*/
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_DECODE_OVER
);
terrno
=
TSDB_CODE_SUCCESS
;
...
...
@@ -254,8 +243,6 @@ static int32_t mndTopicActionUpdate(SSdb *pSdb, SMqTopicObj *pOldTopic, SMqTopic
atomic_exchange_64
(
&
pOldTopic
->
updateTime
,
pNewTopic
->
updateTime
);
atomic_exchange_32
(
&
pOldTopic
->
version
,
pNewTopic
->
version
);
/*atomic_store_32(&pOldTopic->refConsumerCnt, pNewTopic->refConsumerCnt);*/
/*taosWLockLatch(&pOldTopic->lock);*/
// TODO handle update
...
...
@@ -278,18 +265,6 @@ void mndReleaseTopic(SMnode *pMnode, SMqTopicObj *pTopic) {
sdbRelease
(
pSdb
,
pTopic
);
}
#if 0
static SDbObj *mndAcquireDbByTopic(SMnode *pMnode, char *topicName) {
SName name = {0};
tNameFromString(&name, topicName, T_NAME_ACCT | T_NAME_DB | T_NAME_TABLE);
char db[TSDB_TOPIC_FNAME_LEN] = {0};
tNameGetFullDbName(&name, db);
return mndAcquireDb(pMnode, db);
}
#endif
static
SDDropTopicReq
*
mndBuildDropTopicMsg
(
SMnode
*
pMnode
,
SVgObj
*
pVgroup
,
SMqTopicObj
*
pTopic
)
{
int32_t
contLen
=
sizeof
(
SDDropTopicReq
);
...
...
@@ -341,8 +316,6 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
if
(
pCreate
->
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
topicObj
.
ast
=
strdup
(
pCreate
->
ast
);
topicObj
.
astLen
=
strlen
(
pCreate
->
ast
)
+
1
;
/*topicObj.withTbName = pCreate->withTbName;*/
/*topicObj.withSchema = pCreate->withSchema;*/
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pCreate
->
ast
,
&
pAst
)
!=
0
)
{
...
...
@@ -376,6 +349,8 @@ static int32_t mndCreateTopic(SMnode *pMnode, SRpcMsg *pReq, SCMCreateTopicReq *
return
-
1
;
}
}
else
if
(
pCreate
->
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
SStbObj
*
pStb
=
mndAcquireStb
(
pMnode
,
pCreate
->
subStbName
);
topicObj
.
stbUid
=
pStb
->
uid
;
}
/*} else if (pCreate->subType == TOPIC_SUB_TYPE__DB) {*/
/*topicObj.ast = NULL;*/
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
f2048b00
...
...
@@ -85,7 +85,7 @@ typedef struct SMetaFltParam {
tb_uid_t
suid
;
int16_t
cid
;
int16_t
type
;
char
*
val
;
char
*
val
;
bool
reverse
;
int
(
*
filterFunc
)(
void
*
a
,
void
*
b
,
int16_t
type
);
...
...
@@ -119,7 +119,8 @@ tsdbReaderT tsdbQueryCacheLast(SVnode *pVnode, SQueryTableDataCond *pCond, STab
int32_t
tsdbGetFileBlocksDistInfo
(
tsdbReaderT
*
pReader
,
STableBlockDistInfo
*
pTableBlockInfo
);
bool
isTsdbCacheLastRow
(
tsdbReaderT
*
pReader
);
int32_t
tsdbGetAllTableList
(
SMeta
*
pMeta
,
uint64_t
uid
,
SArray
*
list
);
void
*
tsdbGetIdx
(
SMeta
*
pMeta
);
int32_t
tsdbGetCtbIdList
(
SMeta
*
pMeta
,
int64_t
suid
,
SArray
*
list
);
void
*
tsdbGetIdx
(
SMeta
*
pMeta
);
int64_t
tsdbGetNumOfRowsInMemTable
(
tsdbReaderT
*
pHandle
);
bool
tsdbNextDataBlock
(
tsdbReaderT
pTsdbReadHandle
);
...
...
@@ -192,7 +193,7 @@ struct SMetaEntry {
int64_t
version
;
int8_t
type
;
tb_uid_t
uid
;
char
*
name
;
char
*
name
;
union
{
struct
{
SSchemaWrapper
schemaRow
;
...
...
@@ -220,17 +221,17 @@ struct SMetaEntry {
struct
SMetaReader
{
int32_t
flags
;
SMeta
*
pMeta
;
SMeta
*
pMeta
;
SDecoder
coder
;
SMetaEntry
me
;
void
*
pBuf
;
void
*
pBuf
;
int32_t
szBuf
;
};
struct
SMTbCursor
{
TBC
*
pDbc
;
void
*
pKey
;
void
*
pVal
;
TBC
*
pDbc
;
void
*
pKey
;
void
*
pVal
;
int32_t
kLen
;
int32_t
vLen
;
SMetaReader
mr
;
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
f2048b00
...
...
@@ -260,9 +260,6 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle
->
epoch
=
-
1
;
pHandle
->
execHandle
.
subType
=
req
.
subType
;
/*pExec->withTbName = req.withTbName;*/
/*pExec->withSchema = req.withSchema;*/
/*pExec->withTag = req.withTag;*/
pHandle
->
pWalReader
=
walOpenReadHandle
(
pTq
->
pVnode
->
pWal
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
...
...
@@ -285,13 +282,18 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
pHandle
->
execHandle
.
exec
.
execDb
.
pFilterOutTbUid
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
}
else
if
(
pHandle
->
execHandle
.
subType
==
TOPIC_SUB_TYPE__TABLE
)
{
int64_t
suid
=
0
;
/*pHandle->execHandle.exec.execTb.suid = req.suid;*/
SArray
*
tbUidList
=
taosArrayInit
(
0
,
sizeof
(
int16_t
));
tsdbGetAllTableList
(
pTq
->
pVnode
->
pMeta
,
suid
,
tbUidList
);
pHandle
->
execHandle
.
exec
.
execTb
.
suid
=
req
.
suid
;
SArray
*
tbUidList
=
taosArrayInit
(
0
,
sizeof
(
int64_t
));
tsdbGetCtbIdList
(
pTq
->
pVnode
->
pMeta
,
req
.
suid
,
tbUidList
);
tqDebug
(
"vg %d, tq try get suid: %ld"
,
pTq
->
pVnode
->
config
.
vgId
,
req
.
suid
);
for
(
int32_t
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
int64_t
tbUid
=
*
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
tqDebug
(
"vg %d, idx %d, uid: %ld"
,
pTq
->
pVnode
->
config
.
vgId
,
i
,
tbUid
);
}
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
tqReadHandleSetTbUidList
(
pHandle
->
execHandle
.
pExecReader
[
i
],
tbUidList
);
}
taosArrayDestroy
(
tbUidList
);
}
taosHashPut
(
pTq
->
handles
,
req
.
subKey
,
strlen
(
req
.
subKey
),
pHandle
,
sizeof
(
STqHandle
));
}
else
{
...
...
source/dnode/vnode/src/tsdb/tsdbRead.c
浏览文件 @
f2048b00
...
...
@@ -13,8 +13,8 @@
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "vnode.h"
#include "tsdb.h"
#include "vnode.h"
#define EXTRA_BYTES 2
#define ASCENDING_TRAVERSE(o) (o == TSDB_ORDER_ASC)
...
...
@@ -327,8 +327,8 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData
if
(
updateTs
)
{
tsdbDebug
(
"%p update the query time window, old:%"
PRId64
" - %"
PRId64
", new:%"
PRId64
" - %"
PRId64
", %s"
,
pTsdbReadHandle
,
pCond
->
twindows
[
tWinIdx
].
skey
,
pCond
->
twindows
[
tWinIdx
].
ekey
,
pTsdbReadHandle
->
window
.
skey
,
pTsdbReadHandle
->
window
.
ekey
,
pTsdbReadHandle
->
idStr
);
pTsdbReadHandle
,
pCond
->
twindows
[
tWinIdx
].
skey
,
pCond
->
twindows
[
tWinIdx
].
ekey
,
pTsdbReadHandle
->
window
.
skey
,
pTsdbReadHandle
->
window
.
ekey
,
pTsdbReadHandle
->
idStr
);
}
}
...
...
@@ -586,7 +586,8 @@ void tsdbResetReadHandle(tsdbReaderT queryHandle, SQueryTableDataCond* pCond, in
resetCheckInfo
(
pTsdbReadHandle
);
}
void
tsdbResetQueryHandleForNewTable
(
tsdbReaderT
queryHandle
,
SQueryTableDataCond
*
pCond
,
STableListInfo
*
tableList
,
int32_t
tWinIdx
)
{
void
tsdbResetQueryHandleForNewTable
(
tsdbReaderT
queryHandle
,
SQueryTableDataCond
*
pCond
,
STableListInfo
*
tableList
,
int32_t
tWinIdx
)
{
STsdbReadHandle
*
pTsdbReadHandle
=
queryHandle
;
pTsdbReadHandle
->
order
=
pCond
->
order
;
...
...
@@ -2845,6 +2846,22 @@ int32_t tsdbGetAllTableList(SMeta* pMeta, uint64_t uid, SArray* list) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
tsdbGetCtbIdList
(
SMeta
*
pMeta
,
int64_t
suid
,
SArray
*
list
)
{
SMCtbCursor
*
pCur
=
metaOpenCtbCursor
(
pMeta
,
suid
);
while
(
1
)
{
tb_uid_t
id
=
metaCtbCursorNext
(
pCur
);
if
(
id
==
0
)
{
break
;
}
taosArrayPush
(
list
,
&
id
);
}
metaCloseCtbCursor
(
pCur
);
return
TSDB_CODE_SUCCESS
;
}
static
void
destroyHelper
(
void
*
param
)
{
if
(
param
==
NULL
)
{
return
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录