Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
36112eb4
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
36112eb4
编写于
1月 27, 2022
作者:
L
Liu Jicong
提交者:
GitHub
1月 27, 2022
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #10069 from taosdata/feature/tq
Feature/tq
上级
c14cb952
169fed61
变更
7
隐藏空白更改
内联
并排
Showing
7 changed file
with
285 addition
and
76 deletion
+285
-76
include/common/common.h
include/common/common.h
+12
-0
include/common/tmsg.h
include/common/tmsg.h
+7
-0
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+119
-43
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+9
-1
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+100
-26
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+14
-0
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+24
-6
未找到文件。
include/common/common.h
浏览文件 @
36112eb4
...
@@ -130,6 +130,12 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
...
@@ -130,6 +130,12 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
int32_t
tlen
=
0
;
int32_t
tlen
=
0
;
int32_t
sz
=
0
;
int32_t
sz
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
committedOffset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
reqOffset
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
rspOffset
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
skipLogNum
);
tlen
+=
taosEncodeFixedI32
(
buf
,
pRsp
->
numOfTopics
);
if
(
pRsp
->
numOfTopics
==
0
)
return
tlen
;
tlen
+=
tEncodeSSchemaWrapper
(
buf
,
pRsp
->
schemas
);
tlen
+=
tEncodeSSchemaWrapper
(
buf
,
pRsp
->
schemas
);
if
(
pRsp
->
pBlockData
)
{
if
(
pRsp
->
pBlockData
)
{
sz
=
taosArrayGetSize
(
pRsp
->
pBlockData
);
sz
=
taosArrayGetSize
(
pRsp
->
pBlockData
);
...
@@ -145,6 +151,12 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
...
@@ -145,6 +151,12 @@ static FORCE_INLINE int32_t tEncodeSMqConsumeRsp(void** buf, const SMqConsumeRsp
static
FORCE_INLINE
void
*
tDecodeSMqConsumeRsp
(
void
*
buf
,
SMqConsumeRsp
*
pRsp
)
{
static
FORCE_INLINE
void
*
tDecodeSMqConsumeRsp
(
void
*
buf
,
SMqConsumeRsp
*
pRsp
)
{
int32_t
sz
;
int32_t
sz
;
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
committedOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
reqOffset
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
rspOffset
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
skipLogNum
);
buf
=
taosDecodeFixedI32
(
buf
,
&
pRsp
->
numOfTopics
);
if
(
pRsp
->
numOfTopics
==
0
)
return
buf
;
pRsp
->
schemas
=
(
SSchemaWrapper
*
)
calloc
(
1
,
sizeof
(
SSchemaWrapper
));
pRsp
->
schemas
=
(
SSchemaWrapper
*
)
calloc
(
1
,
sizeof
(
SSchemaWrapper
));
if
(
pRsp
->
schemas
==
NULL
)
return
NULL
;
if
(
pRsp
->
schemas
==
NULL
)
return
NULL
;
buf
=
tDecodeSSchemaWrapper
(
buf
,
pRsp
->
schemas
);
buf
=
tDecodeSSchemaWrapper
(
buf
,
pRsp
->
schemas
);
...
...
include/common/tmsg.h
浏览文件 @
36112eb4
...
@@ -1702,6 +1702,10 @@ typedef struct SMqTopicBlk {
...
@@ -1702,6 +1702,10 @@ typedef struct SMqTopicBlk {
typedef
struct
SMqConsumeRsp
{
typedef
struct
SMqConsumeRsp
{
int64_t
consumerId
;
int64_t
consumerId
;
SSchemaWrapper
*
schemas
;
SSchemaWrapper
*
schemas
;
int64_t
committedOffset
;
int64_t
reqOffset
;
int64_t
rspOffset
;
int32_t
skipLogNum
;
int32_t
numOfTopics
;
int32_t
numOfTopics
;
SArray
*
pBlockData
;
//SArray<SSDataBlock>
SArray
*
pBlockData
;
//SArray<SSDataBlock>
}
SMqConsumeRsp
;
}
SMqConsumeRsp
;
...
@@ -1735,6 +1739,7 @@ typedef struct SMqSubTopicEp {
...
@@ -1735,6 +1739,7 @@ typedef struct SMqSubTopicEp {
typedef
struct
SMqCMGetSubEpRsp
{
typedef
struct
SMqCMGetSubEpRsp
{
int64_t
consumerId
;
int64_t
consumerId
;
int64_t
epoch
;
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
SArray
*
topics
;
// SArray<SMqSubTopicEp>
SArray
*
topics
;
// SArray<SMqSubTopicEp>
}
SMqCMGetSubEpRsp
;
}
SMqCMGetSubEpRsp
;
...
@@ -1783,6 +1788,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
...
@@ -1783,6 +1788,7 @@ static FORCE_INLINE void* tDecodeSMqSubTopicEp(void* buf, SMqSubTopicEp* pTopicE
static
FORCE_INLINE
int32_t
tEncodeSMqCMGetSubEpRsp
(
void
**
buf
,
const
SMqCMGetSubEpRsp
*
pRsp
)
{
static
FORCE_INLINE
int32_t
tEncodeSMqCMGetSubEpRsp
(
void
**
buf
,
const
SMqCMGetSubEpRsp
*
pRsp
)
{
int32_t
tlen
=
0
;
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pRsp
->
epoch
);
tlen
+=
taosEncodeString
(
buf
,
pRsp
->
cgroup
);
tlen
+=
taosEncodeString
(
buf
,
pRsp
->
cgroup
);
int32_t
sz
=
taosArrayGetSize
(
pRsp
->
topics
);
int32_t
sz
=
taosArrayGetSize
(
pRsp
->
topics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
...
@@ -1795,6 +1801,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu
...
@@ -1795,6 +1801,7 @@ static FORCE_INLINE int32_t tEncodeSMqCMGetSubEpRsp(void** buf, const SMqCMGetSu
static
FORCE_INLINE
void
*
tDecodeSMqCMGetSubEpRsp
(
void
*
buf
,
SMqCMGetSubEpRsp
*
pRsp
)
{
static
FORCE_INLINE
void
*
tDecodeSMqCMGetSubEpRsp
(
void
*
buf
,
SMqCMGetSubEpRsp
*
pRsp
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pRsp
->
epoch
);
buf
=
taosDecodeStringTo
(
buf
,
pRsp
->
cgroup
);
buf
=
taosDecodeStringTo
(
buf
,
pRsp
->
cgroup
);
int32_t
sz
;
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
...
...
source/client/src/clientImpl.c
浏览文件 @
36112eb4
...
@@ -325,13 +325,17 @@ int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
...
@@ -325,13 +325,17 @@ int32_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value) {
struct
tmq_t
{
struct
tmq_t
{
char
groupId
[
256
];
char
groupId
[
256
];
char
clientId
[
256
];
char
clientId
[
256
];
SRWLatch
lock
;
int64_t
consumerId
;
int64_t
consumerId
;
int64_t
epoch
;
int64_t
status
;
int64_t
status
;
tsem_t
rspSem
;
tsem_t
rspSem
;
STscObj
*
pTscObj
;
STscObj
*
pTscObj
;
tmq_commit_cb
*
commit_cb
;
tmq_commit_cb
*
commit_cb
;
int32_t
nextTopicIdx
;
int32_t
nextTopicIdx
;
SArray
*
clientTopics
;
//SArray<SMqClientTopic>
SArray
*
clientTopics
;
//SArray<SMqClientTopic>
//stat
int64_t
pollCnt
;
};
};
tmq_t
*
taos_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
)
{
tmq_t
*
taos_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
)
{
...
@@ -341,6 +345,9 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err
...
@@ -341,6 +345,9 @@ tmq_t* taos_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t err
}
}
pTmq
->
pTscObj
=
(
STscObj
*
)
conn
;
pTmq
->
pTscObj
=
(
STscObj
*
)
conn
;
pTmq
->
status
=
0
;
pTmq
->
status
=
0
;
pTmq
->
pollCnt
=
0
;
pTmq
->
epoch
=
0
;
taosInitRWLatch
(
&
pTmq
->
lock
);
strcpy
(
pTmq
->
clientId
,
conf
->
clientId
);
strcpy
(
pTmq
->
clientId
,
conf
->
clientId
);
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
pTmq
->
commit_cb
=
conf
->
commit_cb
;
pTmq
->
commit_cb
=
conf
->
commit_cb
;
...
@@ -615,34 +622,67 @@ struct tmq_message_t {
...
@@ -615,34 +622,67 @@ struct tmq_message_t {
};
};
int32_t
tmq_poll_cb_inner
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
int32_t
tmq_poll_cb_inner
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
if
(
code
==
-
1
)
{
printf
(
"discard
\n
"
);
return
0
;
}
SMqClientVg
*
pVg
=
(
SMqClientVg
*
)
param
;
SMqConsumeRsp
rsp
;
SMqConsumeRsp
rsp
;
tDecodeSMqConsumeRsp
(
pMsg
->
pData
,
&
rsp
);
tDecodeSMqConsumeRsp
(
pMsg
->
pData
,
&
rsp
);
if
(
rsp
.
numOfTopics
==
0
)
{
/*printf("no data\n");*/
return
0
;
}
int32_t
colNum
=
rsp
.
schemas
->
nCols
;
int32_t
colNum
=
rsp
.
schemas
->
nCols
;
pVg
->
currentOffset
=
rsp
.
rspOffset
;
/*printf("rsp offset: %ld\n", rsp.rspOffset);*/
/*printf("-----msg begin----\n");*/
printf
(
"|"
);
for
(
int32_t
i
=
0
;
i
<
colNum
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
colNum
;
i
++
)
{
printf
(
"
| %
s |"
,
rsp
.
schemas
->
pSchema
[
i
].
name
);
printf
(
"
%15
s |"
,
rsp
.
schemas
->
pSchema
[
i
].
name
);
}
}
printf
(
"
\n
"
);
printf
(
"
\n
"
);
printf
(
"=====================================
\n
"
);
int32_t
sz
=
taosArrayGetSize
(
rsp
.
pBlockData
);
int32_t
sz
=
taosArrayGetSize
(
rsp
.
pBlockData
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
rsp
.
pBlockData
,
i
);
SSDataBlock
*
pDataBlock
=
taosArrayGet
(
rsp
.
pBlockData
,
i
);
int32_t
rows
=
pDataBlock
->
info
.
rows
;
int32_t
rows
=
pDataBlock
->
info
.
rows
;
for
(
int32_t
j
=
0
;
j
<
colNum
;
j
++
)
{
for
(
int32_t
j
=
0
;
j
<
rows
;
j
++
)
{
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
j
);
printf
(
"|"
);
for
(
int32_t
k
=
0
;
k
<
rows
;
k
++
)
{
for
(
int32_t
k
=
0
;
k
<
colNum
;
k
++
)
{
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
k
*
pColInfoData
->
info
.
bytes
);
SColumnInfoData
*
pColInfoData
=
taosArrayGet
(
pDataBlock
->
pDataBlock
,
k
);
if
(
j
==
0
)
printf
(
" %ld "
,
*
(
int64_t
*
)
var
);
void
*
var
=
POINTER_SHIFT
(
pColInfoData
->
pData
,
j
*
pColInfoData
->
info
.
bytes
);
if
(
j
==
1
)
printf
(
" %d "
,
*
(
int32_t
*
)
var
);
switch
(
pColInfoData
->
info
.
type
)
{
case
TSDB_DATA_TYPE_TIMESTAMP
:
printf
(
" %15lu |"
,
*
(
uint64_t
*
)
var
);
break
;
case
TSDB_DATA_TYPE_INT
:
case
TSDB_DATA_TYPE_UINT
:
printf
(
" %15u |"
,
*
(
uint32_t
*
)
var
);
break
;
}
}
}
printf
(
"
\n
"
);
}
}
/*pDataBlock->*/
}
}
/*printf("\n-----msg end------\n");*/
return
0
;
return
0
;
}
}
typedef
struct
SMqAskEpCbParam
{
tmq_t
*
tmq
;
int32_t
wait
;
}
SMqAskEpCbParam
;
int32_t
tmq_ask_ep_cb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
int32_t
tmq_ask_ep_cb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
tmq_t
*
tmq
=
(
tmq_t
*
)
param
;
SMqAskEpCbParam
*
pParam
=
(
SMqAskEpCbParam
*
)
param
;
tmq_t
*
tmq
=
pParam
->
tmq
;
if
(
code
!=
0
)
{
if
(
code
!=
0
)
{
tsem_post
(
&
tmq
->
rspSem
);
printf
(
"exit wait %d
\n
"
,
pParam
->
wait
);
if
(
pParam
->
wait
)
{
tsem_post
(
&
tmq
->
rspSem
);
}
return
0
;
return
0
;
}
}
tscDebug
(
"tmq ask ep cb called"
);
tscDebug
(
"tmq ask ep cb called"
);
...
@@ -651,36 +691,47 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
...
@@ -651,36 +691,47 @@ int32_t tmq_ask_ep_cb(void* param, const SDataBuf* pMsg, int32_t code) {
tDecodeSMqCMGetSubEpRsp
(
pMsg
->
pData
,
&
rsp
);
tDecodeSMqCMGetSubEpRsp
(
pMsg
->
pData
,
&
rsp
);
int32_t
sz
=
taosArrayGetSize
(
rsp
.
topics
);
int32_t
sz
=
taosArrayGetSize
(
rsp
.
topics
);
// TODO: lock
// TODO: lock
tmq
->
clientTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqClientTopic
));
printf
(
"rsp epoch %ld sz %ld
\n
"
,
rsp
.
epoch
,
rsp
.
topics
->
size
);
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
printf
(
"tmq epoch %ld sz %ld
\n
"
,
tmq
->
epoch
,
tmq
->
clientTopics
->
size
);
SMqClientTopic
topic
=
{
0
};
if
(
rsp
.
epoch
!=
tmq
->
epoch
)
{
SMqSubTopicEp
*
pTopicEp
=
taosArrayGet
(
rsp
.
topics
,
i
);
//TODO
topic
.
topicName
=
strdup
(
pTopicEp
->
topic
);
if
(
tmq
->
clientTopics
)
taosArrayDestroy
(
tmq
->
clientTopics
);
int32_t
vgSz
=
taosArrayGetSize
(
pTopicEp
->
vgs
);
tmq
->
clientTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqClientTopic
));
topic
.
vgs
=
taosArrayInit
(
vgSz
,
sizeof
(
SMqClientVg
));
for
(
int32_t
i
=
0
;
i
<
sz
;
i
++
)
{
for
(
int32_t
j
=
0
;
j
<
vgSz
;
j
++
)
{
SMqClientTopic
topic
=
{
0
};
SMqSubVgEp
*
pVgEp
=
taosArrayGet
(
pTopicEp
->
vgs
,
j
);
SMqSubTopicEp
*
pTopicEp
=
taosArrayGet
(
rsp
.
topics
,
i
);
SMqClientVg
clientVg
=
{
topic
.
topicName
=
strdup
(
pTopicEp
->
topic
);
.
pollCnt
=
0
,
int32_t
vgSz
=
taosArrayGetSize
(
pTopicEp
->
vgs
);
.
committedOffset
=
-
1
,
topic
.
vgs
=
taosArrayInit
(
vgSz
,
sizeof
(
SMqClientVg
));
.
currentOffset
=
-
1
,
for
(
int32_t
j
=
0
;
j
<
vgSz
;
j
++
)
{
.
vgId
=
pVgEp
->
vgId
,
SMqSubVgEp
*
pVgEp
=
taosArrayGet
(
pTopicEp
->
vgs
,
j
);
.
epSet
=
pVgEp
->
epSet
SMqClientVg
clientVg
=
{
};
.
pollCnt
=
0
,
taosArrayPush
(
topic
.
vgs
,
&
clientVg
);
.
committedOffset
=
-
1
,
set
=
true
;
.
currentOffset
=
-
1
,
.
vgId
=
pVgEp
->
vgId
,
.
epSet
=
pVgEp
->
epSet
};
taosArrayPush
(
topic
.
vgs
,
&
clientVg
);
set
=
true
;
}
taosArrayPush
(
tmq
->
clientTopics
,
&
topic
);
}
}
taosArrayPush
(
tmq
->
clientTopics
,
&
topic
);
tmq
->
epoch
=
rsp
.
epoch
;
}
if
(
set
)
{
atomic_store_64
(
&
tmq
->
status
,
1
);
}
}
if
(
set
)
tmq
->
status
=
1
;
// unlock
// unlock
tsem_post
(
&
tmq
->
rspSem
);
/*tsem_post(&tmq->rspSem);*/
if
(
pParam
->
wait
)
{
tsem_post
(
&
tmq
->
rspSem
);
}
free
(
pParam
);
return
0
;
return
0
;
}
}
tmq_message_t
*
tmq_consume_poll
(
tmq_t
*
tmq
,
int64_t
blocking_time
)
{
int32_t
tmqAsyncAskEp
(
tmq_t
*
tmq
,
bool
wait
)
{
if
(
taosArrayGetSize
(
tmq
->
clientTopics
)
==
0
||
tmq
->
status
==
0
)
{
int32_t
tlen
=
sizeof
(
SMqCMGetSubEpReq
);
int32_t
tlen
=
sizeof
(
SMqCMGetSubEpReq
);
SMqCMGetSubEpReq
*
buf
=
malloc
(
tlen
);
SMqCMGetSubEpReq
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
if
(
buf
==
NULL
)
{
...
@@ -696,9 +747,17 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
...
@@ -696,9 +747,17 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
buf
,
.
len
=
tlen
};
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
buf
,
.
len
=
tlen
};
SMqAskEpCbParam
*
pParam
=
malloc
(
sizeof
(
SMqAskEpCbParam
));
if
(
pParam
==
NULL
)
{
free
(
buf
);
goto
END
;
}
pParam
->
tmq
=
tmq
;
pParam
->
wait
=
wait
;
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
tmq
;
sendInfo
->
param
=
pParam
;
sendInfo
->
fp
=
tmq_ask_ep_cb
;
sendInfo
->
fp
=
tmq_ask_ep_cb
;
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
...
@@ -706,11 +765,20 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
...
@@ -706,11 +765,20 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
int64_t
transporterId
=
0
;
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
tsem_wait
(
&
tmq
->
rspSem
);
END:
}
if
(
wait
)
tsem_wait
(
&
tmq
->
rspSem
);
return
0
;
}
tmq_message_t
*
tmq_consume_poll
(
tmq_t
*
tmq
,
int64_t
blocking_time
)
{
int64_t
status
=
atomic_load_64
(
&
tmq
->
status
);
tmqAsyncAskEp
(
tmq
,
status
==
0
||
taosArrayGetSize
(
tmq
->
clientTopics
));
if
(
blocking_time
<
0
)
blocking_time
=
500
;
if
(
taosArrayGetSize
(
tmq
->
clientTopics
)
==
0
)
{
if
(
taosArrayGetSize
(
tmq
->
clientTopics
)
==
0
)
{
tscDebug
(
"consumer:%ld poll but not assigned"
,
tmq
->
consumerId
);
tscDebug
(
"consumer:%ld poll but not assigned"
,
tmq
->
consumerId
);
usleep
(
blocking_time
*
1000
);
return
NULL
;
return
NULL
;
}
}
...
@@ -724,10 +792,15 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
...
@@ -724,10 +792,15 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
tmq
->
nextTopicIdx
);
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
tmq
->
nextTopicIdx
);
tmq
->
nextTopicIdx
=
(
tmq
->
nextTopicIdx
+
1
)
%
taosArrayGetSize
(
tmq
->
clientTopics
);
tmq
->
nextTopicIdx
=
(
tmq
->
nextTopicIdx
+
1
)
%
taosArrayGetSize
(
tmq
->
clientTopics
);
strcpy
(
pReq
->
topic
,
pTopic
->
topicName
);
strcpy
(
pReq
->
topic
,
pTopic
->
topicName
);
int32_t
nextVgIdx
=
pTopic
->
nextVgIdx
;
int32_t
vgSz
=
taosArrayGetSize
(
pTopic
->
vgs
);
pTopic
->
nextVgIdx
=
(
nextVgIdx
+
1
)
%
taosArrayGetSize
(
pTopic
->
vgs
);
if
(
vgSz
==
0
)
{
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
nextVgIdx
);
free
(
pReq
);
pReq
->
offset
=
pVg
->
currentOffset
;
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
pTopic
->
nextVgIdx
=
(
pTopic
->
nextVgIdx
+
1
%
vgSz
);
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
pTopic
->
nextVgIdx
);
pReq
->
offset
=
pVg
->
currentOffset
+
1
;
pReq
->
head
.
vgId
=
htonl
(
pVg
->
vgId
);
pReq
->
head
.
vgId
=
htonl
(
pVg
->
vgId
);
pReq
->
head
.
contLen
=
htonl
(
sizeof
(
SMqConsumeReq
));
pReq
->
head
.
contLen
=
htonl
(
sizeof
(
SMqConsumeReq
));
...
@@ -737,13 +810,16 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
...
@@ -737,13 +810,16 @@ tmq_message_t* tmq_consume_poll(tmq_t* tmq, int64_t blocking_time) {
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
requestObjRefId
=
0
;
/*sendInfo->param = &tmq_message;*/
sendInfo
->
param
=
pVg
;
sendInfo
->
fp
=
tmq_poll_cb_inner
;
sendInfo
->
fp
=
tmq_poll_cb_inner
;
/*printf("req offset: %ld\n", pReq->offset);*/
int64_t
transporterId
=
0
;
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
sendInfo
);
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
sendInfo
);
tmq
->
pollCnt
++
;
tsem_wait
(
&
pRequest
->
body
.
rspSem
);
usleep
(
blocking_time
*
1000
);
return
tmq_message
;
return
tmq_message
;
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
36112eb4
...
@@ -660,12 +660,17 @@ typedef struct SMqConsumerObj {
...
@@ -660,12 +660,17 @@ typedef struct SMqConsumerObj {
SRWLatch
lock
;
SRWLatch
lock
;
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
char
cgroup
[
TSDB_CONSUMER_GROUP_LEN
];
SArray
*
topics
;
// SArray<SMqConsumerTopic>
SArray
*
topics
;
// SArray<SMqConsumerTopic>
// SHashObj *topicHash; //SHashObj<SMqTopicObj>
int64_t
epoch
;
// stat
int64_t
pollCnt
;
}
SMqConsumerObj
;
}
SMqConsumerObj
;
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerObj
(
void
**
buf
,
const
SMqConsumerObj
*
pConsumer
)
{
static
FORCE_INLINE
int32_t
tEncodeSMqConsumerObj
(
void
**
buf
,
const
SMqConsumerObj
*
pConsumer
)
{
int32_t
tlen
=
0
;
int32_t
tlen
=
0
;
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
consumerId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
connId
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
epoch
);
tlen
+=
taosEncodeFixedI64
(
buf
,
pConsumer
->
pollCnt
);
tlen
+=
taosEncodeString
(
buf
,
pConsumer
->
cgroup
);
tlen
+=
taosEncodeString
(
buf
,
pConsumer
->
cgroup
);
int32_t
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
int32_t
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
tlen
+=
taosEncodeFixedI32
(
buf
,
sz
);
...
@@ -678,6 +683,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerO
...
@@ -678,6 +683,9 @@ static FORCE_INLINE int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerO
static
FORCE_INLINE
void
*
tDecodeSMqConsumerObj
(
void
*
buf
,
SMqConsumerObj
*
pConsumer
)
{
static
FORCE_INLINE
void
*
tDecodeSMqConsumerObj
(
void
*
buf
,
SMqConsumerObj
*
pConsumer
)
{
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
consumerId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
connId
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
epoch
);
buf
=
taosDecodeFixedI64
(
buf
,
&
pConsumer
->
pollCnt
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumer
->
cgroup
);
buf
=
taosDecodeStringTo
(
buf
,
pConsumer
->
cgroup
);
int32_t
sz
;
int32_t
sz
;
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
buf
=
taosDecodeFixedI32
(
buf
,
&
sz
);
...
...
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
36112eb4
...
@@ -30,6 +30,8 @@
...
@@ -30,6 +30,8 @@
#define MND_SUBSCRIBE_VER_NUMBER 1
#define MND_SUBSCRIBE_VER_NUMBER 1
#define MND_SUBSCRIBE_RESERVE_SIZE 64
#define MND_SUBSCRIBE_RESERVE_SIZE 64
#define MND_SUBSCRIBE_REBALANCE_MS 5000
static
char
*
mndMakeSubscribeKey
(
char
*
cgroup
,
char
*
topicName
);
static
char
*
mndMakeSubscribeKey
(
char
*
cgroup
,
char
*
topicName
);
static
SSdbRaw
*
mndSubActionEncode
(
SMqSubscribeObj
*
);
static
SSdbRaw
*
mndSubActionEncode
(
SMqSubscribeObj
*
);
...
@@ -69,6 +71,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
...
@@ -69,6 +71,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
SMqCMGetSubEpReq
*
pReq
=
(
SMqCMGetSubEpReq
*
)
pMsg
->
rpcMsg
.
pCont
;
SMqCMGetSubEpReq
*
pReq
=
(
SMqCMGetSubEpReq
*
)
pMsg
->
rpcMsg
.
pCont
;
SMqCMGetSubEpRsp
rsp
;
SMqCMGetSubEpRsp
rsp
;
int64_t
consumerId
=
be64toh
(
pReq
->
consumerId
);
int64_t
consumerId
=
be64toh
(
pReq
->
consumerId
);
int64_t
currentTs
=
taosGetTimestampMs
();
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMsg
->
pMnode
,
consumerId
);
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMsg
->
pMnode
,
consumerId
);
if
(
pConsumer
==
NULL
)
{
if
(
pConsumer
==
NULL
)
{
...
@@ -79,6 +82,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
...
@@ -79,6 +82,7 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
strcpy
(
rsp
.
cgroup
,
pReq
->
cgroup
);
strcpy
(
rsp
.
cgroup
,
pReq
->
cgroup
);
rsp
.
consumerId
=
consumerId
;
rsp
.
consumerId
=
consumerId
;
rsp
.
epoch
=
pConsumer
->
epoch
;
SArray
*
pTopics
=
pConsumer
->
topics
;
SArray
*
pTopics
=
pConsumer
->
topics
;
int32_t
sz
=
taosArrayGetSize
(
pTopics
);
int32_t
sz
=
taosArrayGetSize
(
pTopics
);
rsp
.
topics
=
taosArrayInit
(
sz
,
sizeof
(
SMqSubTopicEp
));
rsp
.
topics
=
taosArrayInit
(
sz
,
sizeof
(
SMqSubTopicEp
));
...
@@ -88,21 +92,42 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
...
@@ -88,21 +92,42 @@ static int32_t mndProcessGetSubEpReq(SMnodeMsg *pMsg) {
strcpy
(
topicEp
.
topic
,
pConsumerTopic
->
name
);
strcpy
(
topicEp
.
topic
,
pConsumerTopic
->
name
);
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
pConsumer
->
cgroup
,
pConsumerTopic
->
name
);
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
pConsumer
->
cgroup
,
pConsumerTopic
->
name
);
ASSERT
(
pSub
);
bool
found
=
0
;
bool
changed
=
0
;
for
(
int32_t
j
=
0
;
j
<
taosArrayGetSize
(
pSub
->
availConsumer
);
j
++
)
{
if
(
*
(
int64_t
*
)
taosArrayGet
(
pSub
->
availConsumer
,
j
)
==
consumerId
)
{
found
=
1
;
break
;
}
}
if
(
found
==
0
)
{
taosArrayPush
(
pSub
->
availConsumer
,
&
consumerId
);
}
int32_t
assignedSz
=
taosArrayGetSize
(
pSub
->
assigned
);
int32_t
assignedSz
=
taosArrayGetSize
(
pSub
->
assigned
);
topicEp
.
vgs
=
taosArrayInit
(
assignedSz
,
sizeof
(
SMqSubVgEp
));
topicEp
.
vgs
=
taosArrayInit
(
assignedSz
,
sizeof
(
SMqSubVgEp
));
for
(
int32_t
j
=
0
;
j
<
assignedSz
;
j
++
)
{
for
(
int32_t
j
=
0
;
j
<
assignedSz
;
j
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
assigned
,
j
);
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
assigned
,
j
);
if
(
pCEp
->
consumerId
==
consumerId
)
{
if
(
pCEp
->
consumerId
==
consumerId
)
{
pCEp
->
lastConsumerHbTs
=
currentTs
;
SMqSubVgEp
vgEp
=
{
SMqSubVgEp
vgEp
=
{
.
epSet
=
pCEp
->
epSet
,
.
epSet
=
pCEp
->
epSet
,
.
vgId
=
pCEp
->
vgId
.
vgId
=
pCEp
->
vgId
};
};
taosArrayPush
(
topicEp
.
vgs
,
&
vgEp
);
taosArrayPush
(
topicEp
.
vgs
,
&
vgEp
);
changed
=
1
;
}
}
}
}
if
(
taosArrayGetSize
(
topicEp
.
vgs
)
!=
0
)
{
if
(
taosArrayGetSize
(
topicEp
.
vgs
)
!=
0
)
{
taosArrayPush
(
rsp
.
topics
,
&
topicEp
);
taosArrayPush
(
rsp
.
topics
,
&
topicEp
);
}
}
if
(
changed
||
found
)
{
SSdbRaw
*
pRaw
=
mndSubActionEncode
(
pSub
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
sdbWriteNotFree
(
pMnode
->
pSdb
,
pRaw
);
}
mndReleaseSubscribe
(
pMnode
,
pSub
);
}
}
int32_t
tlen
=
tEncodeSMqCMGetSubEpRsp
(
NULL
,
&
rsp
);
int32_t
tlen
=
tEncodeSMqCMGetSubEpRsp
(
NULL
,
&
rsp
);
void
*
buf
=
rpcMallocCont
(
tlen
);
void
*
buf
=
rpcMallocCont
(
tlen
);
...
@@ -124,9 +149,9 @@ static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
...
@@ -124,9 +149,9 @@ static int32_t mndSplitSubscribeKey(char *key, char **topic, char **cgroup) {
i
++
;
i
++
;
}
}
key
[
i
]
=
0
;
key
[
i
]
=
0
;
*
topic
=
strdup
(
key
);
*
cgroup
=
strdup
(
key
);
key
[
i
]
=
':'
;
key
[
i
]
=
':'
;
*
cgroup
=
strdup
(
&
key
[
i
+
1
]);
*
topic
=
strdup
(
&
key
[
i
+
1
]);
return
0
;
return
0
;
}
}
...
@@ -135,9 +160,40 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
...
@@ -135,9 +160,40 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SSdb
*
pSdb
=
pMnode
->
pSdb
;
SMqSubscribeObj
*
pSub
=
NULL
;
SMqSubscribeObj
*
pSub
=
NULL
;
void
*
pIter
=
sdbFetch
(
pSdb
,
SDB_SUBSCRIBE
,
NULL
,
(
void
**
)
&
pSub
);
void
*
pIter
=
sdbFetch
(
pSdb
,
SDB_SUBSCRIBE
,
NULL
,
(
void
**
)
&
pSub
);
int
sz
;
int64_t
currentTs
=
taosGetTimestampMs
();
int32_t
sz
;
while
(
pIter
!=
NULL
)
{
while
(
pIter
!=
NULL
)
{
if
((
sz
=
taosArrayGetSize
(
pSub
->
unassignedVg
))
>
0
)
{
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
pSub
->
assigned
);
i
++
)
{
SMqConsumerEp
*
pCEp
=
taosArrayGet
(
pSub
->
assigned
,
i
);
int64_t
consumerId
=
pCEp
->
consumerId
;
if
(
pCEp
->
lastConsumerHbTs
!=
-
1
&&
currentTs
-
pCEp
->
lastConsumerHbTs
>
MND_SUBSCRIBE_REBALANCE_MS
)
{
// put consumer into lostConsumer
taosArrayPush
(
pSub
->
lostConsumer
,
pCEp
);
// put vg into unassgined
taosArrayPush
(
pSub
->
unassignedVg
,
pCEp
);
// remove from assigned
// TODO: swap with last one, reduce size and reset i
taosArrayRemove
(
pSub
->
assigned
,
i
);
// remove from available consumer
for
(
int
j
=
0
;
j
<
taosArrayGetSize
(
pSub
->
availConsumer
);
j
++
)
{
if
(
*
(
int64_t
*
)
taosArrayGet
(
pSub
->
availConsumer
,
i
)
==
pCEp
->
consumerId
)
{
taosArrayRemove
(
pSub
->
availConsumer
,
j
);
break
;
}
// TODO: acquire consumer, set status to unavail
}
#if 0
SMqConsumerObj* pConsumer = mndAcquireConsumer(pMnode, consumerId);
pConsumer->epoch++;
printf("current epoch %ld size %ld", pConsumer->epoch, pConsumer->topics->size);
SSdbRaw* pRaw = mndConsumerActionEncode(pConsumer);
sdbSetRawStatus(pRaw, SDB_STATUS_READY);
sdbWriteNotFree(pMnode->pSdb, pRaw);
mndReleaseConsumer(pMnode, pConsumer);
#endif
}
}
if
((
sz
=
taosArrayGetSize
(
pSub
->
unassignedVg
))
>
0
&&
taosArrayGetSize
(
pSub
->
availConsumer
)
>
0
)
{
char
*
topic
=
NULL
;
char
*
topic
=
NULL
;
char
*
cgroup
=
NULL
;
char
*
cgroup
=
NULL
;
mndSplitSubscribeKey
(
pSub
->
key
,
&
topic
,
&
cgroup
);
mndSplitSubscribeKey
(
pSub
->
key
,
&
topic
,
&
cgroup
);
...
@@ -146,58 +202,66 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
...
@@ -146,58 +202,66 @@ static int32_t mndProcessMqTimerMsg(SMnodeMsg *pMsg) {
// create trans
// create trans
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_RETRY
,
&
pMsg
->
rpcMsg
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
for
(
int
32_t
i
=
0
;
i
<
sz
;
i
++
)
{
int64_t
consumerId
=
*
(
int64_t
*
)
taosArrayGet
(
pSub
->
availConsumer
,
pSub
->
nextConsumerIdx
);
int64_t
consumerId
=
*
(
int64_t
*
)
taosArrayGet
(
pSub
->
availConsumer
,
pSub
->
nextConsumerIdx
);
SMqConsumerEp
*
pCEp
=
taosArrayPop
(
pSub
->
unassignedVg
);
SMqConsumerEp
*
pCEp
=
taosArrayPop
(
pSub
->
unassignedVg
);
pCEp
->
consumerId
=
consumerId
;
pCEp
->
consumerId
=
consumerId
;
taosArrayPush
(
pSub
->
assigned
,
pCEp
);
taosArrayPush
(
pSub
->
assigned
,
pCEp
);
pSub
->
nextConsumerIdx
=
(
pSub
->
nextConsumerIdx
+
1
)
%
taosArrayGetSize
(
pSub
->
availConsumer
);
pSub
->
nextConsumerIdx
=
(
pSub
->
nextConsumerIdx
+
1
)
%
taosArrayGetSize
(
pSub
->
availConsumer
);
SMqConsumerObj
*
pConsumer
=
mndAcquireConsumer
(
pMnode
,
consumerId
);
pConsumer
->
epoch
++
;
/*SSdbRaw* pConsumerRaw = mndConsumerActionEncode(pConsumer);*/
/*sdbSetRawStatus(pConsumerRaw, SDB_STATUS_READY);*/
/*sdbWriteNotFree(pMnode->pSdb, pConsumerRaw);*/
mndReleaseConsumer
(
pMnode
,
pConsumer
);
// build msg
// build msg
SMqSetCVgReq
*
pReq
=
malloc
(
sizeof
(
SMqSetCVgReq
));
SMqSetCVgReq
req
=
{
0
};
if
(
pReq
==
NULL
)
{
strcpy
(
req
.
cgroup
,
cgroup
);
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
strcpy
(
req
.
topicName
,
topic
);
return
-
1
;
req
.
sql
=
pTopic
->
sql
;
}
req
.
logicalPlan
=
pTopic
->
logicalPlan
;
strcpy
(
pReq
->
cgroup
,
cgroup
);
req
.
physicalPlan
=
pTopic
->
physicalPlan
;
strcpy
(
pReq
->
topicName
,
topic
);
req
.
qmsg
=
pCEp
->
qmsg
;
pReq
->
sql
=
strdup
(
pTopic
->
sql
);
req
.
newConsumerId
=
consumerId
;
pReq
->
logicalPlan
=
strdup
(
pTopic
->
logicalPlan
);
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
&
req
);
pReq
->
physicalPlan
=
strdup
(
pTopic
->
physicalPlan
);
void
*
buf
=
malloc
(
sizeof
(
SMsgHead
)
+
tlen
);
pReq
->
qmsg
=
strdup
(
pCEp
->
qmsg
);
if
(
buf
==
NULL
)
{
int32_t
tlen
=
tEncodeSMqSetCVgReq
(
NULL
,
pReq
);
void
*
reqStr
=
malloc
(
tlen
);
if
(
reqStr
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
}
}
void
*
abuf
=
reqStr
;
SMsgHead
*
pMsgHead
=
(
SMsgHead
*
)
buf
;
tEncodeSMqSetCVgReq
(
&
abuf
,
pReq
);
pMsgHead
->
contLen
=
htonl
(
sizeof
(
SMsgHead
)
+
tlen
);
pMsgHead
->
vgId
=
htonl
(
pCEp
->
vgId
);
void
*
abuf
=
POINTER_SHIFT
(
buf
,
sizeof
(
SMsgHead
));
tEncodeSMqSetCVgReq
(
&
abuf
,
&
req
);
// persist msg
// persist msg
STransAction
action
=
{
0
};
STransAction
action
=
{
0
};
action
.
epSet
=
pCEp
->
epSet
;
action
.
epSet
=
pCEp
->
epSet
;
action
.
pCont
=
reqStr
;
action
.
pCont
=
buf
;
action
.
contLen
=
tlen
;
action
.
contLen
=
sizeof
(
SMsgHead
)
+
tlen
;
action
.
msgType
=
TDMT_VND_MQ_SET_CONN
;
action
.
msgType
=
TDMT_VND_MQ_SET_CONN
;
mndTransAppendRedoAction
(
pTrans
,
&
action
);
mndTransAppendRedoAction
(
pTrans
,
&
action
);
// persist raw
// persist raw
SSdbRaw
*
pRaw
=
mndSubActionEncode
(
pSub
);
SSdbRaw
*
pRaw
=
mndSubActionEncode
(
pSub
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mndTransAppendRedolog
(
pTrans
,
pRaw
);
mndTransAppendRedolog
(
pTrans
,
pRaw
);
free
(
pReq
);
tfree
(
topic
);
tfree
(
topic
);
tfree
(
cgroup
);
tfree
(
cgroup
);
}
}
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
if
(
mndTransPrepare
(
pMnode
,
pTrans
)
!=
0
)
{
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
mError
(
"trans:%d, failed to prepare since %s"
,
pTrans
->
id
,
terrstr
());
}
}
/*mndReleaseTopic(pMnode, pTopic);*/
mndReleaseTopic
(
pMnode
,
pTopic
);
mndTransDrop
(
pTrans
);
mndTransDrop
(
pTrans
);
}
}
pIter
=
sdbFetch
(
pSdb
,
SDB_SUBSCRIBE
,
NULL
,
(
void
**
)
&
pSub
);
pIter
=
sdbFetch
(
pSdb
,
SDB_SUBSCRIBE
,
pIter
,
(
void
**
)
&
pSub
);
}
}
return
0
;
return
0
;
}
}
...
@@ -435,10 +499,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
...
@@ -435,10 +499,12 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
return
-
1
;
}
}
pConsumer
->
epoch
=
1
;
pConsumer
->
consumerId
=
consumerId
;
pConsumer
->
consumerId
=
consumerId
;
strcpy
(
pConsumer
->
cgroup
,
consumerGroup
);
strcpy
(
pConsumer
->
cgroup
,
consumerGroup
);
taosInitRWLatch
(
&
pConsumer
->
lock
);
taosInitRWLatch
(
&
pConsumer
->
lock
);
}
else
{
}
else
{
pConsumer
->
epoch
++
;
oldSub
=
pConsumer
->
topics
;
oldSub
=
pConsumer
->
topics
;
}
}
pConsumer
->
topics
=
taosArrayInit
(
newTopicNum
,
sizeof
(
SMqConsumerTopic
));
pConsumer
->
topics
=
taosArrayInit
(
newTopicNum
,
sizeof
(
SMqConsumerTopic
));
...
@@ -542,6 +608,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
...
@@ -542,6 +608,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
}
}
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
consumerGroup
,
newTopicName
);
SMqSubscribeObj
*
pSub
=
mndAcquireSubscribe
(
pMnode
,
consumerGroup
,
newTopicName
);
bool
create
=
false
;
if
(
pSub
==
NULL
)
{
if
(
pSub
==
NULL
)
{
mDebug
(
"create new subscription, group: %s, topic %s"
,
consumerGroup
,
newTopicName
);
mDebug
(
"create new subscription, group: %s, topic %s"
,
consumerGroup
,
newTopicName
);
pSub
=
tNewSubscribeObj
();
pSub
=
tNewSubscribeObj
();
...
@@ -550,10 +617,16 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
...
@@ -550,10 +617,16 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
return
-
1
;
return
-
1
;
}
}
char
*
key
=
mndMakeSubscribeKey
(
consumerGroup
,
newTopicName
);
char
*
key
=
mndMakeSubscribeKey
(
consumerGroup
,
newTopicName
);
if
(
key
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
return
-
1
;
}
strcpy
(
pSub
->
key
,
key
);
strcpy
(
pSub
->
key
,
key
);
free
(
key
);
// set unassigned vg
// set unassigned vg
mndInitUnassignedVg
(
pMnode
,
pTopic
,
pSub
->
unassignedVg
);
mndInitUnassignedVg
(
pMnode
,
pTopic
,
pSub
->
unassignedVg
);
// TODO: disable alter
// TODO: disable alter
create
=
true
;
}
}
taosArrayPush
(
pSub
->
availConsumer
,
&
consumerId
);
taosArrayPush
(
pSub
->
availConsumer
,
&
consumerId
);
...
@@ -576,6 +649,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
...
@@ -576,6 +649,7 @@ static int32_t mndProcessSubscribeReq(SMnodeMsg *pMsg) {
SSdbRaw
*
pRaw
=
mndSubActionEncode
(
pSub
);
SSdbRaw
*
pRaw
=
mndSubActionEncode
(
pSub
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
sdbSetRawStatus
(
pRaw
,
SDB_STATUS_READY
);
mndTransAppendRedolog
(
pTrans
,
pRaw
);
mndTransAppendRedolog
(
pTrans
,
pRaw
);
if
(
!
create
)
mndReleaseSubscribe
(
pMnode
,
pSub
);
#if 0
#if 0
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
SMqCGroup *pGroup = taosHashGet(pTopic->cgroups, consumerGroup, cgroupLen);
if (pGroup == NULL) {
if (pGroup == NULL) {
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
36112eb4
...
@@ -71,6 +71,7 @@ typedef struct {
...
@@ -71,6 +71,7 @@ typedef struct {
typedef
struct
STqReadHandle
{
typedef
struct
STqReadHandle
{
int64_t
ver
;
int64_t
ver
;
uint64_t
tbUid
;
uint64_t
tbUid
;
SHashObj
*
tbIdHash
;
SSubmitMsg
*
pMsg
;
SSubmitMsg
*
pMsg
;
SSubmitBlk
*
pBlock
;
SSubmitBlk
*
pBlock
;
SSubmitMsgIter
msgIter
;
SSubmitMsgIter
msgIter
;
...
@@ -211,6 +212,19 @@ static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, uint64_t t
...
@@ -211,6 +212,19 @@ static FORCE_INLINE void tqReadHandleSetTbUid(STqReadHandle* pHandle, uint64_t t
pHandle
->
tbUid
=
tbUid
;
pHandle
->
tbUid
=
tbUid
;
}
}
static
FORCE_INLINE
int
tqReadHandleSetTbUidList
(
STqReadHandle
*
pHandle
,
SArray
*
tbUidList
)
{
pHandle
->
tbIdHash
=
taosHashInit
(
64
,
taosGetDefaultHashFunction
(
TSDB_DATA_TYPE_UBIGINT
),
true
,
HASH_NO_LOCK
);
if
(
pHandle
->
tbIdHash
==
NULL
)
{
return
-
1
;
}
for
(
int
i
=
0
;
i
<
taosArrayGetSize
(
tbUidList
);
i
++
)
{
int64_t
*
pKey
=
(
int64_t
*
)
taosArrayGet
(
tbUidList
,
i
);
taosHashPut
(
pHandle
->
tbIdHash
,
pKey
,
sizeof
(
int64_t
),
NULL
,
0
);
//pHandle->tbUid = tbUid;
}
return
0
;
}
void
tqReadHandleSetMsg
(
STqReadHandle
*
pHandle
,
SSubmitMsg
*
pMsg
,
int64_t
ver
);
void
tqReadHandleSetMsg
(
STqReadHandle
*
pHandle
,
SSubmitMsg
*
pMsg
,
int64_t
ver
);
bool
tqNextDataBlock
(
STqReadHandle
*
pHandle
);
bool
tqNextDataBlock
(
STqReadHandle
*
pHandle
);
int
tqRetrieveDataBlockInfo
(
STqReadHandle
*
pHandle
,
SDataBlockInfo
*
pBlockInfo
);
int
tqRetrieveDataBlockInfo
(
STqReadHandle
*
pHandle
,
SDataBlockInfo
*
pBlockInfo
);
...
...
source/dnode/vnode/src/tq/tq.c
浏览文件 @
36112eb4
...
@@ -670,18 +670,22 @@ int tqItemSSize() {
...
@@ -670,18 +670,22 @@ int tqItemSSize() {
int32_t
tqProcessConsumeReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
int32_t
tqProcessConsumeReq
(
STQ
*
pTq
,
SRpcMsg
*
pMsg
)
{
SMqConsumeReq
*
pReq
=
pMsg
->
pCont
;
SMqConsumeReq
*
pReq
=
pMsg
->
pCont
;
SRpcMsg
rpcMsg
;
int64_t
reqId
=
pReq
->
reqId
;
int64_t
reqId
=
pReq
->
reqId
;
int64_t
consumerId
=
pReq
->
consumerId
;
int64_t
consumerId
=
pReq
->
consumerId
;
int64_t
reqOffset
=
pReq
->
offset
;
int64_t
fetchOffset
=
pReq
->
offset
;
int64_t
fetchOffset
=
reqOffset
;
int64_t
blockingTime
=
pReq
->
blockingTime
;
int64_t
blockingTime
=
pReq
->
blockingTime
;
int
rspLen
=
0
;
int
rspLen
=
0
;
SMqConsumeRsp
rsp
=
{.
consumerId
=
consumerId
,
.
numOfTopics
=
1
,
.
pBlockData
=
NULL
};
SMqConsumeRsp
rsp
=
{.
consumerId
=
consumerId
,
.
numOfTopics
=
0
,
.
pBlockData
=
NULL
};
STqConsumerHandle
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
consumerId
);
STqConsumerHandle
*
pConsumer
=
tqHandleGet
(
pTq
->
tqMeta
,
consumerId
);
ASSERT
(
pConsumer
);
if
(
pConsumer
==
NULL
)
{
pMsg
->
pCont
=
NULL
;
pMsg
->
contLen
=
0
;
pMsg
->
code
=
-
1
;
rpcSendResponse
(
pMsg
);
return
0
;
}
int
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
int
sz
=
taosArrayGetSize
(
pConsumer
->
topics
);
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
...
@@ -690,6 +694,9 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
...
@@ -690,6 +694,9 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
if
(
strcmp
(
pTopic
->
topicName
,
pReq
->
topic
)
!=
0
)
{
if
(
strcmp
(
pTopic
->
topicName
,
pReq
->
topic
)
!=
0
)
{
continue
;
continue
;
}
}
rsp
.
committedOffset
=
pTopic
->
committedOffset
;
rsp
.
reqOffset
=
pReq
->
offset
;
rsp
.
skipLogNum
=
0
;
if
(
fetchOffset
==
-
1
)
{
if
(
fetchOffset
==
-
1
)
{
fetchOffset
=
pTopic
->
committedOffset
+
1
;
fetchOffset
=
pTopic
->
committedOffset
+
1
;
...
@@ -715,6 +722,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
...
@@ -715,6 +722,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
if
(
pHead
->
head
.
msgType
==
TDMT_VND_SUBMIT
)
{
if
(
pHead
->
head
.
msgType
==
TDMT_VND_SUBMIT
)
{
break
;
break
;
}
}
rsp
.
skipLogNum
++
;
if
(
walReadWithHandle
(
pTopic
->
pReadhandle
,
fetchOffset
)
<
0
)
{
if
(
walReadWithHandle
(
pTopic
->
pReadhandle
,
fetchOffset
)
<
0
)
{
atomic_store_8
(
&
pTopic
->
buffer
.
output
[
pos
].
status
,
0
);
atomic_store_8
(
&
pTopic
->
buffer
.
output
[
pos
].
status
,
0
);
skip
=
1
;
skip
=
1
;
...
@@ -745,6 +753,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
...
@@ -745,6 +753,7 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
}
}
//TODO copy
//TODO copy
rsp
.
schemas
=
pTopic
->
buffer
.
output
[
pos
].
pReadHandle
->
pSchemaWrapper
;
rsp
.
schemas
=
pTopic
->
buffer
.
output
[
pos
].
pReadHandle
->
pSchemaWrapper
;
rsp
.
rspOffset
=
fetchOffset
;
atomic_store_8
(
&
pTopic
->
buffer
.
output
[
pos
].
status
,
0
);
atomic_store_8
(
&
pTopic
->
buffer
.
output
[
pos
].
status
,
0
);
...
@@ -752,6 +761,8 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
...
@@ -752,6 +761,8 @@ int32_t tqProcessConsumeReq(STQ* pTq, SRpcMsg* pMsg) {
taosArrayDestroy
(
pRes
);
taosArrayDestroy
(
pRes
);
fetchOffset
++
;
fetchOffset
++
;
continue
;
continue
;
}
else
{
rsp
.
numOfTopics
++
;
}
}
rsp
.
pBlockData
=
pRes
;
rsp
.
pBlockData
=
pRes
;
...
@@ -862,6 +873,11 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
...
@@ -862,6 +873,11 @@ bool tqNextDataBlock(STqReadHandle* pHandle) {
pHandle
->
pBlock
->
schemaLen
=
htonl
(
pHandle
->
pBlock
->
schemaLen
);
pHandle
->
pBlock
->
schemaLen
=
htonl
(
pHandle
->
pBlock
->
schemaLen
);
pHandle
->
pBlock
->
numOfRows
=
htons
(
pHandle
->
pBlock
->
numOfRows
);
pHandle
->
pBlock
->
numOfRows
=
htons
(
pHandle
->
pBlock
->
numOfRows
);
return
true
;
return
true
;
}
else
if
(
pHandle
->
tbIdHash
!=
NULL
)
{
void
*
ret
=
taosHashGet
(
pHandle
->
tbIdHash
,
&
pHandle
->
pBlock
->
uid
,
sizeof
(
int64_t
));
if
(
ret
!=
NULL
)
{
return
true
;
}
}
}
}
}
return
false
;
return
false
;
...
@@ -931,6 +947,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
...
@@ -931,6 +947,7 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
SMemRow
row
;
SMemRow
row
;
int32_t
kvIdx
=
0
;
int32_t
kvIdx
=
0
;
int32_t
curRow
=
0
;
tInitSubmitBlkIter
(
pHandle
->
pBlock
,
&
pHandle
->
blkIter
);
tInitSubmitBlkIter
(
pHandle
->
pBlock
,
&
pHandle
->
blkIter
);
while
((
row
=
tGetSubmitBlkNext
(
&
pHandle
->
blkIter
))
!=
NULL
)
{
while
((
row
=
tGetSubmitBlkNext
(
&
pHandle
->
blkIter
))
!=
NULL
)
{
// get all wanted col of that block
// get all wanted col of that block
...
@@ -940,8 +957,9 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
...
@@ -940,8 +957,9 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
// TODO
// TODO
ASSERT
(
pCol
->
colId
==
pColData
->
info
.
colId
);
ASSERT
(
pCol
->
colId
==
pColData
->
info
.
colId
);
void
*
val
=
tdGetMemRowDataOfColEx
(
row
,
pCol
->
colId
,
pCol
->
type
,
TD_DATA_ROW_HEAD_SIZE
+
pCol
->
offset
,
&
kvIdx
);
void
*
val
=
tdGetMemRowDataOfColEx
(
row
,
pCol
->
colId
,
pCol
->
type
,
TD_DATA_ROW_HEAD_SIZE
+
pCol
->
offset
,
&
kvIdx
);
memcpy
(
pColData
->
pData
,
val
,
pCol
->
bytes
);
memcpy
(
POINTER_SHIFT
(
pColData
->
pData
,
curRow
*
pCol
->
bytes
)
,
val
,
pCol
->
bytes
);
}
}
curRow
++
;
}
}
return
pArray
;
return
pArray
;
}
}
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录