Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
deab8952
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
体验新版 GitCode,发现更多精彩内容 >>
未验证
提交
deab8952
编写于
4月 08, 2022
作者:
L
Liu Jicong
提交者:
GitHub
4月 08, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #11319 from taosdata/feature/tq
Feature/tq
上级
70797ef8
32cc09ae
变更
12
隐藏空白更改
内联
并排
Showing
12 changed file
with
33 addition
and
51 deletion
+33
-51
cmake/bdb_CMakeLists.txt.in.bak
cmake/bdb_CMakeLists.txt.in.bak
+0
-0
cmake/cmake.options
cmake/cmake.options
+0
-11
contrib/CMakeLists.txt
contrib/CMakeLists.txt
+3
-3
contrib/test/CMakeLists.txt
contrib/test/CMakeLists.txt
+3
-3
include/common/tmsg.h
include/common/tmsg.h
+1
-0
source/client/src/tmq.c
source/client/src/tmq.c
+3
-2
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+11
-9
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+1
-1
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+7
-12
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+2
-2
source/libs/transport/src/transCli.c
source/libs/transport/src/transCli.c
+2
-2
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+0
-6
未找到文件。
cmake/bdb_CMakeLists.txt.in
→
cmake/bdb_CMakeLists.txt.in
.bak
浏览文件 @
deab8952
文件已移动
cmake/cmake.options
浏览文件 @
deab8952
...
...
@@ -55,17 +55,6 @@ option(
OFF
)
IF(${TD_WINDOWS})
MESSAGE("Not build BDB on Windows")
ELSE ()
option(
BUILD_WITH_BDB
"If build with BerkleyDB"
ON
)
ENDIF ()
option(
BUILD_WITH_LUCENE
"If build with lucene"
...
...
contrib/CMakeLists.txt
浏览文件 @
deab8952
...
...
@@ -63,9 +63,9 @@ if(${BUILD_WITH_UV})
endif
(
${
BUILD_WITH_UV
}
)
# bdb
if
(
${
BUILD_WITH_BDB
}
)
cat
(
"
${
CMAKE_SUPPORT_DIR
}
/bdb_CMakeLists.txt.in"
${
CONTRIB_TMP_FILE
}
)
endif
(
${
BUILD_WITH_BDB
}
)
#
if(${BUILD_WITH_BDB})
#
cat("${CMAKE_SUPPORT_DIR}/bdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
#
endif(${BUILD_WITH_BDB})
# sqlite
if
(
${
BUILD_WITH_SQLITE
}
)
...
...
contrib/test/CMakeLists.txt
浏览文件 @
deab8952
...
...
@@ -7,9 +7,9 @@ if(${BUILD_WITH_LUCENE})
add_subdirectory
(
lucene
)
endif
(
${
BUILD_WITH_LUCENE
}
)
if
(
${
BUILD_WITH_BDB
}
)
add_subdirectory
(
bdb
)
endif
(
${
BUILD_WITH_BDB
}
)
#
if(${BUILD_WITH_BDB})
#
add_subdirectory(bdb)
#
endif(${BUILD_WITH_BDB})
if
(
${
BUILD_WITH_SQLITE
}
)
add_subdirectory
(
sqlite
)
...
...
include/common/tmsg.h
浏览文件 @
deab8952
...
...
@@ -2310,6 +2310,7 @@ typedef struct {
char
cgroup
[
TSDB_CGROUP_LEN
];
int64_t
currentOffset
;
uint64_t
reqId
;
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
}
SMqPollReq
;
...
...
source/client/src/tmq.c
浏览文件 @
deab8952
...
...
@@ -1149,6 +1149,7 @@ SMqPollReq* tmqBuildConsumeReqImpl(tmq_t* tmq, int64_t blockingTime, SMqClientTo
pReq
->
consumerId
=
tmq
->
consumerId
;
pReq
->
epoch
=
tmq
->
epoch
;
pReq
->
currentOffset
=
reqOffset
;
pReq
->
reqId
=
generateRequestId
();
pReq
->
head
.
vgId
=
htonl
(
pVg
->
vgId
);
pReq
->
head
.
contLen
=
htonl
(
sizeof
(
SMqPollReq
));
...
...
@@ -1279,7 +1280,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
.
len
=
sizeof
(
SMqPollReq
),
.
handle
=
NULL
,
};
sendInfo
->
requestId
=
generateRequestId
()
;
sendInfo
->
requestId
=
pReq
->
reqId
;
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
pParam
;
sendInfo
->
fp
=
tmqPollCb
;
...
...
@@ -1288,7 +1289,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t
transporterId
=
0
;
/*printf("send poll\n");*/
atomic_add_fetch_32
(
&
tmq
->
waitingRequest
,
1
);
tscDebug
(
"consumer %ld send poll
: vg %d, epoch %d, req offset %ld"
,
tmq
->
consumerId
,
pVg
->
vgId
,
tmq
->
epoch
,
pVg
->
currentOffset
);
tscDebug
(
"consumer %ld send poll
to %s : vg %d, epoch %d, req offset %ld, reqId %lu"
,
tmq
->
consumerId
,
pTopic
->
topicName
,
pVg
->
vgId
,
tmq
->
epoch
,
pVg
->
currentOffset
,
pReq
->
reqId
);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
sendInfo
);
pVg
->
pollCnt
++
;
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
deab8952
...
...
@@ -60,7 +60,7 @@ static int32_t mndProcessResetOffsetReq(SNodeMsg *pMsg);
static
int32_t
mndPersistMqSetConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqTopicObj
*
pTopic
,
const
char
*
cgroup
,
const
SMqConsumerEp
*
pConsumerEp
);
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
);
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
);
static
int32_t
mndPersistCancelConnReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
oldTopicName
);
int32_t
mndInitSubscribe
(
SMnode
*
pMnode
)
{
...
...
@@ -102,12 +102,13 @@ static SMqSubscribeObj *mndCreateSubscription(SMnode *pMnode, const SMqTopicObj
return
pSub
;
}
static
int32_t
mndBuildRebalanceMsg
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
)
{
static
int32_t
mndBuildRebalanceMsg
(
void
**
pBuf
,
int32_t
*
pLen
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
)
{
SMqMVRebReq
req
=
{
.
vgId
=
pConsumerEp
->
vgId
,
.
oldConsumerId
=
pConsumerEp
->
oldConsumerId
,
.
newConsumerId
=
pConsumerEp
->
consumerId
,
};
req
.
topic
=
strdup
(
topicName
);
int32_t
tlen
=
tEncodeSMqMVRebReq
(
NULL
,
&
req
);
void
*
buf
=
taosMemoryMalloc
(
sizeof
(
SMsgHead
)
+
tlen
);
...
...
@@ -122,6 +123,7 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSMqMVRebReq
(
&
abuf
,
&
req
);
taosMemoryFree
(
req
.
topic
);
*
pBuf
=
buf
;
*
pLen
=
tlen
;
...
...
@@ -129,12 +131,12 @@ static int32_t mndBuildRebalanceMsg(void **pBuf, int32_t *pLen, const SMqConsume
return
0
;
}
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
)
{
static
int32_t
mndPersistRebalanceMsg
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
const
SMqConsumerEp
*
pConsumerEp
,
const
char
*
topicName
)
{
ASSERT
(
pConsumerEp
->
oldConsumerId
!=
-
1
);
void
*
buf
;
int32_t
tlen
;
if
(
mndBuildRebalanceMsg
(
&
buf
,
&
tlen
,
pConsumerEp
)
<
0
)
{
if
(
mndBuildRebalanceMsg
(
&
buf
,
&
tlen
,
pConsumerEp
,
topicName
)
<
0
)
{
return
-
1
;
}
...
...
@@ -502,10 +504,10 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
pConsumerEp
->
epoch
=
0
;
taosArrayPush
(
pSubConsumer
->
vgInfo
,
pConsumerEp
);
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
char
cgroup
[
TSDB_CGROUP_LEN
];
mndSplitSubscribeKey
(
pSub
->
key
,
topic
,
cgroup
);
if
(
pConsumerEp
->
oldConsumerId
==
-
1
)
{
char
topic
[
TSDB_TOPIC_FNAME_LEN
];
char
cgroup
[
TSDB_CGROUP_LEN
];
mndSplitSubscribeKey
(
pSub
->
key
,
topic
,
cgroup
);
SMqTopicObj
*
pTopic
=
mndAcquireTopic
(
pMnode
,
topic
);
mInfo
(
"mq set conn: assign vgroup %d of topic %s to consumer %"
PRId64
" cgroup: %s"
,
pConsumerEp
->
vgId
,
...
...
@@ -517,7 +519,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
mInfo
(
"mq rebalance: assign vgroup %d, from consumer %"
PRId64
" to consumer %"
PRId64
""
,
pConsumerEp
->
vgId
,
pConsumerEp
->
oldConsumerId
,
pConsumerEp
->
consumerId
);
mndPersistRebalanceMsg
(
pMnode
,
pTrans
,
pConsumerEp
);
mndPersistRebalanceMsg
(
pMnode
,
pTrans
,
pConsumerEp
,
topic
);
}
}
}
...
...
@@ -849,7 +851,7 @@ static int32_t mndProcessSubscribeReq(SNodeMsg *pMsg) {
pConsumerEp
->
consumerId
);
mndPersistMqSetConnReq
(
pMnode
,
pTrans
,
pTopic
,
cgroup
,
pConsumerEp
);
}
else
{
mndPersistRebalanceMsg
(
pMnode
,
pTrans
,
pConsumerEp
);
mndPersistRebalanceMsg
(
pMnode
,
pTrans
,
pConsumerEp
,
newTopicName
);
}
// to trigger rebalance at once, do not set status active
/*atomic_store_32(&pConsumer->status, MQ_CONSUMER_STATUS__ACTIVE);*/
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
deab8952
...
...
@@ -68,7 +68,7 @@ target_link_libraries(
PUBLIC executor
PUBLIC scheduler
PUBLIC tdb
PUBLIC bdb
#
PUBLIC bdb
PUBLIC transport
PUBLIC stream
)
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
deab8952
...
...
@@ -316,7 +316,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
return
0
;
}
vDebug
(
"poll topic %s from consumer %ld (epoch %d)
"
,
pTopic
->
topicName
,
consumerId
,
pReq
->
epoch
);
vDebug
(
"poll topic %s from consumer %ld (epoch %d)
%s"
,
pTopic
->
topicName
,
consumerId
,
pReq
->
epoch
,
pTopic
->
topicName
);
rsp
.
reqOffset
=
pReq
->
currentOffset
;
rsp
.
skipLogNum
=
0
;
...
...
@@ -334,12 +334,10 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
// TODO: no more log, set timer to wait blocking time
// if data inserted during waiting, launch query and
// response to user
vDebug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
vDebug
(
"tmq poll: consumer %ld (epoch %d) vg %d offset %ld, no more log to return"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
break
;
}
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
);
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d offset %ld msgType %d"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
);
/*int8_t pos = fetchOffset % TQ_BUFFER_SIZE;*/
/*pHead = pTopic->pReadhandle->pHead;*/
if
(
pHead
->
msgType
==
TDMT_VND_SUBMIT
)
{
...
...
@@ -363,8 +361,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
}
if
(
taosArrayGetSize
(
pRes
)
==
0
)
{
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
vDebug
(
"tmq poll: consumer %ld (epoch %d) iter log, vg %d skip log %ld since not wanted"
,
consumerId
,
pReq
->
epoch
,
pTq
->
pVnode
->
vgId
,
fetchOffset
);
fetchOffset
++
;
rsp
.
skipLogNum
++
;
taosArrayDestroy
(
pRes
);
...
...
@@ -393,8 +390,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg
->
pCont
=
buf
;
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
vDebug
(
"vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
,
consumerId
,
pReq
->
epoch
);
vDebug
(
"vg %d offset %ld msgType %d from consumer %ld (epoch %d) actual rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
pHead
->
msgType
,
consumerId
,
pReq
->
epoch
);
tmsgSendRsp
(
pMsg
);
taosMemoryFree
(
pHead
);
return
0
;
...
...
@@ -425,8 +421,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg, int32_t workerId) {
pMsg
->
contLen
=
tlen
;
pMsg
->
code
=
0
;
tmsgSendRsp
(
pMsg
);
vDebug
(
"vg %d offset %ld from consumer %ld (epoch %d) not rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
consumerId
,
pReq
->
epoch
);
vDebug
(
"vg %d offset %ld from consumer %ld (epoch %d) not rsp"
,
pTq
->
pVnode
->
vgId
,
fetchOffset
,
consumerId
,
pReq
->
epoch
);
/*}*/
return
0
;
...
...
@@ -437,7 +432,7 @@ int32_t tqProcessRebReq(STQ* pTq, char* msg) {
terrno
=
TSDB_CODE_SUCCESS
;
tDecodeSMqMVRebReq
(
msg
,
&
req
);
vDebug
(
"vg %d set from consumer %ld to consumer %ld"
,
req
.
vgId
,
req
.
oldConsumerId
,
req
.
newConsumerId
);
vDebug
(
"vg %d set from consumer %ld to consumer %ld"
,
req
.
vgId
,
req
.
oldConsumerId
,
req
.
newConsumerId
);
STqConsumer
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
req
.
oldConsumerId
);
ASSERT
(
pConsumer
);
ASSERT
(
pConsumer
->
consumerId
==
req
.
oldConsumerId
);
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
deab8952
...
...
@@ -167,8 +167,8 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
if
(
!
tdSTSRowIterNext
(
&
iter
,
pColData
->
info
.
colId
,
pColData
->
info
.
type
,
&
sVal
))
{
break
;
}
if
(
colDataAppend
(
pColData
,
curRow
,
sVal
.
val
,
false
)
<
0
)
{
/*if (colDataAppend(pColData, curRow, sVal.val, sVal.valType == TD_VTYPE_NULL) < 0) {*/
/*if (colDataAppend(pColData, curRow, sVal.val, false) < 0) {*/
if
(
colDataAppend
(
pColData
,
curRow
,
sVal
.
val
,
sVal
.
valType
==
TD_VTYPE_NULL
)
<
0
)
{
taosArrayDestroyEx
(
pArray
,
(
void
(
*
)(
void
*
))
tDeleteSSDataBlock
);
return
NULL
;
}
...
...
source/libs/transport/src/transCli.c
浏览文件 @
deab8952
...
...
@@ -944,8 +944,8 @@ void transSendRequest(void* shandle, const char* ip, uint32_t port, STransMsg* p
SCliThrdObj
*
thrd
=
((
SCliObj
*
)
pTransInst
->
tcphandle
)
->
pThreadObj
[
index
];
tDebug
(
"send request at thread:%d %p, dst: %s:%d
"
,
index
,
pMsg
,
ip
,
port
);
transSendAsync
(
thrd
->
asyncPool
,
&
(
cliMsg
->
q
)
);
tDebug
(
"send request at thread:%d %p, dst: %s:%d
, app:%p"
,
index
,
pMsg
,
ip
,
port
,
pMsg
->
ahandle
);
ASSERT
(
transSendAsync
(
thrd
->
asyncPool
,
&
(
cliMsg
->
q
))
==
0
);
}
void
transSendRecv
(
void
*
shandle
,
const
char
*
ip
,
uint32_t
port
,
STransMsg
*
pReq
,
STransMsg
*
pRsp
)
{
...
...
tests/test/c/tmqSim.c
浏览文件 @
deab8952
...
...
@@ -274,17 +274,11 @@ int main(int32_t argc, char *argv[]) {
loop_consume
(
tmq
);
err
=
tmq_unsubscribe
(
tmq
);
ASSERT
(
err
==
TMQ_RESP_ERR__SUCCESS
);
#if 0
err
=
tmq_unsubscribe
(
tmq
);
if
(
err
)
{
printf
(
"tmq_unsubscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
}
#endif
return
0
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录