Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
660a600d
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22017
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
660a600d
编写于
1月 28, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor
上级
fd44bdbf
变更
1
隐藏空白更改
内联
并排
Showing
1 changed file
with
130 addition
and
100 deletion
+130
-100
source/client/src/clientImpl.c
source/client/src/clientImpl.c
+130
-100
未找到文件。
source/client/src/clientImpl.c
浏览文件 @
660a600d
...
...
@@ -21,6 +21,93 @@
} \
} while (0)
typedef
struct
SMqClientVg
{
// statistics
int64_t
pollCnt
;
// offset
int64_t
committedOffset
;
int64_t
currentOffset
;
//connection info
int32_t
vgId
;
SEpSet
epSet
;
}
SMqClientVg
;
typedef
struct
SMqClientTopic
{
// subscribe info
int32_t
sqlLen
;
char
*
sql
;
char
*
topicName
;
int64_t
topicId
;
int32_t
nextVgIdx
;
SArray
*
vgs
;
//SArray<SMqClientVg>
}
SMqClientTopic
;
struct
tmq_resp_err_t
{
int32_t
code
;
};
struct
tmq_topic_vgroup_t
{
char
*
topic
;
int32_t
vgId
;
int64_t
commitOffset
;
};
struct
tmq_topic_vgroup_list_t
{
int32_t
cnt
;
int32_t
size
;
tmq_topic_vgroup_t
*
elems
;
};
typedef
struct
SMqConsumeCbParam
{
tmq_t
*
tmq
;
SMqClientVg
*
pVg
;
tmq_message_t
**
retMsg
;
}
SMqConsumeCbParam
;
struct
tmq_conf_t
{
char
clientId
[
256
];
char
groupId
[
256
];
char
*
ip
;
uint16_t
port
;
tmq_commit_cb
*
commit_cb
;
};
struct
tmq_message_t
{
SMqConsumeRsp
rsp
;
};
tmq_conf_t
*
tmq_conf_new
()
{
tmq_conf_t
*
conf
=
calloc
(
1
,
sizeof
(
tmq_conf_t
));
return
conf
;
}
int32_t
tmq_conf_set
(
tmq_conf_t
*
conf
,
const
char
*
key
,
const
char
*
value
)
{
if
(
strcmp
(
key
,
"group.id"
)
==
0
)
{
strcpy
(
conf
->
groupId
,
value
);
}
if
(
strcmp
(
key
,
"client.id"
)
==
0
)
{
strcpy
(
conf
->
clientId
,
value
);
}
return
0
;
}
struct
tmq_t
{
char
groupId
[
256
];
char
clientId
[
256
];
SRWLatch
lock
;
int64_t
consumerId
;
int64_t
epoch
;
int64_t
status
;
tsem_t
rspSem
;
STscObj
*
pTscObj
;
tmq_commit_cb
*
commit_cb
;
int32_t
nextTopicIdx
;
SArray
*
clientTopics
;
//SArray<SMqClientTopic>
//stat
int64_t
pollCnt
;
};
static
int32_t
initEpSetFromCfg
(
const
char
*
firstEp
,
const
char
*
secondEp
,
SCorEpSet
*
pEpSet
);
static
SMsgSendInfo
*
buildConnectMsg
(
SRequestObj
*
pRequest
);
static
void
destroySendMsgInfo
(
SMsgSendInfo
*
pMsgBody
);
...
...
@@ -259,81 +346,6 @@ int32_t scheduleQuery(SRequestObj* pRequest, SQueryDag* pDag, SArray* pNodeList)
}
typedef
struct
SMqClientVg
{
// statistics
int64_t
pollCnt
;
// offset
int64_t
committedOffset
;
int64_t
currentOffset
;
//connection info
int32_t
vgId
;
SEpSet
epSet
;
}
SMqClientVg
;
typedef
struct
SMqClientTopic
{
// subscribe info
int32_t
sqlLen
;
char
*
sql
;
char
*
topicName
;
int64_t
topicId
;
int32_t
nextVgIdx
;
SArray
*
vgs
;
//SArray<SMqClientVg>
}
SMqClientTopic
;
struct
tmq_resp_err_t
{
int32_t
code
;
};
struct
tmq_topic_vgroup_t
{
char
*
topic
;
int32_t
vgId
;
int64_t
commitOffset
;
};
struct
tmq_topic_vgroup_list_t
{
int32_t
cnt
;
int32_t
size
;
tmq_topic_vgroup_t
*
elems
;
};
struct
tmq_conf_t
{
char
clientId
[
256
];
char
groupId
[
256
];
char
*
ip
;
uint16_t
port
;
tmq_commit_cb
*
commit_cb
;
};
tmq_conf_t
*
tmq_conf_new
()
{
tmq_conf_t
*
conf
=
calloc
(
1
,
sizeof
(
tmq_conf_t
));
return
conf
;
}
int32_t
tmq_conf_set
(
tmq_conf_t
*
conf
,
const
char
*
key
,
const
char
*
value
)
{
if
(
strcmp
(
key
,
"group.id"
)
==
0
)
{
strcpy
(
conf
->
groupId
,
value
);
}
if
(
strcmp
(
key
,
"client.id"
)
==
0
)
{
strcpy
(
conf
->
clientId
,
value
);
}
return
0
;
}
struct
tmq_t
{
char
groupId
[
256
];
char
clientId
[
256
];
SRWLatch
lock
;
int64_t
consumerId
;
int64_t
epoch
;
int64_t
status
;
tsem_t
rspSem
;
STscObj
*
pTscObj
;
tmq_commit_cb
*
commit_cb
;
int32_t
nextTopicIdx
;
SArray
*
clientTopics
;
//SArray<SMqClientTopic>
//stat
int64_t
pollCnt
;
};
tmq_t
*
taos_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
)
{
tmq_t
*
pTmq
=
calloc
(
sizeof
(
tmq_t
),
1
);
...
...
@@ -580,7 +592,7 @@ TAOS_RES *taos_create_topic(TAOS* taos, const char* topicName, const char* sql,
int
tlen
=
tSerializeSCMCreateTopicReq
(
NULL
,
&
req
);
void
*
buf
=
malloc
(
tlen
);
if
(
buf
==
NULL
)
{
if
(
buf
==
NULL
)
{
goto
_return
;
}
...
...
@@ -614,10 +626,6 @@ _return:
/*typedef SMqConsumeRsp tmq_message_t;*/
struct
tmq_message_t
{
SMqConsumeRsp
rsp
;
};
int32_t
tmq_poll_cb_inner
(
void
*
param
,
const
SDataBuf
*
pMsg
,
int32_t
code
)
{
if
(
code
==
-
1
)
{
printf
(
"discard
\n
"
);
...
...
@@ -766,47 +774,69 @@ END:
return
0
;
}
SMqConsumeReq
*
tmqBuildConsumeReqImpl
(
tmq_t
*
tmq
,
int64_t
blocking_time
,
int32_t
type
,
SMqClientTopic
*
pTopic
,
SMqClientVg
**
ppVg
)
{
SMqConsumeReq
*
pReq
=
malloc
(
sizeof
(
SMqConsumeReq
));
if
(
pReq
==
NULL
)
{
return
NULL
;
}
pReq
->
reqType
=
type
;
pReq
->
blockingTime
=
blocking_time
;
pReq
->
consumerId
=
tmq
->
consumerId
;
strcpy
(
pReq
->
cgroup
,
tmq
->
groupId
);
tmq
->
nextTopicIdx
=
(
tmq
->
nextTopicIdx
+
1
)
%
taosArrayGetSize
(
tmq
->
clientTopics
);
strcpy
(
pReq
->
topic
,
pTopic
->
topicName
);
pTopic
->
nextVgIdx
=
(
pTopic
->
nextVgIdx
+
1
%
taosArrayGetSize
(
pTopic
->
vgs
));
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
pTopic
->
nextVgIdx
);
pReq
->
offset
=
pVg
->
currentOffset
+
1
;
pReq
->
head
.
vgId
=
htonl
(
pVg
->
vgId
);
pReq
->
head
.
contLen
=
htonl
(
sizeof
(
SMqConsumeReq
));
return
pReq
;
}
tmq_message_t
*
tmq_consumer_poll
(
tmq_t
*
tmq
,
int64_t
blocking_time
)
{
tmq_message_t
*
tmq_message
=
NULL
;
int64_t
status
=
atomic_load_64
(
&
tmq
->
status
);
tmqAsyncAskEp
(
tmq
,
status
==
0
||
taosArrayGetSize
(
tmq
->
clientTopics
));
if
(
blocking_time
<
0
)
blocking_time
=
500
;
/*if (blocking_time < 0) blocking_time = 500;*/
blocking_time
=
1000
;
if
(
taosArrayGetSize
(
tmq
->
clientTopics
)
==
0
)
{
tscDebug
(
"consumer:%ld poll but not assigned"
,
tmq
->
consumerId
);
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
SMqConsumeReq
*
pReq
=
malloc
(
sizeof
(
SMqConsumeReq
));
pReq
->
reqType
=
1
;
pReq
->
blockingTime
=
blocking_time
;
pReq
->
consumerId
=
tmq
->
consumerId
;
tmq_message_t
*
tmq_message
=
NULL
;
strcpy
(
pReq
->
cgroup
,
tmq
->
groupId
);
SMqClientTopic
*
pTopic
=
taosArrayGet
(
tmq
->
clientTopics
,
tmq
->
nextTopicIdx
);
tmq
->
nextTopicIdx
=
(
tmq
->
nextTopicIdx
+
1
)
%
taosArrayGetSize
(
tmq
->
clientTopics
);
strcpy
(
pReq
->
topic
,
pTopic
->
topicName
);
int32_t
vgSz
=
taosArrayGetSize
(
pTopic
->
vgs
);
if
(
vgSz
==
0
)
{
free
(
pReq
);
if
(
taosArrayGetSize
(
pTopic
->
vgs
)
==
0
)
{
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
pTopic
->
nextVgIdx
=
(
pTopic
->
nextVgIdx
+
1
%
vgSz
);
SMqClientVg
*
pVg
=
taosArrayGet
(
pTopic
->
vgs
,
pTopic
->
nextVgIdx
);
pReq
->
offset
=
pVg
->
currentOffset
+
1
;
pReq
->
head
.
vgId
=
htonl
(
pVg
->
vgId
);
pReq
->
head
.
contLen
=
htonl
(
sizeof
(
SMqConsumeReq
));
SMqClientVg
*
pVg
=
NULL
;
SMqConsumeReq
*
pReq
=
tmqBuildConsumeReqImpl
(
tmq
,
blocking_time
,
TMQ_REQ_TYPE_CONSUME_ONLY
,
pTopic
,
&
pVg
);
if
(
pReq
==
NULL
)
{
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
SMqConsumeCbParam
*
param
=
malloc
(
sizeof
(
SMqConsumeCbParam
));
if
(
param
==
NULL
)
{
usleep
(
blocking_time
*
1000
);
return
NULL
;
}
param
->
tmq
=
tmq
;
param
->
retMsg
=
&
tmq_message
;
param
->
pVg
=
pVg
;
SRequestObj
*
pRequest
=
createRequest
(
tmq
->
pTscObj
,
NULL
,
NULL
,
TDMT_VND_CONSUME
);
pRequest
->
body
.
requestMsg
=
(
SDataBuf
){
.
pData
=
pReq
,
.
len
=
sizeof
(
SMqConsumeReq
)
};
SMsgSendInfo
*
sendInfo
=
buildMsgInfoImpl
(
pRequest
);
sendInfo
->
requestObjRefId
=
0
;
sendInfo
->
param
=
p
Vg
;
sendInfo
->
param
=
p
aram
;
sendInfo
->
fp
=
tmq_poll_cb_inner
;
/*printf("req offset: %ld\n", pReq->offset);*/
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录