Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
98f62165
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
98f62165
编写于
6月 01, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(tmq): tq meta
上级
6874016b
变更
6
隐藏空白更改
内联
并排
Showing
6 changed file
with
151 addition
and
129 deletion
+151
-129
example/src/tmq.c
example/src/tmq.c
+0
-4
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+4
-3
source/dnode/vnode/src/inc/tq.h
source/dnode/vnode/src/inc/tq.h
+7
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+4
-122
source/dnode/vnode/src/tq/tqMeta.c
source/dnode/vnode/src/tq/tqMeta.c
+134
-0
source/dnode/vnode/src/tq/tqPush.c
source/dnode/vnode/src/tq/tqPush.c
+2
-0
未找到文件。
example/src/tmq.c
浏览文件 @
98f62165
...
...
@@ -165,7 +165,6 @@ tmq_t* build_consumer() {
tmq_conf_set
(
conf
,
"group.id"
,
"tg2"
);
tmq_conf_set
(
conf
,
"td.connect.user"
,
"root"
);
tmq_conf_set
(
conf
,
"td.connect.pass"
,
"taosdata"
);
/*tmq_conf_set(conf, "td.connect.db", "abc1");*/
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
tmq_conf_set
(
conf
,
"enable.auto.commit"
,
"false"
);
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
...
...
@@ -191,7 +190,6 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
return
;
}
int32_t
cnt
=
0
;
/*clock_t startTime = clock();*/
while
(
running
)
{
TAOS_RES
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
0
);
if
(
tmqmessage
)
{
...
...
@@ -204,8 +202,6 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topics) {
/*break;*/
}
}
/*clock_t endTime = clock();*/
/*printf("log cnt: %d %f s\n", cnt, (double)(endTime - startTime) / CLOCKS_PER_SEC);*/
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
98f62165
...
...
@@ -52,10 +52,11 @@ target_sources(
# tq
"src/tq/tq.c"
"src/tq/tqExec.c"
"src/tq/tqCommit.c"
"src/tq/tqOffset.c"
"src/tq/tqPush.c"
"src/tq/tqMeta.c"
"src/tq/tqRead.c"
"src/tq/tqOffset.c"
#"src/tq/tqPush.c"
#"src/tq/tqCommit.c"
)
target_include_directories
(
vnode
...
...
source/dnode/vnode/src/inc/tq.h
浏览文件 @
98f62165
...
...
@@ -168,6 +168,13 @@ int64_t tqFetchLog(STQ* pTq, STqHandle* pHandle, int64_t* fetchOffset, SWalHead*
int32_t
tqDataExec
(
STQ
*
pTq
,
STqExecHandle
*
pExec
,
SSubmitReq
*
pReq
,
SMqDataBlkRsp
*
pRsp
,
int32_t
workerId
);
// tqMeta
int32_t
tqMetaOpen
(
STQ
*
pTq
);
int32_t
tqMetaClose
(
STQ
*
pTq
);
int32_t
tqMetaSaveHandle
(
STQ
*
pTq
,
const
char
*
key
,
const
STqHandle
*
pHandle
);
int32_t
tqMetaDeleteHandle
(
STQ
*
pTq
,
const
char
*
key
);
// tqOffset
STqOffsetStore
*
STqOffsetOpen
(
STqOffsetCfg
*
);
void
STqOffsetClose
(
STqOffsetStore
*
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
98f62165
...
...
@@ -14,7 +14,6 @@
*/
#include "tq.h"
#include "tdbInt.h"
int32_t
tqInit
()
{
int8_t
old
;
...
...
@@ -47,51 +46,6 @@ void tqCleanUp() {
}
}
int
tqExecKeyCompare
(
const
void
*
pKey1
,
int32_t
kLen1
,
const
void
*
pKey2
,
int32_t
kLen2
)
{
return
strcmp
(
pKey1
,
pKey2
);
}
int32_t
tqStoreHandle
(
STQ
*
pTq
,
const
char
*
key
,
const
STqHandle
*
pHandle
)
{
int32_t
code
;
int32_t
vlen
;
tEncodeSize
(
tEncodeSTqHandle
,
pHandle
,
vlen
,
code
);
ASSERT
(
code
==
0
);
void
*
buf
=
taosMemoryCalloc
(
1
,
vlen
);
if
(
buf
==
NULL
)
{
ASSERT
(
0
);
}
SEncoder
encoder
;
tEncoderInit
(
&
encoder
,
buf
,
vlen
);
if
(
tEncodeSTqHandle
(
&
encoder
,
pHandle
)
<
0
)
{
ASSERT
(
0
);
}
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbBegin
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbUpsert
(
pTq
->
pExecStore
,
key
,
(
int
)
strlen
(
key
),
buf
,
vlen
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbCommit
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
tEncoderClear
(
&
encoder
);
taosMemoryFree
(
buf
);
return
0
;
}
STQ
*
tqOpen
(
const
char
*
path
,
SVnode
*
pVnode
,
SWal
*
pWal
)
{
STQ
*
pTq
=
taosMemoryMalloc
(
sizeof
(
STQ
));
if
(
pTq
==
NULL
)
{
...
...
@@ -108,60 +62,7 @@ STQ* tqOpen(const char* path, SVnode* pVnode, SWal* pWal) {
pTq
->
pushMgr
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
true
,
HASH_ENTRY_LOCK
);
if
(
tdbOpen
(
path
,
16
*
1024
,
1
,
&
pTq
->
pMetaStore
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbOpen
(
"handles"
,
-
1
,
-
1
,
tqExecKeyCompare
,
pTq
->
pMetaStore
,
&
pTq
->
pExecStore
)
<
0
)
{
ASSERT
(
0
);
}
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
0
)
<
0
)
{
ASSERT
(
0
);
}
TBC
*
pCur
;
if
(
tdbTbcOpen
(
pTq
->
pExecStore
,
&
pCur
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
void
*
pKey
;
int
kLen
;
void
*
pVal
;
int
vLen
;
tdbTbcMoveToFirst
(
pCur
);
SDecoder
decoder
;
while
(
tdbTbcNext
(
pCur
,
&
pKey
,
&
kLen
,
&
pVal
,
&
vLen
)
==
0
)
{
STqHandle
handle
;
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
pVal
,
vLen
);
tDecodeSTqHandle
(
&
decoder
,
&
handle
);
handle
.
pWalReader
=
walOpenReadHandle
(
pTq
->
pVnode
->
pWal
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
handle
.
execHandle
.
pExecReader
[
i
]
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
}
if
(
handle
.
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
SReadHandle
reader
=
{
.
reader
=
handle
.
execHandle
.
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
handle
.
execHandle
.
exec
.
execCol
.
task
[
i
]
=
qCreateStreamExecTaskInfo
(
handle
.
execHandle
.
exec
.
execCol
.
qmsg
,
&
reader
);
ASSERT
(
handle
.
execHandle
.
exec
.
execCol
.
task
[
i
]);
}
}
else
{
handle
.
execHandle
.
exec
.
execDb
.
pFilterOutTbUid
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
}
taosHashPut
(
pTq
->
handles
,
pKey
,
kLen
,
&
handle
,
sizeof
(
STqHandle
));
}
if
(
tdbTxnClose
(
&
txn
)
<
0
)
{
if
(
tqMetaOpen
(
pTq
)
<
0
)
{
ASSERT
(
0
);
}
...
...
@@ -174,7 +75,7 @@ void tqClose(STQ* pTq) {
taosHashCleanup
(
pTq
->
handles
);
taosHashCleanup
(
pTq
->
pStreamTasks
);
taosHashCleanup
(
pTq
->
pushMgr
);
t
dbClose
(
pTq
->
pMetaStore
);
t
qMetaClose
(
pTq
);
taosMemoryFree
(
pTq
);
}
// TODO
...
...
@@ -256,9 +157,6 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
taosWLockLatch
(
&
pHandle
->
pushHandle
.
lock
);
/*SRpcHandleInfo* pInfo = atomic_load_ptr(&pHandle->pushHandle.pInfo);*/
/*ASSERT(pInfo);*/
SMqDataBlkRsp
rsp
=
{
0
};
rsp
.
reqOffset
=
pHandle
->
pushHandle
.
reqOffset
;
rsp
.
blockData
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
...
...
@@ -303,7 +201,6 @@ int32_t tqPushMsgNew(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_
};
tmsgSendRsp
(
&
resp
);
/*atomic_store_ptr(&pHandle->pushHandle.pInfo, NULL);*/
memset
(
&
pHandle
->
pushHandle
.
info
,
0
,
sizeof
(
SRpcHandleInfo
));
taosWUnLockLatch
(
&
pHandle
->
pushHandle
.
lock
);
...
...
@@ -508,24 +405,9 @@ int32_t tqProcessVgDeleteReq(STQ* pTq, char* msg, int32_t msgLen) {
int32_t
code
=
taosHashRemove
(
pTq
->
handles
,
pReq
->
subKey
,
strlen
(
pReq
->
subKey
));
ASSERT
(
code
==
0
);
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbBegin
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbDelete
(
pTq
->
pExecStore
,
pReq
->
subKey
,
(
int
)
strlen
(
pReq
->
subKey
),
&
txn
)
<
0
)
{
/*ASSERT(0);*/
}
if
(
tdbCommit
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
if
(
tqMetaDeleteHandle
(
pTq
,
pReq
->
subKey
)
<
0
)
{
ASSERT
(
0
);
}
return
0
;
}
...
...
@@ -583,7 +465,7 @@ int32_t tqProcessVgChangeReq(STQ* pTq, char* msg, int32_t msgLen) {
atomic_add_fetch_32
(
&
pHandle
->
epoch
,
1
);
}
if
(
tq
Stor
eHandle
(
pTq
,
req
.
subKey
,
pHandle
)
<
0
)
{
if
(
tq
MetaSav
eHandle
(
pTq
,
req
.
subKey
,
pHandle
)
<
0
)
{
// TODO
}
return
0
;
...
...
source/dnode/vnode/src/tq/tqMeta.c
浏览文件 @
98f62165
...
...
@@ -12,3 +12,137 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tdbInt.h"
#include "tq.h"
int
tqExecKeyCompare
(
const
void
*
pKey1
,
int32_t
kLen1
,
const
void
*
pKey2
,
int32_t
kLen2
)
{
return
strcmp
(
pKey1
,
pKey2
);
}
int32_t
tqMetaOpen
(
STQ
*
pTq
)
{
if
(
tdbOpen
(
pTq
->
path
,
16
*
1024
,
1
,
&
pTq
->
pMetaStore
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbOpen
(
"handles"
,
-
1
,
-
1
,
tqExecKeyCompare
,
pTq
->
pMetaStore
,
&
pTq
->
pExecStore
)
<
0
)
{
ASSERT
(
0
);
}
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
0
)
<
0
)
{
ASSERT
(
0
);
}
TBC
*
pCur
;
if
(
tdbTbcOpen
(
pTq
->
pExecStore
,
&
pCur
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
void
*
pKey
;
int
kLen
;
void
*
pVal
;
int
vLen
;
tdbTbcMoveToFirst
(
pCur
);
SDecoder
decoder
;
while
(
tdbTbcNext
(
pCur
,
&
pKey
,
&
kLen
,
&
pVal
,
&
vLen
)
==
0
)
{
STqHandle
handle
;
tDecoderInit
(
&
decoder
,
(
uint8_t
*
)
pVal
,
vLen
);
tDecodeSTqHandle
(
&
decoder
,
&
handle
);
handle
.
pWalReader
=
walOpenReadHandle
(
pTq
->
pVnode
->
pWal
);
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
handle
.
execHandle
.
pExecReader
[
i
]
=
tqInitSubmitMsgScanner
(
pTq
->
pVnode
->
pMeta
);
}
if
(
handle
.
execHandle
.
subType
==
TOPIC_SUB_TYPE__COLUMN
)
{
for
(
int32_t
i
=
0
;
i
<
5
;
i
++
)
{
SReadHandle
reader
=
{
.
reader
=
handle
.
execHandle
.
pExecReader
[
i
],
.
meta
=
pTq
->
pVnode
->
pMeta
,
.
pMsgCb
=
&
pTq
->
pVnode
->
msgCb
,
};
handle
.
execHandle
.
exec
.
execCol
.
task
[
i
]
=
qCreateStreamExecTaskInfo
(
handle
.
execHandle
.
exec
.
execCol
.
qmsg
,
&
reader
);
ASSERT
(
handle
.
execHandle
.
exec
.
execCol
.
task
[
i
]);
}
}
else
{
handle
.
execHandle
.
exec
.
execDb
.
pFilterOutTbUid
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_BIGINT
),
false
,
HASH_NO_LOCK
);
}
taosHashPut
(
pTq
->
handles
,
pKey
,
kLen
,
&
handle
,
sizeof
(
STqHandle
));
}
if
(
tdbTxnClose
(
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
return
0
;
}
int32_t
tqMetaClose
(
STQ
*
pTq
)
{
tdbClose
(
pTq
->
pMetaStore
);
return
0
;
}
int32_t
tqMetaSaveHandle
(
STQ
*
pTq
,
const
char
*
key
,
const
STqHandle
*
pHandle
)
{
int32_t
code
;
int32_t
vlen
;
tEncodeSize
(
tEncodeSTqHandle
,
pHandle
,
vlen
,
code
);
ASSERT
(
code
==
0
);
void
*
buf
=
taosMemoryCalloc
(
1
,
vlen
);
if
(
buf
==
NULL
)
{
ASSERT
(
0
);
}
SEncoder
encoder
;
tEncoderInit
(
&
encoder
,
buf
,
vlen
);
if
(
tEncodeSTqHandle
(
&
encoder
,
pHandle
)
<
0
)
{
ASSERT
(
0
);
}
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbBegin
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbUpsert
(
pTq
->
pExecStore
,
key
,
(
int
)
strlen
(
key
),
buf
,
vlen
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbCommit
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
tEncoderClear
(
&
encoder
);
taosMemoryFree
(
buf
);
return
0
;
}
int32_t
tqMetaDeleteHandle
(
STQ
*
pTq
,
const
char
*
key
)
{
TXN
txn
;
if
(
tdbTxnOpen
(
&
txn
,
0
,
tdbDefaultMalloc
,
tdbDefaultFree
,
NULL
,
TDB_TXN_WRITE
|
TDB_TXN_READ_UNCOMMITTED
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbBegin
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
if
(
tdbTbDelete
(
pTq
->
pExecStore
,
key
,
(
int
)
strlen
(
key
),
&
txn
)
<
0
)
{
/*ASSERT(0);*/
}
if
(
tdbCommit
(
pTq
->
pMetaStore
,
&
txn
)
<
0
)
{
ASSERT
(
0
);
}
return
0
;
}
source/dnode/vnode/src/tq/tqPush.c
浏览文件 @
98f62165
...
...
@@ -12,3 +12,5 @@
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "tq.h"
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录