Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
adf8bd5e
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
adf8bd5e
编写于
4月 01, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
add log
上级
61d8ef38
变更
3
隐藏空白更改
内联
并排
Showing
3 changed file
with
16 addition
and
26 deletion
+16
-26
source/client/src/tmq.c
source/client/src/tmq.c
+11
-25
source/dnode/mnode/impl/src/mndSubscribe.c
source/dnode/mnode/impl/src/mndSubscribe.c
+1
-1
source/dnode/mnode/sdb/src/sdbHash.c
source/dnode/mnode/sdb/src/sdbHash.c
+4
-0
未找到文件。
source/client/src/tmq.c
浏览文件 @
adf8bd5e
...
...
@@ -911,7 +911,7 @@ CREATE_MSG_FAIL:
bool
tmqUpdateEp
(
tmq_t
*
tmq
,
int32_t
epoch
,
SMqCMGetSubEpRsp
*
pRsp
)
{
/*printf("call update ep %d\n", epoch);*/
/*printf("tmq update ep epoch %d to epoch %d\n", tmq->epoch, epoch);*/
tscDebug
(
"tmq update ep epoch %d to epoch %d"
,
tmq
->
epoch
,
epoch
);
bool
set
=
false
;
int32_t
topicNumGet
=
taosArrayGetSize
(
pRsp
->
topics
);
char
vgKey
[
TSDB_TOPIC_FNAME_LEN
+
22
];
...
...
@@ -984,6 +984,7 @@ bool tmqUpdateEp(tmq_t* tmq, int32_t epoch, SMqCMGetSubEpRsp* pRsp) {
int32_t
tmqAskEpCb
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
SMqAskEpCbParam
*
pParam
=
(
SMqAskEpCbParam
*
)
param
;
tmq_t
*
tmq
=
pParam
->
tmq
;
tscDebug
(
"consumer %ld recv ep"
,
tmq
->
consumerId
);
if
(
code
!=
0
)
{
tscError
(
"get topic endpoint error, not ready, wait:%d
\n
"
,
pParam
->
sync
);
goto
END
;
...
...
@@ -1032,12 +1033,14 @@ END:
int32_t
tmqAskEp
(
tmq_t
*
tmq
,
bool
sync
)
{
int8_t
epStatus
=
atomic_val_compare_exchange_8
(
&
tmq
->
epStatus
,
0
,
1
);
if
(
epStatus
==
1
)
{
tscDebug
(
"consumer %ld skip ask ep"
,
tmq
->
consumerId
);
return
0
;
}
int32_t
tlen
=
sizeof
(
SMqCMGetSubEpReq
);
SMqCMGetSubEpReq
*
req
=
taosMemoryMalloc
(
tlen
);
if
(
req
==
NULL
)
{
tscError
(
"failed to malloc get subscribe ep buf"
);
atomic_store_8
(
&
tmq
->
epStatus
,
0
);
return
-
1
;
}
req
->
consumerId
=
htobe64
(
tmq
->
consumerId
);
...
...
@@ -1048,6 +1051,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
if
(
pParam
==
NULL
)
{
tscError
(
"failed to malloc subscribe param"
);
taosMemoryFree
(
req
);
atomic_store_8
(
&
tmq
->
epStatus
,
0
);
return
-
1
;
}
pParam
->
tmq
=
tmq
;
...
...
@@ -1059,6 +1063,7 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
tsem_destroy
(
&
pParam
->
rspSem
);
taosMemoryFree
(
pParam
);
taosMemoryFree
(
req
);
atomic_store_8
(
&
tmq
->
epStatus
,
0
);
return
-
1
;
}
...
...
@@ -1076,6 +1081,8 @@ int32_t tmqAskEp(tmq_t* tmq, bool sync) {
SEpSet
epSet
=
getEpSet_s
(
&
tmq
->
pTscObj
->
pAppInfo
->
mgmtEp
);
tscDebug
(
"consumer %ld ask ep"
,
tmq
->
consumerId
);
int64_t
transporterId
=
0
;
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
epSet
,
&
transporterId
,
sendInfo
);
...
...
@@ -1214,7 +1221,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
j
);
int32_t
vgStatus
=
atomic_val_compare_exchange_32
(
&
pVg
->
vgStatus
,
TMQ_VG_STATUS__IDLE
,
TMQ_VG_STATUS__WAIT
);
if
(
vgStatus
!=
TMQ_VG_STATUS__IDLE
)
{
tscDebug
(
"
skip vg %d"
,
pVg
->
vgId
);
tscDebug
(
"
consumer %ld skip vg %d"
,
tmq
->
consumerId
,
pVg
->
vgId
);
continue
;
}
SMqPollReq
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
blockingTime
,
pTopic
,
pVg
);
...
...
@@ -1258,7 +1265,7 @@ int32_t tmqPollImpl(tmq_t* tmq, int64_t blockingTime) {
int64_t
transporterId
=
0
;
/*printf("send poll\n");*/
atomic_add_fetch_32
(
&
tmq
->
waitingRequest
,
1
);
tscDebug
(
"
tmq send poll: vg %d, req offset %ld"
,
pVg
->
vgId
,
pVg
->
currentOffset
);
tscDebug
(
"
consumer %ld send poll: vg %d, req offset %ld"
,
tmq
->
consumerId
,
pVg
->
vgId
,
pVg
->
currentOffset
);
/*printf("send vg %d %ld\n", pVg->vgId, pVg->currentOffset);*/
asyncSendMsgToServer
(
tmq
->
pTscObj
->
pAppInfo
->
pTransporter
,
&
pVg
->
epSet
,
&
transporterId
,
sendInfo
);
pVg
->
pollCnt
++
;
...
...
@@ -1322,7 +1329,7 @@ tmq_message_t* tmqHandleAllRsp(tmq_t* tmq, int64_t blockingTime, bool pollIfRese
tmqHandleNoPollRsp
(
tmq
,
rspHead
,
&
reset
);
taosFreeQitem
(
rspHead
);
if
(
pollIfReset
&&
reset
)
{
tscDebug
(
"
reset and repoll
\n
"
);
tscDebug
(
"
consumer %ld reset and repoll"
,
tmq
->
consumerId
);
tmqPollImpl
(
tmq
,
blockingTime
);
}
}
...
...
@@ -1561,24 +1568,3 @@ TAOS_ROW tmq_get_row(tmq_message_t* message) {
}
char
*
tmq_get_topic_name
(
tmq_message_t
*
message
)
{
return
"not implemented yet"
;
}
#if 0
tmq_t* tmqCreateConsumerImpl(TAOS* conn, tmq_conf_t* conf) {
tmq_t* pTmq = taosMemoryMalloc(sizeof(tmq_t));
if (pTmq == NULL) {
return NULL;
}
strcpy(pTmq->groupId, conf->groupId);
strcpy(pTmq->clientId, conf->clientId);
pTmq->pTscObj = (STscObj*)conn;
pTmq->pTscObj->connType = HEARTBEAT_TYPE_MQ;
return pTmq;
}
static void destroySendMsgInfo(SMsgSendInfo* pMsgBody) {
assert(pMsgBody != NULL);
taosMemoryFreeClear(pMsgBody->msgInfo.pData);
taosMemoryFreeClear(pMsgBody);
}
#endif
source/dnode/mnode/impl/src/mndSubscribe.c
浏览文件 @
adf8bd5e
...
...
@@ -507,7 +507,7 @@ static int32_t mndProcessDoRebalanceMsg(SNodeMsg *pMsg) {
// TODO: log rebalance statistics
SSdbRaw
*
pSubRaw
=
mndSubActionEncode
(
pSub
);
sdbSetRawStatus
(
pSubRaw
,
SDB_STATUS_
UPDATING
);
sdbSetRawStatus
(
pSubRaw
,
SDB_STATUS_
READY
);
mndTransAppendRedolog
(
pTrans
,
pSubRaw
);
}
mndReleaseSubscribe
(
pMnode
,
pSub
);
...
...
source/dnode/mnode/sdb/src/sdbHash.c
浏览文件 @
adf8bd5e
...
...
@@ -40,6 +40,10 @@ const char *sdbTableName(ESdbType type) {
return
"auth"
;
case
SDB_ACCT
:
return
"acct"
;
case
SDB_STREAM
:
return
"stream"
;
case
SDB_OFFSET
:
return
"offset"
;
case
SDB_SUBSCRIBE
:
return
"subscribe"
;
case
SDB_CONSUMER
:
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录