Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
60ed607c
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
60ed607c
编写于
1月 29, 2022
作者:
L
Liu Jicong
提交者:
GitHub
1月 29, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10094 from taosdata/feature/tq
improve mq consume
上级
09c10df4
35348528
变更
11
展开全部
隐藏空白更改
内联
并排
Showing
11 changed file
with
376 addition
and
754 deletion
+376
-754
include/client/taos.h
include/client/taos.h
+22
-17
include/common/tmsg.h
include/common/tmsg.h
+5
-2
source/client/src/tmq.c
source/client/src/tmq.c
+254
-177
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+13
-5
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+2
-1
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+15
-13
source/dnode/vnode/src/inc/tqMetaStore.h
source/dnode/vnode/src/inc/tqMetaStore.h
+1
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+32
-534
source/dnode/vnode/src/tq/tqMetaStore.c
source/dnode/vnode/src/tq/tqMetaStore.c
+22
-4
source/dnode/vnode/src/vnd/vnodeWrite.c
source/dnode/vnode/src/vnd/vnodeWrite.c
+2
-0
source/libs/wal/src/walRead.c
source/libs/wal/src/walRead.c
+8
-1
未找到文件。
include/client/taos.h
浏览文件 @
60ed607c
...
...
@@ -202,34 +202,36 @@ enum tmq_resp_err_t {
typedef
enum
tmq_resp_err_t
tmq_resp_err_t
;
typedef
struct
tmq_t
tmq_t
;
typedef
struct
tmq_topic_vgroup_t
tmq_topic_vgroup_t
;
typedef
struct
tmq_t
tmq_t
;
typedef
struct
tmq_topic_vgroup_t
tmq_topic_vgroup_t
;
typedef
struct
tmq_topic_vgroup_list_t
tmq_topic_vgroup_list_t
;
typedef
struct
tmq_conf_t
tmq_conf_t
;
typedef
struct
tmq_list_t
tmq_list_t
;
typedef
struct
tmq_message_t
tmq_message_t
;
typedef
struct
tmq_conf_t
tmq_conf_t
;
typedef
struct
tmq_list_t
tmq_list_t
;
typedef
struct
tmq_message_t
tmq_message_t
;
typedef
void
(
tmq_commit_cb
(
tmq_t
*
,
tmq_resp_err_t
,
tmq_topic_vgroup_list_t
*
,
void
*
param
));
DLL_EXPORT
tmq_list_t
*
tmq_list_new
();
DLL_EXPORT
int32_t
tmq_list_append
(
tmq_list_t
*
,
char
*
);
typedef
void
(
tmq_commit_cb
(
tmq_t
*
,
tmq_resp_err_t
,
tmq_topic_vgroup_list_t
*
,
void
*
param
));
DLL_EXPORT
TAOS_RES
*
taos_create_topic
(
TAOS
*
taos
,
const
char
*
name
,
const
char
*
sql
,
int
sqlLen
);
DLL_EXPORT
tmq_t
*
tmq_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
tmq_list_t
*
tmq_list_new
();
DLL_EXPORT
int32_t
tmq_list_append
(
tmq_list_t
*
,
char
*
);
DLL_EXPORT
TAOS_RES
*
tmq_create_topic
(
TAOS
*
taos
,
const
char
*
name
,
const
char
*
sql
,
int
sqlLen
);
DLL_EXPORT
tmq_t
*
tmq_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
void
tmq_message_destroy
(
tmq_message_t
*
tmq_message
);
/* ------------------------TMQ CONSUMER INTERFACE------------------------ */
DLL_EXPORT
tmq_resp_err_t
tmq_subscribe
(
tmq_t
*
tmq
,
tmq_list_t
*
topic_list
);
DLL_EXPORT
tmq_resp_err_t
tmq_subscribe
(
tmq_t
*
tmq
,
tmq_list_t
*
topic_list
);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_unsubscribe(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_subscription(tmq_t* tmq, tmq_topic_vgroup_list_t** topics);
#endif
DLL_EXPORT
tmq_message_t
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
blocking_time
);
DLL_EXPORT
tmq_message_t
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
blocking_time
);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_consumer_close(tmq_t* tmq);
DLL_EXPORT tmq_resp_err_t tmq_assign(tmq_t* tmq, const tmq_topic_vgroup_list_t* vgroups);
DLL_EXPORT tmq_resp_err_t tmq_assignment(tmq_t* tmq, tmq_topic_vgroup_list_t** vgroups);
#endif
DLL_EXPORT
tmq_resp_err_t
tmq_commit
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
,
int32_t
async
);
DLL_EXPORT
tmq_resp_err_t
tmq_commit
(
tmq_t
*
tmq
,
const
tmq_topic_vgroup_list_t
*
offsets
,
int32_t
async
);
#if 0
DLL_EXPORT tmq_resp_err_t tmq_commit_message(tmq_t* tmq, const tmq_message_t* tmqmessage, int32_t async);
#endif
...
...
@@ -243,10 +245,13 @@ enum tmq_conf_res_t {
typedef
enum
tmq_conf_res_t
tmq_conf_res_t
;
DLL_EXPORT
tmq_conf_t
*
tmq_conf_new
();
DLL_EXPORT
void
tmq_conf_destroy
(
tmq_conf_t
*
conf
);
DLL_EXPORT
tmq_conf_res_t
tmq_conf_set
(
tmq_conf_t
*
conf
,
const
char
*
key
,
const
char
*
value
);
DLL_EXPORT
void
tmq_conf_set_offset_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
);
DLL_EXPORT
tmq_conf_t
*
tmq_conf_new
();
DLL_EXPORT
void
tmq_conf_destroy
(
tmq_conf_t
*
conf
);
DLL_EXPORT
tmq_conf_res_t
tmq_conf_set
(
tmq_conf_t
*
conf
,
const
char
*
key
,
const
char
*
value
);
DLL_EXPORT
void
tmq_conf_set_offset_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
);
//temporary used function for demo only
void
tmqShowMsg
(
tmq_message_t
*
tmq_message
);
#ifdef __cplusplus
}
...
...
include/common/tmsg.h
浏览文件 @
60ed607c
...
...
@@ -1578,6 +1578,7 @@ static FORCE_INLINE void* taosDecodeSMqMsg(void* buf, SMqHbMsg* pMsg) {
}
typedef
struct
SMqSetCVgReq
{
int64_t
leftForVer
;
int32_t
vgId
;
int64_t
oldConsumerId
;
int64_t
newConsumerId
;
...
...
@@ -1612,6 +1613,7 @@ static FORCE_INLINE void* tDecodeSSubQueryMsg(void* buf, SSubQueryMsg* pMsg) {
static
FORCE_INLINE
int32_t
tEncodeSMqSetCVgReq
(
void
**
buf
,
const
SMqSetCVgReq
*
pReq
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
leftForVer
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pReq
->
vgId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
oldConsumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pReq
->
newConsumerId
);
...
...
@@ -1620,12 +1622,13 @@ static FORCE_INLINE int32_t tEncodeSMqSetCVgReq(void** buf, const SMqSetCVgReq*
tlen
+=
taosEncodeString
(
buf
,
pReq
->
sql
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
logicalPlan
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
physicalPlan
);
tlen
+=
taosEncodeString
(
buf
,
(
char
*
)
pReq
->
qmsg
);
tlen
+=
taosEncodeString
(
buf
,
pReq
->
qmsg
);
//tlen += tEncodeSSubQueryMsg(buf, &pReq->msg);
return
tlen
;
}
static
FORCE_INLINE
void
*
tDecodeSMqSetCVgReq
(
void
*
buf
,
SMqSetCVgReq
*
pReq
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
leftForVer
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pReq
->
vgId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
oldConsumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pReq
->
newConsumerId
);
...
...
@@ -1634,7 +1637,7 @@ static FORCE_INLINE void* tDecodeSMqSetCVgReq(void* buf, SMqSetCVgReq* pReq) {
buf
=
taosDecodeString
(
buf
,
&
pReq
->
sql
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
logicalPlan
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
physicalPlan
);
buf
=
taosDecodeString
(
buf
,
(
char
**
)
&
pReq
->
qmsg
);
buf
=
taosDecodeString
(
buf
,
&
pReq
->
qmsg
);
//buf = tDecodeSSubQueryMsg(buf, &pReq->msg);
return
buf
;
}
...
...
source/client/src/tmq.c
浏览文件 @
60ed607c
此差异已折叠。
点击以展开。
source/client/test/clientTests.cpp
浏览文件 @
60ed607c
...
...
@@ -583,7 +583,7 @@ TEST(testCase, create_topic_ctb_Test) {
taos_free_result
(
pRes
);
char
*
sql
=
"select * from tu"
;
pRes
=
t
aos
_create_topic
(
pConn
,
"test_ctb_topic_1"
,
sql
,
strlen
(
sql
));
pRes
=
t
mq
_create_topic
(
pConn
,
"test_ctb_topic_1"
,
sql
,
strlen
(
sql
));
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
...
...
@@ -607,7 +607,7 @@ TEST(testCase, create_topic_stb_Test) {
taos_free_result
(
pRes
);
char
*
sql
=
"select * from st1"
;
pRes
=
t
aos
_create_topic
(
pConn
,
"test_stb_topic_1"
,
sql
,
strlen
(
sql
));
pRes
=
t
mq
_create_topic
(
pConn
,
"test_stb_topic_1"
,
sql
,
strlen
(
sql
));
taos_free_result
(
pRes
);
taos_close
(
pConn
);
}
...
...
@@ -633,11 +633,11 @@ TEST(testCase, tmq_subscribe_ctb_Test) {
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
tmq_message_destroy(msg);
//printf("get msg\n");
//if (msg == NULL) break;
}
}
#endif
TEST(testCase, tmq_subscribe_stb_Test) {
TAOS* pConn = taos_connect("localhost", "root", "taosdata", NULL, 0);
...
...
@@ -656,11 +656,18 @@ TEST(testCase, tmq_subscribe_stb_Test) {
tmq_list_t* topic_list = tmq_list_new();
tmq_list_append(topic_list, "test_stb_topic_1");
tmq_subscribe(tmq, topic_list);
int cnt = 1;
while (1) {
tmq_message_t* msg = tmq_consumer_poll(tmq, 1000);
if (msg == NULL) continue;
tmqShowMsg(msg);
if (cnt++ % 10 == 0){
tmq_commit(tmq, NULL, 0);
}
//tmq_commit(tmq, NULL, 0);
tmq_message_destroy(msg);
//printf("get msg\n");
//if (msg == NULL) break;
}
}
...
...
@@ -669,6 +676,7 @@ TEST(testCase, tmq_consume_Test) {
TEST(testCase, tmq_commit_TEST) {
}
#endif
#if 0
TEST(testCase, projection_query_tables) {
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
60ed607c
...
...
@@ -633,7 +633,7 @@ typedef struct SMqConsumerTopic {
}
SMqConsumerTopic
;
static
FORCE_INLINE
SMqConsumerTopic
*
tNewConsumerTopic
(
int64_t
consumerId
,
SMqTopicObj
*
pTopic
,
SMqSubscribeObj
*
pSub
)
{
SMqSubscribeObj
*
pSub
,
int64_t
*
oldConsumerId
)
{
SMqConsumerTopic
*
pCTopic
=
malloc
(
sizeof
(
SMqConsumerTopic
));
if
(
pCTopic
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
...
...
@@ -646,6 +646,7 @@ static FORCE_INLINE SMqConsumerTopic* tNewConsumerTopic(int64_t consumerId, SMqT
int32_t
unassignedVgSz
=
taosArrayGetSize
(
pSub
->
unassignedVg
);
if
(
unassignedVgSz
>
0
)
{
SMqConsumerEp
*
pCEp
=
taosArrayPop
(
pSub
->
unassignedVg
);
*
oldConsumerId
=
pCEp
->
consumerId
;
pCEp
->
consumerId
=
consumerId
;
taosArrayPush
(
pCTopic
->
pVgInfo
,
&
pCEp
->
vgId
);
taosArrayPush
(
pSub
->
assigned
,
pCEp
);
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
60ed607c
...
...
@@ -48,7 +48,8 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg);
static
int32_t
mndProcessGetSubEpReq
(
SMnodeMsg
*
pMsg
);
static
int
mndBuildMqSetConsumerVgReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqConsumerObj
*
pConsumer
,
SMqConsumerTopic
*
pConsumerTopic
,
SMqTopicObj
*
pTopic
,
SMqConsumerEp
*
pSub
);
SMqConsumerTopic
*
pConsumerTopic
,
SMqTopicObj
*
pTopic
,
SMqConsumerEp
*
pSub
,
int64_t
oldConsumerId
);
int32_t
mndInitSubscribe
(
SMnode
*
pMnode
)
{
SSdbTable
table
=
{.
sdbType
=
SDB_SUBSCRIBE
,
...
...
@@ -166,7 +167,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
if
(
pCEp
->
lastConsumerHbTs
!=
-
1
&&
currentTs
-
pCEp
->
lastConsumerHbTs
>
MND_SUBSCRIBE_REBALANCE_MS
)
{
// put consumer into lostConsumer
taosArrayPush
(
pSub
->
lostConsumer
,
pCEp
);
// put vg into unass
gi
ned
// put vg into unass
ig
ned
taosArrayPush
(
pSub
->
unassignedVg
,
pCEp
);
// remove from assigned
// TODO: swap with last one, reduce size and reset i
...
...
@@ -202,6 +203,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
consumerId
=
*
(
int64_t
*
)
taosArrayGet
(
pSub
->
availConsumer
,
pSub
->
nextConsumerIdx
);
SMqConsumerEp
*
pCEp
=
taosArrayPop
(
pSub
->
unassignedVg
);
int64_t
oldConsumerId
=
pCEp
->
consumerId
;
pCEp
->
consumerId
=
consumerId
;
taosArrayPush
(
pSub
->
assigned
,
pCEp
);
pSub
->
nextConsumerIdx
=
(
pSub
->
nextConsumerIdx
+
1
)
%
taosArrayGetSize
(
pSub
->
availConsumer
);
...
...
@@ -222,7 +224,9 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
req
.
logicalPlan
=
pTopic
->
logicalPlan
;
req
.
physicalPlan
=
pTopic
->
physicalPlan
;
req
.
qmsg
=
pCEp
->
qmsg
;
req
.
oldConsumerId
=
oldConsumerId
;
req
.
newConsumerId
=
consumerId
;
req
.
vgId
=
pCEp
->
vgId
;
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
&
req
);
void
*
buf
=
malloc
(
sizeof
(
SMsgHead
)
+
tlen
);
if
(
buf
==
NULL
)
{
...
...
@@ -237,6 +241,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
tEncodeSMqSetCVgReq
(
&
abuf
,
&
req
);
// persist msg
// TODO: no need for txn
STransAction
action
=
{
0
};
action
.
epSet
=
pCEp
->
epSet
;
action
.
pCont
=
buf
;
...
...
@@ -303,13 +308,12 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
ASSERT
(
CEp
.
vgId
==
pVgroup
->
vgId
);
CEp
.
qmsg
=
strdup
(
pTaskInfo
->
msg
->
msg
);
taosArrayPush
(
unassignedVg
,
&
CEp
);
//TODO: free taskInfo
//
TODO: free taskInfo
taosArrayDestroy
(
pArray
);
/*SEpSet *pEpSet = &plan->execNode.epset;*/
/*pEpSet->inUse = 0;*/
/*addEpIntoEpSet(pEpSet, "localhost", 6030);*/
}
/*qDestroyQueryDag(pDag);*/
...
...
@@ -317,14 +321,15 @@ static int mndInitUnassignedVg(SMnode *pMnode, SMqTopicObj *pTopic, SArray *unas
}
static
int
mndBuildMqSetConsumerVgReq
(
SMnode
*
pMnode
,
STrans
*
pTrans
,
SMqConsumerObj
*
pConsumer
,
SMqConsumerTopic
*
pConsumerTopic
,
SMqTopicObj
*
pTopic
,
SMqConsumerEp
*
pCEp
)
{
SMqConsumerTopic
*
pConsumerTopic
,
SMqTopicObj
*
pTopic
,
SMqConsumerEp
*
pCEp
,
int64_t
oldConsumerId
)
{
int32_t
sz
=
taosArrayGetSize
(
pConsumerTopic
->
pVgInfo
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int32_t
vgId
=
*
(
int32_t
*
)
taosArrayGet
(
pConsumerTopic
->
pVgInfo
,
i
);
SVgObj
*
pVgObj
=
mndAcquireVgroup
(
pMnode
,
vgId
);
SMqSetCVgReq
req
=
{
.
vgId
=
vgId
,
.
oldConsumerId
=
-
1
,
.
oldConsumerId
=
oldConsumerId
,
.
newConsumerId
=
pConsumer
->
consumerId
,
};
strcpy
(
req
.
cgroup
,
pConsumer
->
cgroup
);
...
...
@@ -459,10 +464,6 @@ static int32_t mndSubActionUpdate(SSdb *pSdb, SMqSubscribeObj *pOldSub, SMqSubsc
return
0
;
}
static
void
*
mndBuildMqVGroupSetReq
(
SMnode
*
pMnode
,
char
*
topicName
,
int32_t
vgId
,
int64_t
consumerId
,
char
*
cgroup
)
{
return
0
;
}
static
char
*
mndMakeSubscribeKey
(
char
*
cgroup
,
char
*
topicName
)
{
char
*
key
=
malloc
(
TSDB_SHOW_SUBQUERY_LEN
);
if
(
key
==
NULL
)
{
...
...
@@ -642,7 +643,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
free
(
key
);
// set unassigned vg
if
(
mndInitUnassignedVg
(
pMnode
,
pTopic
,
pSub
->
unassignedVg
)
<
0
)
{
//TODO: free memory
//
TODO: free memory
return
-
1
;
}
// TODO: disable alter
...
...
@@ -650,7 +651,8 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
taosArrayPush
(
pSub
->
availConsumer
,
&
consumerId
);
SMqConsumerTopic
*
pConsumerTopic
=
tNewConsumerTopic
(
consumerId
,
pTopic
,
pSub
);
int64_t
oldConsumerId
;
SMqConsumerTopic
*
pConsumerTopic
=
tNewConsumerTopic
(
consumerId
,
pTopic
,
pSub
,
&
oldConsumerId
);
taosArrayPush
(
pConsumer
->
topics
,
pConsumerTopic
);
if
(
taosArrayGetSize
(
pConsumerTopic
->
pVgInfo
)
>
0
)
{
...
...
@@ -658,7 +660,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
int32_t
vgId
=
*
(
int32_t
*
)
taosArrayGetLast
(
pConsumerTopic
->
pVgInfo
);
SMqConsumerEp
*
pCEp
=
taosArrayGetLast
(
pSub
->
assigned
);
if
(
pCEp
->
vgId
==
vgId
)
{
if
(
mndBuildMqSetConsumerVgReq
(
pMnode
,
pTrans
,
pConsumer
,
pConsumerTopic
,
pTopic
,
pCEp
)
<
0
)
{
if
(
mndBuildMqSetConsumerVgReq
(
pMnode
,
pTrans
,
pConsumer
,
pConsumerTopic
,
pTopic
,
pCEp
,
oldConsumerId
)
<
0
)
{
// TODO
return
-
1
;
}
...
...
source/dnode/vnode/src/inc/tqMetaStore.h
浏览文件 @
60ed607c
...
...
@@ -40,6 +40,7 @@ int32_t tqHandleCopyPut(STqMetaStore*, int64_t key, void* value, size_t vsize);
// delete committed kv pair
// notice that a delete action still needs to be committed
int32_t
tqHandleDel
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandlePurge
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleCommit
(
STqMetaStore
*
,
int64_t
key
);
int32_t
tqHandleAbort
(
STqMetaStore
*
,
int64_t
key
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
60ed607c
...
...
@@ -25,17 +25,6 @@
// handle management message
//
int
tqGroupSSize
(
const
STqGroup
*
pGroup
);
int
tqTopicSSize
();
int
tqItemSSize
();
void
*
tqSerializeListHandle
(
STqList
*
listHandle
,
void
*
ptr
);
void
*
tqSerializeTopic
(
STqTopic
*
pTopic
,
void
*
ptr
);
void
*
tqSerializeItem
(
STqMsgItem
*
pItem
,
void
*
ptr
);
const
void
*
tqDeserializeTopic
(
const
void
*
pBytes
,
STqTopic
*
pTopic
);
const
void
*
tqDeserializeItem
(
const
void
*
pBytes
,
STqMsgItem
*
pItem
);
int
tqInit
()
{
int8_t
old
=
atomic_val_compare_exchange_8
(
&
tqMgmt
.
inited
,
0
,
1
);
if
(
old
==
1
)
return
0
;
...
...
@@ -88,177 +77,6 @@ void tqClose(STQ* pTq) {
// TODO
}
static
int
tqProtoCheck
(
STqMsgHead
*
pMsg
)
{
// TODO
return
pMsg
->
protoVer
==
0
;
}
static
int
tqAckOneTopic
(
STqTopic
*
pTopic
,
STqOneAck
*
pAck
,
STqQueryMsg
**
ppQuery
)
{
// clean old item and move forward
int32_t
consumeOffset
=
pAck
->
consumeOffset
;
int
idx
=
consumeOffset
%
TQ_BUFFER_SIZE
;
ASSERT
(
pTopic
->
buffer
[
idx
].
content
&&
pTopic
->
buffer
[
idx
].
executor
);
tfree
(
pTopic
->
buffer
[
idx
].
content
);
if
(
1
/* TODO: need to launch new query */
)
{
STqQueryMsg
*
pNewQuery
=
malloc
(
sizeof
(
STqQueryMsg
));
if
(
pNewQuery
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
// TODO: lock executor
// TODO: read from wal and assign to src
/*pNewQuery->exec->executor = pTopic->buffer[idx].executor;*/
/*pNewQuery->exec->src = 0;*/
/*pNewQuery->exec->dest = &pTopic->buffer[idx];*/
/*pNewQuery->next = *ppQuery;*/
/**ppQuery = pNewQuery;*/
}
return
0
;
}
static
int
tqAck
(
STqGroup
*
pGroup
,
STqAcks
*
pAcks
)
{
int32_t
ackNum
=
pAcks
->
ackNum
;
STqOneAck
*
acks
=
pAcks
->
acks
;
// double ptr for acks and list
int
i
=
0
;
STqList
*
node
=
pGroup
->
head
;
int
ackCnt
=
0
;
STqQueryMsg
*
pQuery
=
NULL
;
while
(
i
<
ackNum
&&
node
->
next
)
{
if
(
acks
[
i
].
topicId
==
node
->
next
->
topic
.
topicId
)
{
ackCnt
++
;
tqAckOneTopic
(
&
node
->
next
->
topic
,
&
acks
[
i
],
&
pQuery
);
}
else
if
(
acks
[
i
].
topicId
<
node
->
next
->
topic
.
topicId
)
{
i
++
;
}
else
{
node
=
node
->
next
;
}
}
if
(
pQuery
)
{
// post message
}
return
ackCnt
;
}
static
int
tqCommitGroup
(
STqGroup
*
pGroup
)
{
// persist modification into disk
return
0
;
}
int
tqCreateGroup
(
STQ
*
pTq
,
int64_t
topicId
,
int64_t
cgId
,
int64_t
cId
,
STqGroup
**
ppGroup
)
{
// create in disk
STqGroup
*
pGroup
=
(
STqGroup
*
)
malloc
(
sizeof
(
STqGroup
));
if
(
pGroup
==
NULL
)
{
// TODO
return
-
1
;
}
*
ppGroup
=
pGroup
;
memset
(
pGroup
,
0
,
sizeof
(
STqGroup
));
pGroup
->
topicList
=
tdListNew
(
sizeof
(
STqTopic
));
if
(
pGroup
->
topicList
==
NULL
)
{
free
(
pGroup
);
return
-
1
;
}
*
ppGroup
=
pGroup
;
return
0
;
}
STqGroup
*
tqOpenGroup
(
STQ
*
pTq
,
int64_t
topicId
,
int64_t
cgId
,
int64_t
cId
)
{
STqGroup
*
pGroup
=
tqHandleGet
(
pTq
->
tqMeta
,
cId
);
if
(
pGroup
==
NULL
)
{
int
code
=
tqCreateGroup
(
pTq
,
topicId
,
cgId
,
cId
,
&
pGroup
);
if
(
code
<
0
)
{
// TODO
return
NULL
;
}
tqHandleMovePut
(
pTq
->
tqMeta
,
cId
,
pGroup
);
}
ASSERT
(
pGroup
);
return
pGroup
;
}
int
tqCloseGroup
(
STQ
*
pTq
,
int64_t
topicId
,
int64_t
cgId
,
int64_t
cId
)
{
// TODO
return
0
;
}
int
tqDropGroup
(
STQ
*
pTq
,
int64_t
topicId
,
int64_t
cgId
,
int64_t
cId
)
{
// delete from disk
return
0
;
}
static
int
tqFetch
(
STqGroup
*
pGroup
,
STqConsumeRsp
**
pRsp
)
{
STqList
*
pHead
=
pGroup
->
head
;
STqList
*
pNode
=
pHead
;
int
totSize
=
0
;
int
numOfMsgs
=
0
;
// TODO: make it a macro
int
sizeLimit
=
4
*
1024
;
void
*
ptr
=
realloc
(
*
pRsp
,
sizeof
(
STqConsumeRsp
)
+
sizeLimit
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
*
pRsp
=
ptr
;
STqMsgContent
*
buffer
=
(
*
pRsp
)
->
msgs
;
// iterate the list to get msgs of all topics
// until all topic iterated or msgs over sizeLimit
while
(
pNode
->
next
)
{
pNode
=
pNode
->
next
;
STqTopic
*
pTopic
=
&
pNode
->
topic
;
int
idx
=
pTopic
->
nextConsumeOffset
%
TQ_BUFFER_SIZE
;
if
(
pTopic
->
buffer
[
idx
].
content
!=
NULL
&&
pTopic
->
buffer
[
idx
].
offset
==
pTopic
->
nextConsumeOffset
)
{
totSize
+=
pTopic
->
buffer
[
idx
].
size
;
if
(
totSize
>
sizeLimit
)
{
void
*
ptr
=
realloc
(
*
pRsp
,
sizeof
(
STqConsumeRsp
)
+
totSize
);
if
(
ptr
==
NULL
)
{
totSize
-=
pTopic
->
buffer
[
idx
].
size
;
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
// return msgs already copied
break
;
}
*
pRsp
=
ptr
;
break
;
}
*
((
int64_t
*
)
buffer
)
=
pTopic
->
topicId
;
buffer
=
POINTER_SHIFT
(
buffer
,
sizeof
(
int64_t
));
*
((
int64_t
*
)
buffer
)
=
pTopic
->
buffer
[
idx
].
size
;
buffer
=
POINTER_SHIFT
(
buffer
,
sizeof
(
int64_t
));
memcpy
(
buffer
,
pTopic
->
buffer
[
idx
].
content
,
pTopic
->
buffer
[
idx
].
size
);
buffer
=
POINTER_SHIFT
(
buffer
,
pTopic
->
buffer
[
idx
].
size
);
numOfMsgs
++
;
if
(
totSize
>
sizeLimit
)
{
break
;
}
}
}
(
*
pRsp
)
->
bodySize
=
totSize
;
return
numOfMsgs
;
}
STqGroup
*
tqGetGroup
(
STQ
*
pTq
,
int64_t
clientId
)
{
return
tqHandleGet
(
pTq
->
tqMeta
,
clientId
);
}
int
tqSendLaunchQuery
(
STqMsgItem
*
bufItem
,
int64_t
offset
)
{
if
(
tqQueryExecuting
(
bufItem
->
status
))
{
return
0
;
}
bufItem
->
status
=
1
;
// load data from wal or buffer pool
// put into exec
// send exec into non blocking queue
// when query finished, put into buffer pool
return
0
;
}
/*int tqMoveOffsetToNext(TqGroupHandle* gHandle) {*/
/*return 0;*/
/*}*/
int
tqPushMsg
(
STQ
*
pTq
,
void
*
p
,
int64_t
version
)
{
// add reference
// judge and launch new query
...
...
@@ -270,218 +88,6 @@ int tqCommit(STQ* pTq) {
return
0
;
}
int
tqBufferSetOffset
(
STqTopic
*
pTopic
,
int64_t
offset
)
{
int
code
;
memset
(
pTopic
->
buffer
,
0
,
sizeof
(
pTopic
->
buffer
));
// launch query
for
(
int
i
=
offset
;
i
<
offset
+
TQ_BUFFER_SIZE
;
i
++
)
{
int
pos
=
i
%
TQ_BUFFER_SIZE
;
code
=
tqSendLaunchQuery
(
&
pTopic
->
buffer
[
pos
],
offset
);
if
(
code
<
0
)
{
// TODO: error handling
}
}
// set offset
pTopic
->
nextConsumeOffset
=
offset
;
pTopic
->
floatingCursor
=
offset
;
return
0
;
}
STqTopic
*
tqFindTopic
(
STqGroup
*
pGroup
,
int64_t
topicId
)
{
// TODO
return
NULL
;
}
int
tqSetCursor
(
STQ
*
pTq
,
STqSetCurReq
*
pMsg
)
{
int
code
;
int64_t
clientId
=
pMsg
->
head
.
clientId
;
int64_t
topicId
=
pMsg
->
topicId
;
int64_t
offset
=
pMsg
->
offset
;
STqGroup
*
gHandle
=
tqGetGroup
(
pTq
,
clientId
);
if
(
gHandle
==
NULL
)
{
// client not connect
return
-
1
;
}
STqTopic
*
topicHandle
=
tqFindTopic
(
gHandle
,
topicId
);
if
(
topicHandle
==
NULL
)
{
return
-
1
;
}
if
(
pMsg
->
offset
==
topicHandle
->
nextConsumeOffset
)
{
return
0
;
}
// TODO: check log last version
code
=
tqBufferSetOffset
(
topicHandle
,
offset
);
if
(
code
<
0
)
{
// set error code
return
-
1
;
}
return
0
;
}
// temporary
int
tqProcessCMsg
(
STQ
*
pTq
,
STqConsumeReq
*
pMsg
,
STqRspHandle
*
pRsp
)
{
int64_t
clientId
=
pMsg
->
head
.
clientId
;
STqGroup
*
pGroup
=
tqGetGroup
(
pTq
,
clientId
);
if
(
pGroup
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_GROUP_NOT_SET
;
return
-
1
;
}
pGroup
->
rspHandle
.
handle
=
pRsp
->
handle
;
pGroup
->
rspHandle
.
ahandle
=
pRsp
->
ahandle
;
return
0
;
}
int
tqConsume
(
STQ
*
pTq
,
SRpcMsg
*
pReq
,
SRpcMsg
**
pRsp
)
{
STqConsumeReq
*
pMsg
=
pReq
->
pCont
;
int64_t
clientId
=
pMsg
->
head
.
clientId
;
STqGroup
*
pGroup
=
tqGetGroup
(
pTq
,
clientId
);
if
(
pGroup
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_GROUP_NOT_SET
;
return
-
1
;
}
SList
*
topicList
=
pGroup
->
topicList
;
int
totSize
=
0
;
int
numOfMsgs
=
0
;
int
sizeLimit
=
4096
;
STqConsumeRsp
*
pCsmRsp
=
(
*
pRsp
)
->
pCont
;
void
*
ptr
=
realloc
((
*
pRsp
)
->
pCont
,
sizeof
(
STqConsumeRsp
)
+
sizeLimit
);
if
(
ptr
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
(
*
pRsp
)
->
pCont
=
ptr
;
SListIter
iter
;
tdListInitIter
(
topicList
,
&
iter
,
TD_LIST_FORWARD
);
STqMsgContent
*
buffer
=
NULL
;
SArray
*
pArray
=
taosArrayInit
(
0
,
sizeof
(
void
*
));
SListNode
*
pn
;
while
((
pn
=
tdListNext
(
&
iter
))
!=
NULL
)
{
STqTopic
*
pTopic
=
*
(
STqTopic
**
)
pn
->
data
;
int
idx
=
pTopic
->
floatingCursor
%
TQ_BUFFER_SIZE
;
STqMsgItem
*
pItem
=
&
pTopic
->
buffer
[
idx
];
if
(
pItem
->
content
!=
NULL
&&
pItem
->
offset
==
pTopic
->
floatingCursor
)
{
if
(
pItem
->
status
==
TQ_ITEM_READY
)
{
// if has data
totSize
+=
pTopic
->
buffer
[
idx
].
size
;
if
(
totSize
>
sizeLimit
)
{
void
*
ptr
=
realloc
((
*
pRsp
)
->
pCont
,
sizeof
(
STqConsumeRsp
)
+
totSize
);
if
(
ptr
==
NULL
)
{
totSize
-=
pTopic
->
buffer
[
idx
].
size
;
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
// return msgs already copied
break
;
}
(
*
pRsp
)
->
pCont
=
ptr
;
break
;
}
*
((
int64_t
*
)
buffer
)
=
htobe64
(
pTopic
->
topicId
);
buffer
=
POINTER_SHIFT
(
buffer
,
sizeof
(
int64_t
));
*
((
int64_t
*
)
buffer
)
=
htobe64
(
pTopic
->
buffer
[
idx
].
size
);
buffer
=
POINTER_SHIFT
(
buffer
,
sizeof
(
int64_t
));
memcpy
(
buffer
,
pTopic
->
buffer
[
idx
].
content
,
pTopic
->
buffer
[
idx
].
size
);
buffer
=
POINTER_SHIFT
(
buffer
,
pTopic
->
buffer
[
idx
].
size
);
numOfMsgs
++
;
if
(
totSize
>
sizeLimit
)
{
break
;
}
}
else
if
(
pItem
->
status
==
TQ_ITEM_PROCESS
)
{
// if not have data but in process
}
else
if
(
pItem
->
status
==
TQ_ITEM_EMPTY
)
{
// if not have data and not in process
int32_t
old
=
atomic_val_compare_exchange_32
(
&
pItem
->
status
,
TQ_ITEM_EMPTY
,
TQ_ITEM_PROCESS
);
if
(
old
!=
TQ_ITEM_EMPTY
)
{
continue
;
}
pItem
->
offset
=
pTopic
->
floatingCursor
;
taosArrayPush
(
pArray
,
&
pItem
);
}
else
{
ASSERT
(
0
);
}
}
}
if
(
numOfMsgs
>
0
)
{
// set code and other msg
rpcSendResponse
(
*
pRsp
);
}
else
{
// most recent data has been fetched
// enable timer for blocking wait
// once new data written when waiting, launch query and rsp
}
// fetched a num of msgs, rpc response
for
(
int
i
=
0
;
i
<
pArray
->
size
;
i
++
)
{
STqMsgItem
*
pItem
=
taosArrayGet
(
pArray
,
i
);
// read from wal
void
*
raw
=
NULL
;
/*int code = pTq->tqLogReader->logRead(, &raw, pItem->offset);*/
/*int code = pTq->tqLogHandle->logRead(pItem->pTopic->logReader, &raw, pItem->offset);*/
/*if (code < 0) {*/
// TODO: error
/*}*/
// get msgType
// if submitblk
pItem
->
executor
->
assign
(
pItem
->
executor
->
runtimeEnv
,
raw
);
SSDataBlock
*
content
=
pItem
->
executor
->
exec
(
pItem
->
executor
->
runtimeEnv
);
pItem
->
content
=
content
;
// if other type, send just put into buffer
/*pItem->content = raw;*/
int32_t
old
=
atomic_val_compare_exchange_32
(
&
pItem
->
status
,
TQ_ITEM_PROCESS
,
TQ_ITEM_READY
);
ASSERT
(
old
==
TQ_ITEM_PROCESS
);
}
taosArrayDestroy
(
pArray
);
return
0
;
}
#if 0
int tqConsume(STQ* pTq, STqConsumeReq* pMsg) {
if (!tqProtoCheck((STqMsgHead*)pMsg)) {
// proto version invalid
return -1;
}
int64_t clientId = pMsg->head.clientId;
STqGroup* pGroup = tqGetGroup(pTq, clientId);
if (pGroup == NULL) {
// client not connect
return -1;
}
if (pMsg->acks.ackNum != 0) {
if (tqAck(pGroup, &pMsg->acks) != 0) {
// ack not success
return -1;
}
}
STqConsumeRsp* pRsp = (STqConsumeRsp*)pMsg;
if (tqFetch(pGroup, (void**)&pRsp->msgs) <= 0) {
// fetch error
return -1;
}
// judge and launch new query
/*if (tqSendLaunchQuery(gHandle)) {*/
// launch query error
/*return -1;*/
/*}*/
return 0;
}
#endif
int
tqSerializeConsumer
(
const
STqConsumerHandle
*
pConsumer
,
STqSerializedHead
**
ppHead
)
{
int32_t
num
=
taosArrayGetSize
(
pConsumer
->
topics
);
int32_t
sz
=
sizeof
(
STqSerializedHead
)
+
sizeof
(
int64_t
)
*
2
+
TSDB_TOPIC_FNAME_LEN
+
...
...
@@ -535,138 +141,6 @@ const void* tqDeserializeConsumer(const STqSerializedHead* pHead, STqConsumerHan
return
NULL
;
}
#if 0
int tqSerializeGroup(const STqGroup* pGroup, STqSerializedHead** ppHead) {
// calculate size
int sz = tqGroupSSize(pGroup) + sizeof(STqSerializedHead);
if (sz > (*ppHead)->ssize) {
void* tmpPtr = realloc(*ppHead, sz);
if (tmpPtr == NULL) {
free(*ppHead);
// TODO: memory err
return -1;
}
*ppHead = tmpPtr;
(*ppHead)->ssize = sz;
}
void* ptr = (*ppHead)->content;
// do serialization
*(int64_t*)ptr = pGroup->clientId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = pGroup->cgId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int32_t*)ptr = pGroup->topicNum;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
if (pGroup->topicNum > 0) {
tqSerializeListHandle(pGroup->head, ptr);
}
return 0;
}
void* tqSerializeListHandle(STqList* listHandle, void* ptr) {
STqList* node = listHandle;
ASSERT(node != NULL);
while (node) {
ptr = tqSerializeTopic(&node->topic, ptr);
node = node->next;
}
return ptr;
}
void* tqSerializeTopic(STqTopic* pTopic, void* ptr) {
*(int64_t*)ptr = pTopic->nextConsumeOffset;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
*(int64_t*)ptr = pTopic->topicId;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
/**(int32_t*)ptr = pTopic->head;*/
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
/**(int32_t*)ptr = pTopic->tail;*/
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
ptr = tqSerializeItem(&pTopic->buffer[i], ptr);
}
return ptr;
}
void* tqSerializeItem(STqMsgItem* bufItem, void* ptr) {
// TODO: do we need serialize this?
// mainly for executor
return ptr;
}
const void* tqDeserializeGroup(const STqSerializedHead* pHead, STqGroup** ppGroup) {
STqGroup* gHandle = *ppGroup;
const void* ptr = pHead->content;
gHandle->clientId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
gHandle->cgId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
gHandle->ahandle = NULL;
gHandle->topicNum = *(int32_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int32_t));
gHandle->head = NULL;
STqList* node = gHandle->head;
for (int i = 0; i < gHandle->topicNum; i++) {
if (gHandle->head == NULL) {
if ((node = malloc(sizeof(STqList))) == NULL) {
// TODO: error
return NULL;
}
node->next = NULL;
ptr = tqDeserializeTopic(ptr, &node->topic);
gHandle->head = node;
} else {
node->next = malloc(sizeof(STqList));
if (node->next == NULL) {
// TODO: error
return NULL;
}
node->next->next = NULL;
ptr = tqDeserializeTopic(ptr, &node->next->topic);
node = node->next;
}
}
return ptr;
}
const void* tqDeserializeTopic(const void* pBytes, STqTopic* topic) {
const void* ptr = pBytes;
topic->nextConsumeOffset = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
topic->topicId = *(int64_t*)ptr;
ptr = POINTER_SHIFT(ptr, sizeof(int64_t));
/*topic->head = *(int32_t*)ptr;*/
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
/*topic->tail = *(int32_t*)ptr;*/
/*ptr = POINTER_SHIFT(ptr, sizeof(int32_t));*/
for (int i = 0; i < TQ_BUFFER_SIZE; i++) {
ptr = tqDeserializeItem(ptr, &topic->buffer[i]);
}
return ptr;
}
const void* tqDeserializeItem(const void* pBytes, STqMsgItem* bufItem) { return pBytes; }
// TODO: make this a macro
int tqGroupSSize(const STqGroup* gHandle) {
return sizeof(int64_t) * 2 // cId + cgId
+ sizeof(int32_t) // topicNum
+ gHandle->topicNum * tqTopicSSize();
}
// TODO: make this a macro
int tqTopicSSize() {
return sizeof(int64_t) * 2 // nextConsumeOffset + topicId
+ sizeof(int32_t) * 2 // head + tail
+ TQ_BUFFER_SIZE * tqItemSSize();
}
int tqItemSSize() {
// TODO: do this need serialization?
// mainly for executor
return 0;
}
#endif
int32_t
tqProcessConsumeReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SMqConsumeReq
*
pReq
=
pMsg
->
pCont
;
...
...
@@ -675,7 +149,6 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
int64_t
fetchOffset
=
pReq
->
offset
;
int64_t
blockingTime
=
pReq
->
blockingTime
;
int
rspLen
=
0
;
SMqConsumeRsp
rsp
=
{.
consumerId
=
consumerId
,
.
numOfTopics
=
0
,
.
pBlockData
=
NULL
};
STqConsumerHandle
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
consumerId
);
...
...
@@ -694,11 +167,25 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
if
(
strcmp
(
pTopic
->
topicName
,
pReq
->
topic
)
!=
0
)
{
continue
;
}
if
(
pReq
->
reqType
==
TMQ_REQ_TYPE_COMMIT_ONLY
)
{
pTopic
->
committedOffset
=
pReq
->
offset
;
pMsg
->
pCont
=
NULL
;
pMsg
->
contLen
=
0
;
pMsg
->
code
=
0
;
rpcSendResponse
(
pMsg
);
return
0
;
}
if
(
pReq
->
reqType
==
TMQ_REQ_TYPE_CONSUME_AND_COMMIT
)
{
pTopic
->
committedOffset
=
pReq
->
offset
-
1
;
}
rsp
.
committedOffset
=
pTopic
->
committedOffset
;
rsp
.
reqOffset
=
pReq
->
offset
;
rsp
.
skipLogNum
=
0
;
if
(
fetchOffset
==
-
1
)
{
if
(
fetchOffset
<=
pTopic
->
committedOffset
)
{
fetchOffset
=
pTopic
->
committedOffset
+
1
;
}
int8_t
pos
;
...
...
@@ -802,11 +289,22 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
}
int32_t
tqProcessSetConnReq
(
STQ
*
pTq
,
char
*
msg
)
{
SMqSetCVgReq
req
;
SMqSetCVgReq
req
=
{
0
}
;
tDecodeSMqSetCVgReq
(
msg
,
&
req
);
STqConsumerHandle
*
pConsumer
=
calloc
(
sizeof
(
STqConsumerHandle
),
1
);
STqConsumerHandle
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
req
.
oldConsumerId
);
if
(
pConsumer
==
NULL
)
{
return
-
1
;
pConsumer
=
calloc
(
sizeof
(
STqConsumerHandle
),
1
);
if
(
pConsumer
==
NULL
)
{
terrno
=
TSDB_CODE_TQ_OUT_OF_MEMORY
;
return
-
1
;
}
}
else
{
tqHandleMovePut
(
pTq
->
tqMeta
,
req
.
newConsumerId
,
pConsumer
);
tqHandleCommit
(
pTq
->
tqMeta
,
req
.
newConsumerId
);
tqHandlePurge
(
pTq
->
tqMeta
,
req
.
oldConsumerId
);
terrno
=
TSDB_CODE_SUCCESS
;
return
0
;
}
strcpy
(
pConsumer
->
cgroup
,
req
.
cgroup
);
pConsumer
->
topics
=
taosArrayInit
(
0
,
sizeof
(
STqTopicHandle
));
...
...
@@ -819,9 +317,9 @@ int32_t tqProcessSetConnReq(STQ* pTq, char* msg) {
return
-
1
;
}
strcpy
(
pTopic
->
topicName
,
req
.
topicName
);
pTopic
->
sql
=
strdup
(
req
.
sql
)
;
pTopic
->
logicalPlan
=
strdup
(
req
.
logicalPlan
)
;
pTopic
->
physicalPlan
=
strdup
(
req
.
physicalPlan
)
;
pTopic
->
sql
=
req
.
sql
;
pTopic
->
logicalPlan
=
req
.
logicalPlan
;
pTopic
->
physicalPlan
=
req
.
physicalPlan
;
pTopic
->
committedOffset
=
-
1
;
pTopic
->
currentOffset
=
-
1
;
...
...
source/dnode/vnode/src/tq/tqMetaStore.c
浏览文件 @
60ed607c
...
...
@@ -584,12 +584,30 @@ int32_t tqHandleDel(STqMetaStore* pMeta, int64_t key) {
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
STqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
if
(
pNode
->
handle
.
valueInTxn
)
{
pMeta
->
pDeleter
(
pNode
->
handle
.
valueInTxn
);
if
(
pNode
->
handle
.
key
==
key
)
{
if
(
pNode
->
handle
.
valueInTxn
!=
TQ_DELETE_TOKEN
)
{
if
(
pNode
->
handle
.
valueInTxn
)
{
pMeta
->
pDeleter
(
pNode
->
handle
.
valueInTxn
);
}
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
}
else
{
pNode
=
pNode
->
next
;
}
}
terrno
=
TSDB_CODE_TQ_META_NO_SUCH_KEY
;
return
-
1
;
}
pNode
->
handle
.
valueInTxn
=
TQ_DELETE_TOKEN
;
int32_t
tqHandlePurge
(
STqMetaStore
*
pMeta
,
int64_t
key
)
{
int64_t
bucketKey
=
key
&
TQ_BUCKET_MASK
;
STqMetaList
*
pNode
=
pMeta
->
bucket
[
bucketKey
];
while
(
pNode
)
{
if
(
pNode
->
handle
.
key
==
key
)
{
pNode
->
handle
.
valueInUse
=
TQ_DELETE_TOKEN
;
tqLinkUnpersist
(
pMeta
,
pNode
);
return
0
;
}
else
{
...
...
source/dnode/vnode/src/vnd/vnodeWrite.c
浏览文件 @
60ed607c
...
...
@@ -16,6 +16,7 @@
#include "tq.h"
#include "vnd.h"
#if 0
int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
switch (pMsg->msgType) {
case TDMT_VND_MQ_SET_CUR:
...
...
@@ -26,6 +27,7 @@ int vnodeProcessNoWalWMsgs(SVnode *pVnode, SRpcMsg *pMsg) {
}
return 0;
}
#endif
int
vnodeProcessWMsgs
(
SVnode
*
pVnode
,
SArray
*
pMsgs
)
{
SRpcMsg
*
pMsg
;
...
...
source/libs/wal/src/walRead.c
浏览文件 @
60ed607c
...
...
@@ -169,10 +169,17 @@ int32_t walReadWithHandle(SWalReadHandle *pRead, int64_t ver) {
return
-
1
;
}
ASSERT
(
pRead
->
pHead
->
head
.
version
==
ver
);
if
(
pRead
->
pHead
->
head
.
version
!=
ver
)
{
/*wError("unexpected wal log version: %ld, read request version:%ld", pRead->pHead->head.version, ver);*/
pRead
->
curVersion
=
-
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
code
=
walValidBodyCksum
(
pRead
->
pHead
);
if
(
code
!=
0
)
{
/*wError("unexpected wal log version: checksum not passed");*/
pRead
->
curVersion
=
-
1
;
terrno
=
TSDB_CODE_WAL_FILE_CORRUPTED
;
return
-
1
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录