Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
0712f87d
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
0712f87d
编写于
1月 27, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
fix query crash
上级
5ea4eba3
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
244 addition
and
78 deletion
+244
-78
include/common/common.h
include/common/common.h
+12
-0
include/common/tmsg.h
include/common/tmsg.h
+7
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+113
-43
source/client/test/clientTests.cpp
source/client/test/clientTests.cpp
+2
-4
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+9
-1
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+89
-25
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+12
-5
未找到文件。
include/common/common.h
浏览文件 @
0712f87d
...
...
@@ -126,6 +126,12 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
int32_t
tlen
=
0
;
int32_t
sz
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
committedOffset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
reqOffset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
rspOffset
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
skipLogNum
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
numOfTopics
);
if
(
pRsp
->
numOfTopics
==
0
)
return
tlen
;
tlen
+=
tEncodeSSchemaWrapper
(
buf
,
pRsp
->
schemas
);
if
(
pRsp
->
pBlockData
)
{
sz
=
taosArrayGetSize
(
pRsp
->
pBlockData
);
...
...
@@ -141,6 +147,12 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
static
FORCE_INLINE
void
*
tDecodeSMqConsumeRsp
(
void
*
buf
,
SMqConsumeRsp
*
pRsp
)
{
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
committedOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
reqOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
rspOffset
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
skipLogNum
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
numOfTopics
);
if
(
pRsp
->
numOfTopics
==
0
)
return
buf
;
pRsp
->
schemas
=
(
SSchemaWrapper
*
)
calloc
(
1
,
sizeof
(
SSchemaWrapper
));
if
(
pRsp
->
schemas
==
NULL
)
return
NULL
;
buf
=
tDecodeSSchemaWrapper
(
buf
,
pRsp
->
schemas
);
...
...
include/common/tmsg.h
浏览文件 @
0712f87d
...
...
@@ -1655,6 +1655,10 @@ typedef struct SMqTopicBlk {
typedef
struct
SMqConsumeRsp
{
int64_t
consumerId
;
SSchemaWrapper
*
schemas
;
int64_t
committedOffset
;
int64_t
reqOffset
;
int64_t
rspOffset
;
int32_t
skipLogNum
;
int32_t
numOfTopics
;
SArray
*
pBlockData
;
//SArray<SSDataBlock>
}
SMqConsumeRsp
;
...
...
@@ -1688,6 +1692,7 @@ typedef struct SMqSubTopicEp {
typedef
struct
SMqCMGetSubEpRsp
{
int64_t
consumerId
;
int64_t
epoch
;
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
SArray
*
topics
;
// SArray<SMqSubTopicEp>
}
SMqCMGetSubEpRsp
;
...
...
@@ -1736,6 +1741,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
static
FORCE_INLINE
int32_t
tEncodeSMqCMGetSubEpRsp
(
void
**
buf
,
const
SMqCMGetSubEpRsp
*
pRsp
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
epoch
);
tlen
+=
taosEncodeString
(
buf
,
pRsp
->
cgroup
);
int32_t
sz
=
taosArrayGetSize
(
pRsp
->
topics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
...
...
@@ -1748,6 +1754,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu
static
FORCE_INLINE
void
*
tDecodeSMqCMGetSubEpRsp
(
void
*
buf
,
SMqCMGetSubEpRsp
*
pRsp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
epoch
);
buf
=
taosDecodeStringTo
(
buf
,
pRsp
->
cgroup
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
...
...
source/client/src/clientImpl.c
浏览文件 @
0712f87d
...
...
@@ -326,13 +326,17 @@ int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
struct
tmq_t
{
char
groupId
[
256
];
char
clientId
[
256
];
SRWLatch
lock
;
int64_t
consumerId
;
int64_t
epoch
;
int64_t
status
;
tsem_t
rspSem
;
STscObj
*
pTscObj
;
tmq_commit_cb
*
commit_cb
;
int32_t
nextTopicIdx
;
SArray
*
clientTopics
;
//SArray<SMqClientTopic>
//stat
int64_t
pollCnt
;
};
tmq_t
*
taos_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
)
{
...
...
@@ -342,6 +346,9 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err
}
pTmq
->
pTscObj
=
(
STscObj
*
)
conn
;
pTmq
->
status
=
0
;
pTmq
->
pollCnt
=
0
;
pTmq
->
epoch
=
0
;
taosInitRWLatch
(
&
pTmq
->
lock
);
strcpy
(
pTmq
->
clientId
,
conf
->
clientId
);
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
pTmq
->
commit_cb
=
conf
->
commit_cb
;
...
...
@@ -621,34 +628,61 @@ struct tmq_message_t {
};
int32_t
tmq_poll_cb_inner
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SMqClientVg
*
pVg
=
(
SMqClientVg
*
)
param
;
SMqConsumeRsp
rsp
;
tDecodeSMqConsumeRsp
(
pMsg
->
pData
,
&
rsp
);
if
(
rsp
.
numOfTopics
==
0
)
{
/*printf("no data\n");*/
return
0
;
}
int32_t
colNum
=
rsp
.
schemas
->
nCols
;
pVg
->
currentOffset
=
rsp
.
rspOffset
;
/*printf("rsp offset: %ld\n", rsp.rspOffset);*/
/*printf("-----msg begin----\n");*/
printf
(
"|"
);
for
(
int32_t
i
=
0
;
i
<
colNum
;
i
++
)
{
printf
(
"
| %
s |"
,
rsp
.
schemas
->
pSchema
[
i
].
name
);
printf
(
"
%15
s |"
,
rsp
.
schemas
->
pSchema
[
i
].
name
);
}
printf
(
"
\n
"
);
printf
(
"=====================================
\n
"
);
int32_t
sz
=
taosArrayGetSize
(
rsp
.
pBlockData
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
rsp
.
pBlockData
,
i
);
int32_t
rows
=
pDataBlock
->
info
.
rows
;
for
(
int32_t
j
=
0
;
j
<
colNum
;
j
++
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
j
);
for
(
int32_t
k
=
0
;
k
<
rows
;
k
++
)
{
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
k
*
pColInfoData
->
info
.
bytes
);
if
(
j
==
0
)
printf
(
" %ld "
,
*
(
int64_t
*
)
var
);
if
(
j
==
1
)
printf
(
" %d "
,
*
(
int32_t
*
)
var
);
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
printf
(
"|"
);
for
(
int32_t
k
=
0
;
k
<
colNum
;
k
++
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
j
*
pColInfoData
->
info
.
bytes
);
switch
(
pColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_TIMESTAMP
:
printf
(
" %15lu |"
,
*
(
uint64_t
*
)
var
);
break
;
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_UINT
:
printf
(
" %15u |"
,
*
(
uint32_t
*
)
var
);
break
;
}
}
printf
(
"
\n
"
);
}
/*pDataBlock->*/
}
/*printf("\n-----msg end------\n");*/
return
0
;
}
typedef
struct
SMqAskEpCbParam
{
tmq_t
*
tmq
;
int32_t
wait
;
}
SMqAskEpCbParam
;
int32_t
tmq_ask_ep_cb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
tmq_t
*
tmq
=
(
tmq_t
*
)
param
;
SMqAskEpCbParam
*
pParam
=
(
SMqAskEpCbParam
*
)
param
;
tmq_t
*
tmq
=
pParam
->
tmq
;
if
(
code
!=
0
)
{
tsem_post
(
&
tmq
->
rspSem
);
if
(
pParam
->
wait
)
{
tsem_post
(
&
tmq
->
rspSem
);
}
return
0
;
}
tscDebug
(
"tmq ask ep cb called"
);
...
...
@@ -657,36 +691,47 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
tDecodeSMqCMGetSubEpRsp
(
pMsg
->
pData
,
&
rsp
);
int32_t
sz
=
taosArrayGetSize
(
rsp
.
topics
);
// TODO: lock
tmq
->
clientTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqClientTopic
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqClientTopic
topic
=
{
0
};
SMqSubTopicEp
*
pTopicEp
=
taosArrayGet
(
rsp
.
topics
,
i
);
topic
.
topicName
=
strdup
(
pTopicEp
->
topic
);
int32_t
vgSz
=
taosArrayGetSize
(
pTopicEp
->
vgs
);
topic
.
vgs
=
taosArrayInit
(
vgSz
,
sizeof
(
SMqClientVg
));
for
(
int32_t
j
=
0
;
j
<
vgSz
;
j
++
)
{
SMqSubVgEp
*
pVgEp
=
taosArrayGet
(
pTopicEp
->
vgs
,
j
);
SMqClientVg
clientVg
=
{
.
pollCnt
=
0
,
.
committedOffset
=
-
1
,
.
currentOffset
=
-
1
,
.
vgId
=
pVgEp
->
vgId
,
.
epSet
=
pVgEp
->
epSet
};
taosArrayPush
(
topic
.
vgs
,
&
clientVg
);
set
=
true
;
if
(
rsp
.
epoch
!=
tmq
->
epoch
)
{
/*printf("rsp epoch %ld", rsp.epoch);*/
/*printf("tmq epoch %ld", tmq->epoch);*/
//TODO
if
(
tmq
->
clientTopics
)
taosArrayDestroy
(
tmq
->
clientTopics
);
tmq
->
clientTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqClientTopic
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SMqClientTopic
topic
=
{
0
};
SMqSubTopicEp
*
pTopicEp
=
taosArrayGet
(
rsp
.
topics
,
i
);
topic
.
topicName
=
strdup
(
pTopicEp
->
topic
);
int32_t
vgSz
=
taosArrayGetSize
(
pTopicEp
->
vgs
);
topic
.
vgs
=
taosArrayInit
(
vgSz
,
sizeof
(
SMqClientVg
));
for
(
int32_t
j
=
0
;
j
<
vgSz
;
j
++
)
{
SMqSubVgEp
*
pVgEp
=
taosArrayGet
(
pTopicEp
->
vgs
,
j
);
SMqClientVg
clientVg
=
{
.
pollCnt
=
0
,
.
committedOffset
=
-
1
,
.
currentOffset
=
-
1
,
.
vgId
=
pVgEp
->
vgId
,
.
epSet
=
pVgEp
->
epSet
};
taosArrayPush
(
topic
.
vgs
,
&
clientVg
);
set
=
true
;
}
taosArrayPush
(
tmq
->
clientTopics
,
&
topic
);
}
taosArrayPush
(
tmq
->
clientTopics
,
&
topic
);
tmq
->
epoch
=
rsp
.
epoch
;
}
if
(
set
)
{
atomic_store_64
(
&
tmq
->
status
,
1
);
}
if
(
set
)
tmq
->
status
=
1
;
// unlock
tsem_post
(
&
tmq
->
rspSem
);
/*tsem_post(&tmq->rspSem);*/
if
(
pParam
->
wait
)
{
tsem_post
(
&
tmq
->
rspSem
);
}
free
(
pParam
);
return
0
;
}
tmq_message_t
*
tmq_consume_poll
(
tmq_t
*
tmq
,
int64_t
blocking_time
)
{
if
(
taosArrayGetSize
(
tmq
->
clientTopics
)
==
0
||
tmq
->
status
==
0
)
{
int32_t
tmqAsyncAskEp
(
tmq_t
*
tmq
,
bool
wait
)
{
int32_t
tlen
=
sizeof
(
SMqCMGetSubEpReq
);
SMqCMGetSubEpReq
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
...
...
@@ -702,9 +747,17 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
buf
,
.
len
=
tlen
};
SMqAskEpCbParam
*
pParam
=
malloc
(
sizeof
(
SMqAskEpCbParam
));
if
(
pParam
==
NULL
)
{
free
(
buf
);
goto
END
;
}
pParam
->
tmq
=
tmq
;
pParam
->
wait
=
wait
;
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
tmq
;
sendInfo
->
param
=
pParam
;
sendInfo
->
fp
=
tmq_ask_ep_cb
;
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
...
...
@@ -712,11 +765,20 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
tsem_wait
(
&
tmq
->
rspSem
);
}
END:
if
(
wait
)
tsem_wait
(
&
tmq
->
rspSem
);
return
0
;
}
tmq_message_t
*
tmq_consume_poll
(
tmq_t
*
tmq
,
int64_t
blocking_time
)
{
int64_t
status
=
atomic_load_64
(
&
tmq
->
status
);
tmqAsyncAskEp
(
tmq
,
status
==
0
||
taosArrayGetSize
(
tmq
->
clientTopics
));
if
(
blocking_time
<
0
)
blocking_time
=
500
;
if
(
taosArrayGetSize
(
tmq
->
clientTopics
)
==
0
)
{
tscDebug
(
"consumer:%ld poll but not assigned"
,
tmq
->
consumerId
);
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
...
...
@@ -730,10 +792,15 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
tmq
->
nextTopicIdx
);
tmq
->
nextTopicIdx
=
(
tmq
->
nextTopicIdx
+
1
)
%
taosArrayGetSize
(
tmq
->
clientTopics
);
strcpy
(
pReq
->
topic
,
pTopic
->
topicName
);
int32_t
nextVgIdx
=
pTopic
->
nextVgIdx
;
pTopic
->
nextVgIdx
=
(
nextVgIdx
+
1
)
%
taosArrayGetSize
(
pTopic
->
vgs
);
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
nextVgIdx
);
pReq
->
offset
=
pVg
->
currentOffset
;
int32_t
vgSz
=
taosArrayGetSize
(
pTopic
->
vgs
);
if
(
vgSz
==
0
)
{
free
(
pReq
);
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
pTopic
->
nextVgIdx
=
(
pTopic
->
nextVgIdx
+
1
%
vgSz
);
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
pTopic
->
nextVgIdx
);
pReq
->
offset
=
pVg
->
currentOffset
+
1
;
pReq
->
head
.
vgId
=
htonl
(
pVg
->
vgId
);
pReq
->
head
.
contLen
=
htonl
(
sizeof
(
SMqConsumeReq
));
...
...
@@ -743,13 +810,16 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
sendInfo
->
requestObjRefId
=
0
;
/*sendInfo->param = &tmq_message;*/
sendInfo
->
param
=
pVg
;
sendInfo
->
fp
=
tmq_poll_cb_inner
;
/*printf("req offset: %ld\n", pReq->offset);*/
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
sendInfo
);
tmq
->
pollCnt
++
;
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
usleep
(
blocking_time
*
1000
);
return
tmq_message
;
...
...
source/client/test/clientTests.cpp
浏览文件 @
0712f87d
...
...
@@ -570,7 +570,6 @@ TEST(testCase, create_topic_Test) {
//taos_close(pConn);
//}
#if 0
TEST
(
testCase
,
tmq_subscribe_Test
)
{
TAOS
*
pConn
=
taos_connect
(
"localhost"
,
"root"
,
"taosdata"
,
NULL
,
0
);
assert
(
pConn
!=
NULL
);
...
...
@@ -590,12 +589,11 @@ TEST(testCase, tmq_subscribe_Test) {
tmq_subscribe
(
tmq
,
topic_list
);
while
(
1
)
{
tmq_message_t* msg = tmq_consume_poll(tmq, 0);
printf("get msg\n");
tmq_message_t
*
msg
=
tmq_consume_poll
(
tmq
,
100
0
);
//
printf("get msg\n");
//if (msg == NULL) break;
}
}
#endif
TEST
(
testCase
,
tmq_consume_Test
)
{
}
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
0712f87d
...
...
@@ -658,12 +658,17 @@ typedef struct SMqConsumerObj {
SRWLatch
lock
;
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
SArray
*
topics
;
// SArray<SMqConsumerTopic>
// SHashObj *topicHash; //SHashObj<SMqTopicObj>
int64_t
epoch
;
// stat
int64_t
pollCnt
;
}
SMqConsumerObj
;
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerObj
(
void
**
buf
,
const
SMqConsumerObj
*
pConsumer
)
{
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
connId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
epoch
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
pollCnt
);
tlen
+=
taosEncodeString
(
buf
,
pConsumer
->
cgroup
);
int32_t
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
...
...
@@ -676,6 +681,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerO
static
FORCE_INLINE
void
*
tDecodeSMqConsumerObj
(
void
*
buf
,
SMqConsumerObj
*
pConsumer
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
connId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
epoch
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
pollCnt
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumer
->
cgroup
);
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
0712f87d
...
...
@@ -30,6 +30,8 @@
#define MND_SUBSCRIBE_VER_NUMBER 1
#define MND_SUBSCRIBE_RESERVE_SIZE 64
#define MND_SUBSCRIBE_REBALANCE_MS 5000
static
char
*
mndMakeSubscribeKey
(
char
*
cgroup
,
char
*
topicName
);
static
SSdbRaw
*
mndSubActionEncode
(
SMqSubscribeObj
*
);
...
...
@@ -69,6 +71,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
SMqCMGetSubEpReq
*
pReq
=
(
SMqCMGetSubEpReq
*
)
pMsg
->
rpcMsg
.
pCont
;
SMqCMGetSubEpRsp
rsp
;
int64_t
consumerId
=
be64toh
(
pReq
->
consumerId
);
int64_t
currentTs
=
taosGetTimestampMs
();
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMsg
->
pMnode
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
...
...
@@ -79,6 +82,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
strcpy
(
rsp
.
cgroup
,
pReq
->
cgroup
);
rsp
.
consumerId
=
consumerId
;
rsp
.
epoch
=
pConsumer
->
epoch
;
SArray
*
pTopics
=
pConsumer
->
topics
;
int32_t
sz
=
taosArrayGetSize
(
pTopics
);
rsp
.
topics
=
taosArrayInit
(
sz
,
sizeof
(
SMqSubTopicEp
));
...
...
@@ -88,21 +92,43 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
strcpy
(
topicEp
.
topic
,
pConsumerTopic
->
name
);
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
pConsumer
->
cgroup
,
pConsumerTopic
->
name
);
ASSERT
(
pSub
);
bool
found
=
0
;
bool
changed
=
0
;
for
(
int32_t
j
=
0
;
j
<
taosArrayGetSize
(
pSub
->
availConsumer
);
j
++
)
{
if
(
*
(
int64_t
*
)
taosArrayGet
(
pSub
->
availConsumer
,
j
)
==
consumerId
)
{
found
=
1
;
break
;
}
}
if
(
found
==
0
)
{
taosArrayPush
(
pSub
->
availConsumer
,
&
consumerId
);
}
SSdbRaw
*
pRaw
=
mndSubActionEncode
(
pSub
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
sdbWriteNotFree
(
pMnode
->
pSdb
,
pRaw
);
int32_t
assignedSz
=
taosArrayGetSize
(
pSub
->
assigned
);
topicEp
.
vgs
=
taosArrayInit
(
assignedSz
,
sizeof
(
SMqSubVgEp
));
for
(
int32_t
j
=
0
;
j
<
assignedSz
;
j
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
assigned
,
j
);
if
(
pCEp
->
consumerId
==
consumerId
)
{
pCEp
->
lastConsumerHbTs
=
currentTs
;
SMqSubVgEp
vgEp
=
{
.
epSet
=
pCEp
->
epSet
,
.
vgId
=
pCEp
->
vgId
};
taosArrayPush
(
topicEp
.
vgs
,
&
vgEp
);
changed
=
1
;
}
}
if
(
taosArrayGetSize
(
topicEp
.
vgs
)
!=
0
)
{
taosArrayPush
(
rsp
.
topics
,
&
topicEp
);
}
if
(
changed
||
found
)
{
}
mndReleaseSubscribe
(
pMnode
,
pSub
);
}
int32_t
tlen
=
tEncodeSMqCMGetSubEpRsp
(
NULL
,
&
rsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
...
...
@@ -124,9 +150,9 @@ static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
i
++
;
}
key
[
i
]
=
0
;
*
topic
=
strdup
(
key
);
*
cgroup
=
strdup
(
key
);
key
[
i
]
=
':'
;
*
cgroup
=
strdup
(
&
key
[
i
+
1
]);
*
topic
=
strdup
(
&
key
[
i
+
1
]);
return
0
;
}
...
...
@@ -135,9 +161,37 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SMqSubscribeObj
*
pSub
=
NULL
;
void
*
pIter
=
sdbFetch
(
pSdb
,
SDB_SUBSCRIBE
,
NULL
,
(
void
**
)
&
pSub
);
int
sz
;
int64_t
currentTs
=
taosGetTimestampMs
();
int32_t
sz
;
while
(
pIter
!=
NULL
)
{
if
((
sz
=
taosArrayGetSize
(
pSub
->
unassignedVg
))
>
0
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pSub
->
assigned
);
i
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
assigned
,
i
);
int64_t
consumerId
=
pCEp
->
consumerId
;
if
(
pCEp
->
lastConsumerHbTs
!=
-
1
&&
currentTs
-
pCEp
->
lastConsumerHbTs
>
MND_SUBSCRIBE_REBALANCE_MS
)
{
// put consumer into lostConsumer
taosArrayPush
(
pSub
->
lostConsumer
,
pCEp
);
// put vg into unassgined
taosArrayPush
(
pSub
->
unassignedVg
,
pCEp
);
// remove from assigned
// TODO: swap with last one, reduce size and reset i
taosArrayRemove
(
pSub
->
assigned
,
i
);
// remove from available consumer
for
(
int
j
=
0
;
j
<
taosArrayGetSize
(
pSub
->
availConsumer
);
j
++
)
{
if
(
*
(
int64_t
*
)
taosArrayGet
(
pSub
->
availConsumer
,
i
)
==
pCEp
->
consumerId
)
{
taosArrayRemove
(
pSub
->
availConsumer
,
j
);
break
;
}
// TODO: acquire consumer, set status to unavail
}
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
consumerId
);
pConsumer
->
epoch
++
;
SSdbRaw
*
pRaw
=
mndConsumerActionEncode
(
pConsumer
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
sdbWriteNotFree
(
pMnode
->
pSdb
,
pRaw
);
mndReleaseConsumer
(
pMnode
,
pConsumer
);
}
}
if
((
sz
=
taosArrayGetSize
(
pSub
->
unassignedVg
))
>
0
&&
taosArrayGetSize
(
pSub
->
availConsumer
)
>
0
)
{
char
*
topic
=
NULL
;
char
*
cgroup
=
NULL
;
mndSplitSubscribeKey
(
pSub
->
key
,
&
topic
,
&
cgroup
);
...
...
@@ -146,7 +200,7 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
// create trans
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
for
(
int
32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
consumerId
=
*
(
int64_t
*
)
taosArrayGet
(
pSub
->
availConsumer
,
pSub
->
nextConsumerIdx
);
SMqConsumerEp
*
pCEp
=
taosArrayPop
(
pSub
->
unassignedVg
);
pCEp
->
consumerId
=
consumerId
;
...
...
@@ -155,49 +209,49 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
// build msg
SMqSetCVgReq
*
pReq
=
malloc
(
sizeof
(
SMqSetCVgReq
));
if
(
pReq
==
NULL
)
{
SMqSetCVgReq
req
=
{
0
};
strcpy
(
req
.
cgroup
,
cgroup
);
strcpy
(
req
.
topicName
,
topic
);
req
.
sql
=
pTopic
->
sql
;
req
.
logicalPlan
=
pTopic
->
logicalPlan
;
req
.
physicalPlan
=
pTopic
->
physicalPlan
;
req
.
qmsg
=
pCEp
->
qmsg
;
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
&
req
);
void
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
strcpy
(
pReq
->
cgroup
,
cgroup
);
strcpy
(
pReq
->
topicName
,
topic
);
pReq
->
sql
=
strdup
(
pTopic
->
sql
);
pReq
->
logicalPlan
=
strdup
(
pTopic
->
logicalPlan
);
pReq
->
physicalPlan
=
strdup
(
pTopic
->
physicalPlan
);
pReq
->
qmsg
=
strdup
(
pCEp
->
qmsg
);
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
pReq
);
void
*
reqStr
=
malloc
(
tlen
);
if
(
reqStr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
void
*
abuf
=
reqStr
;
tEncodeSMqSetCVgReq
(
&
abuf
,
pReq
);
SMsgHead
*
pMsgHead
=
(
SMsgHead
*
)
buf
;
pMsgHead
->
contLen
=
htonl
(
sizeof
(
SMsgHead
)
+
tlen
);
pMsgHead
->
vgId
=
htonl
(
pCEp
->
vgId
);
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSMqSetCVgReq
(
&
abuf
,
&
req
);
// persist msg
STransAction
action
=
{
0
};
action
.
epSet
=
pCEp
->
epSet
;
action
.
pCont
=
reqStr
;
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
msgType
=
TDMT_VND_MQ_SET_CONN
;
mndTransAppendRedoAction
(
pTrans
,
&
action
);
// persist raw
SSdbRaw
*
pRaw
=
mndSubActionEncode
(
pSub
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mndTransAppendRedolog
(
pTrans
,
pRaw
);
free
(
pReq
);
tfree
(
topic
);
tfree
(
cgroup
);
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
}
/*mndReleaseTopic(pMnode, pTopic);*/
mndReleaseTopic
(
pMnode
,
pTopic
);
mndTransDrop
(
pTrans
);
}
pIter
=
sdbFetch
(
pSdb
,
SDB_SUBSCRIBE
,
NULL
,
(
void
**
)
&
pSub
);
pIter
=
sdbFetch
(
pSdb
,
SDB_SUBSCRIBE
,
pIter
,
(
void
**
)
&
pSub
);
}
return
0
;
}
...
...
@@ -434,10 +488,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
pConsumer
->
epoch
=
1
;
pConsumer
->
consumerId
=
consumerId
;
strcpy
(
pConsumer
->
cgroup
,
consumerGroup
);
taosInitRWLatch
(
&
pConsumer
->
lock
);
}
else
{
pConsumer
->
epoch
++
;
oldSub
=
pConsumer
->
topics
;
}
pConsumer
->
topics
=
taosArrayInit
(
newTopicNum
,
sizeof
(
SMqConsumerTopic
));
...
...
@@ -541,6 +597,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
consumerGroup
,
newTopicName
);
bool
create
=
false
;
if
(
pSub
==
NULL
)
{
mDebug
(
"create new subscription, group: %s, topic %s"
,
consumerGroup
,
newTopicName
);
pSub
=
tNewSubscribeObj
();
...
...
@@ -549,10 +606,16 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
return
-
1
;
}
char
*
key
=
mndMakeSubscribeKey
(
consumerGroup
,
newTopicName
);
if
(
key
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
strcpy
(
pSub
->
key
,
key
);
free
(
key
);
// set unassigned vg
mndInitUnassignedVg
(
pMnode
,
pTopic
,
pSub
->
unassignedVg
);
// TODO: disable alter
create
=
true
;
}
taosArrayPush
(
pSub
->
availConsumer
,
&
consumerId
);
...
...
@@ -575,6 +638,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SSdbRaw
*
pRaw
=
mndSubActionEncode
(
pSub
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mndTransAppendRedolog
(
pTrans
,
pRaw
);
if
(
!
create
)
mndReleaseSubscribe
(
pMnode
,
pSub
);
#if 0
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
if (pGroup == NULL) {
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
0712f87d
...
...
@@ -670,15 +670,13 @@ int tqItemSSize() {
int32_t
tqProcessConsumeReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SMqConsumeReq
*
pReq
=
pMsg
->
pCont
;
SRpcMsg
rpcMsg
;
int64_t
reqId
=
pReq
->
reqId
;
int64_t
consumerId
=
pReq
->
consumerId
;
int64_t
reqOffset
=
pReq
->
offset
;
int64_t
fetchOffset
=
reqOffset
;
int64_t
fetchOffset
=
pReq
->
offset
;
int64_t
blockingTime
=
pReq
->
blockingTime
;
int
rspLen
=
0
;
SMqConsumeRsp
rsp
=
{.
consumerId
=
consumerId
,
.
numOfTopics
=
1
,
.
pBlockData
=
NULL
};
SMqConsumeRsp
rsp
=
{.
consumerId
=
consumerId
,
.
numOfTopics
=
0
,
.
pBlockData
=
NULL
};
STqConsumerHandle
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
consumerId
);
ASSERT
(
pConsumer
);
...
...
@@ -690,6 +688,9 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
if
(
strcmp
(
pTopic
->
topicName
,
pReq
->
topic
)
!=
0
)
{
continue
;
}
rsp
.
committedOffset
=
pTopic
->
committedOffset
;
rsp
.
reqOffset
=
pReq
->
offset
;
rsp
.
skipLogNum
=
0
;
if
(
fetchOffset
==
-
1
)
{
fetchOffset
=
pTopic
->
committedOffset
+
1
;
...
...
@@ -715,6 +716,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
if
(
pHead
->
head
.
msgType
==
TDMT_VND_SUBMIT
)
{
break
;
}
rsp
.
skipLogNum
++
;
if
(
walReadWithHandle
(
pTopic
->
pReadhandle
,
fetchOffset
)
<
0
)
{
atomic_store_8
(
&
pTopic
->
buffer
.
output
[
pos
].
status
,
0
);
skip
=
1
;
...
...
@@ -745,6 +747,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
}
//TODO copy
rsp
.
schemas
=
pTopic
->
buffer
.
output
[
pos
].
pReadHandle
->
pSchemaWrapper
;
rsp
.
rspOffset
=
fetchOffset
;
atomic_store_8
(
&
pTopic
->
buffer
.
output
[
pos
].
status
,
0
);
...
...
@@ -752,6 +755,8 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
taosArrayDestroy
(
pRes
);
fetchOffset
++
;
continue
;
}
else
{
rsp
.
numOfTopics
++
;
}
rsp
.
pBlockData
=
pRes
;
...
...
@@ -931,6 +936,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
SMemRow
row
;
int32_t
kvIdx
=
0
;
int32_t
curRow
=
0
;
tInitSubmitBlkIter
(
pHandle
->
pBlock
,
&
pHandle
->
blkIter
);
while
((
row
=
tGetSubmitBlkNext
(
&
pHandle
->
blkIter
))
!=
NULL
)
{
// get all wanted col of that block
...
...
@@ -940,8 +946,9 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
// TODO
ASSERT
(
pCol
->
colId
==
pColData
->
info
.
colId
);
void
*
val
=
tdGetMemRowDataOfColEx
(
row
,
pCol
->
colId
,
pCol
->
type
,
TD_DATA_ROW_HEAD_SIZE
+
pCol
->
offset
,
&
kvIdx
);
memcpy
(
pColData
->
pData
,
val
,
pCol
->
bytes
);
memcpy
(
POINTER_SHIFT
(
pColData
->
pData
,
curRow
*
pCol
->
bytes
)
,
val
,
pCol
->
bytes
);
}
curRow
++
;
}
return
pArray
;
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录