Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
06285a25
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
06285a25
编写于
3月 26, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor tmq container
上级
71097ea5
变更
8
隐藏空白更改
内联
并排
Showing
8 changed file
with
155 addition
and
50 deletion
+155
-50
example/src/tmq.c
example/src/tmq.c
+1
-1
include/client/taos.h
include/client/taos.h
+4
-1
source/client/src/tmq.c
source/client/src/tmq.c
+99
-29
source/dnode/mnode/impl/inc/mndDef.h
source/dnode/mnode/impl/inc/mndDef.h
+13
-12
source/dnode/mnode/impl/src/mndStream.c
source/dnode/mnode/impl/src/mndStream.c
+6
-5
source/dnode/mnode/impl/src/mndTopic.c
source/dnode/mnode/impl/src/mndTopic.c
+30
-0
source/os/src/osMemory.c
source/os/src/osMemory.c
+1
-1
tests/test/c/tmqDemo.c
tests/test/c/tmqDemo.c
+1
-1
未找到文件。
example/src/tmq.c
浏览文件 @
06285a25
...
...
@@ -44,7 +44,7 @@ int32_t init_env() {
pRes
=
taos_query
(
pConn
,
"create stable if not exists st1 (ts timestamp, k int) tags(a int)"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create super table
123_$^)
, reason:%s
\n
"
,
taos_errstr
(
pRes
));
printf
(
"failed to create super table
st1
, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
}
taos_free_result
(
pRes
);
...
...
include/client/taos.h
浏览文件 @
06285a25
...
...
@@ -213,8 +213,10 @@ typedef void(tmq_commit_cb(tmq_t *, tmq_resp_err_t, tmq_topic_vgroup_list_t *, v
DLL_EXPORT
tmq_list_t
*
tmq_list_new
();
DLL_EXPORT
int32_t
tmq_list_append
(
tmq_list_t
*
,
const
char
*
);
DLL_EXPORT
void
tmq_list_destroy
(
tmq_list_t
*
);
DLL_EXPORT
tmq_t
*
tmq_consumer_new
(
void
*
conn
,
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
tmq_t
*
tmq_consumer_new1
(
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
);
DLL_EXPORT
void
tmq_message_destroy
(
tmq_message_t
*
tmq_message
);
DLL_EXPORT
const
char
*
tmq_err2str
(
tmq_resp_err_t
);
...
...
@@ -244,8 +246,8 @@ enum tmq_conf_res_t {
typedef
enum
tmq_conf_res_t
tmq_conf_res_t
;
DLL_EXPORT
tmq_conf_t
*
tmq_conf_new
();
DLL_EXPORT
void
tmq_conf_destroy
(
tmq_conf_t
*
conf
);
DLL_EXPORT
tmq_conf_res_t
tmq_conf_set
(
tmq_conf_t
*
conf
,
const
char
*
key
,
const
char
*
value
);
DLL_EXPORT
void
tmq_conf_destroy
(
tmq_conf_t
*
conf
);
DLL_EXPORT
void
tmq_conf_set_offset_commit_cb
(
tmq_conf_t
*
conf
,
tmq_commit_cb
*
cb
);
// temporary used function for demo only
...
...
@@ -256,6 +258,7 @@ int32_t tmqGetSkipLogNum(tmq_message_t *tmq_message);
DLL_EXPORT
TAOS_ROW
tmq_get_row
(
tmq_message_t
*
message
);
DLL_EXPORT
char
*
tmq_get_topic_name
(
tmq_message_t
*
message
);
DLL_EXPORT
char
*
tmq_get_topic_schema
(
tmq_t
*
tmq
,
const
char
*
topic
);
/* --------------------TMPORARY INTERFACE FOR TESTING--------------------- */
DLL_EXPORT
TAOS_RES
*
tmq_create_topic
(
TAOS
*
taos
,
const
char
*
name
,
const
char
*
sql
,
int
sqlLen
);
...
...
source/client/src/tmq.c
浏览文件 @
06285a25
...
...
@@ -27,9 +27,7 @@
#include "tref.h"
struct
tmq_list_t
{
int32_t
cnt
;
int32_t
tot
;
char
*
elems
[];
SArray
container
;
};
struct
tmq_topic_vgroup_t
{
...
...
@@ -45,11 +43,14 @@ struct tmq_topic_vgroup_list_t {
struct
tmq_conf_t
{
char
clientId
[
256
];
char
groupId
[
TSDB_CGROUP_LEN
];
int8_t
auto
_c
ommit
;
int8_t
auto
C
ommit
;
int8_t
resetOffset
;
uint16_t
port
;
char
*
ip
;
char
*
user
;
char
*
pass
;
char
*
db
;
tmq_commit_cb
*
commit_cb
;
/*char* ip;*/
/*uint16_t port;*/
};
struct
tmq_t
{
...
...
@@ -98,12 +99,13 @@ typedef struct {
typedef
struct
{
// subscribe info
int32_t
sqlLen
;
char
*
sql
;
char
*
topicName
;
int64_t
topicId
;
int32_t
nextVgIdx
;
SArray
*
vgs
;
// SArray<SMqClientVg>
int32_t
sqlLen
;
char
*
sql
;
char
*
topicName
;
int64_t
topicId
;
int32_t
nextVgIdx
;
SArray
*
vgs
;
// SArray<SMqClientVg>
SSchemaWrapper
schema
;
}
SMqClientTopic
;
typedef
struct
{
...
...
@@ -137,7 +139,7 @@ typedef struct {
tmq_conf_t
*
tmq_conf_new
()
{
tmq_conf_t
*
conf
=
taosMemoryCalloc
(
1
,
sizeof
(
tmq_conf_t
));
conf
->
auto
_c
ommit
=
false
;
conf
->
auto
C
ommit
=
false
;
conf
->
resetOffset
=
TMQ_CONF__RESET_OFFSET__EARLIEAST
;
return
conf
;
}
...
...
@@ -151,21 +153,24 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
strcpy
(
conf
->
groupId
,
value
);
return
TMQ_CONF_OK
;
}
if
(
strcmp
(
key
,
"client.id"
)
==
0
)
{
strcpy
(
conf
->
clientId
,
value
);
return
TMQ_CONF_OK
;
}
if
(
strcmp
(
key
,
"enable.auto.commit"
)
==
0
)
{
if
(
strcmp
(
value
,
"true"
)
==
0
)
{
conf
->
auto
_c
ommit
=
true
;
conf
->
auto
C
ommit
=
true
;
return
TMQ_CONF_OK
;
}
else
if
(
strcmp
(
value
,
"false"
)
==
0
)
{
conf
->
auto
_c
ommit
=
false
;
conf
->
auto
C
ommit
=
false
;
return
TMQ_CONF_OK
;
}
else
{
return
TMQ_CONF_INVALID
;
}
}
if
(
strcmp
(
key
,
"auto.offset.reset"
)
==
0
)
{
if
(
strcmp
(
value
,
"none"
)
==
0
)
{
conf
->
resetOffset
=
TMQ_CONF__RESET_OFFSET__NONE
;
...
...
@@ -180,26 +185,49 @@ tmq_conf_res_t tmq_conf_set(tmq_conf_t* conf, const char* key, const char* value
return
TMQ_CONF_INVALID
;
}
}
if
(
strcmp
(
key
,
"connection.ip"
)
==
0
)
{
conf
->
ip
=
strdup
(
value
);
return
TMQ_CONF_OK
;
}
if
(
strcmp
(
key
,
"connection.user"
)
==
0
)
{
conf
->
user
=
strdup
(
value
);
return
TMQ_CONF_OK
;
}
if
(
strcmp
(
key
,
"connection.pass"
)
==
0
)
{
conf
->
pass
=
strdup
(
value
);
return
TMQ_CONF_OK
;
}
if
(
strcmp
(
key
,
"connection.port"
)
==
0
)
{
conf
->
port
=
atoi
(
value
);
return
TMQ_CONF_OK
;
}
if
(
strcmp
(
key
,
"connection.db"
)
==
0
)
{
conf
->
db
=
strdup
(
value
);
return
TMQ_CONF_OK
;
}
return
TMQ_CONF_UNKNOWN
;
}
tmq_list_t
*
tmq_list_new
()
{
tmq_list_t
*
ptr
=
taosMemoryMalloc
(
sizeof
(
tmq_list_t
)
+
8
*
sizeof
(
char
*
));
if
(
ptr
==
NULL
)
{
return
ptr
;
}
ptr
->
cnt
=
0
;
ptr
->
tot
=
8
;
return
ptr
;
//
return
(
tmq_list_t
*
)
taosArrayInit
(
0
,
sizeof
(
void
*
));
}
int32_t
tmq_list_append
(
tmq_list_t
*
ptr
,
const
char
*
src
)
{
if
(
ptr
->
cnt
>=
ptr
->
tot
-
1
)
return
-
1
;
ptr
->
elems
[
ptr
->
cnt
]
=
strdup
(
src
);
ptr
->
cnt
++
;
int32_t
tmq_list_append
(
tmq_list_t
*
list
,
const
char
*
src
)
{
SArray
*
container
=
&
list
->
container
;
char
*
topic
=
strdup
(
src
);
if
(
taosArrayPush
(
container
,
topic
)
==
NULL
)
return
-
1
;
return
0
;
}
void
tmq_list_destroy
(
tmq_list_t
*
list
)
{
SArray
*
container
=
(
SArray
*
)
list
;
taosArrayDestroy
(
container
);
/*taosArrayDestroyEx(container, free);*/
}
void
tmqClearUnhandleMsg
(
tmq_t
*
tmq
)
{
tmq_message_t
*
msg
;
while
(
1
)
{
...
...
@@ -268,17 +296,57 @@ tmq_t* tmq_consumer_new(void* conn, tmq_conf_t* conf, char* errstr, int32_t errs
// set conf
strcpy
(
pTmq
->
clientId
,
conf
->
clientId
);
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
pTmq
->
autoCommit
=
conf
->
auto
_c
ommit
;
pTmq
->
autoCommit
=
conf
->
auto
C
ommit
;
pTmq
->
commit_cb
=
conf
->
commit_cb
;
pTmq
->
resetOffsetCfg
=
conf
->
resetOffset
;
pTmq
->
consumerId
=
generateRequestId
()
&
(((
uint64_t
)
-
1
)
>>
1
);
pTmq
->
clientTopics
=
taosArrayInit
(
0
,
sizeof
(
SMqClientTopic
));
if
(
pTmq
->
clientTopics
==
NULL
)
{
taosMemoryFree
(
pTmq
);
return
NULL
;
}
pTmq
->
mqueue
=
taosOpenQueue
();
pTmq
->
qall
=
taosAllocateQall
();
tsem_init
(
&
pTmq
->
rspSem
,
0
,
0
);
return
pTmq
;
}
tmq_t
*
tmq_consumer_new1
(
tmq_conf_t
*
conf
,
char
*
errstr
,
int32_t
errstrLen
)
{
tmq_t
*
pTmq
=
taosMemoryCalloc
(
1
,
sizeof
(
tmq_t
));
if
(
pTmq
==
NULL
)
{
return
NULL
;
}
pTmq
->
pTscObj
=
taos_connect
(
conf
->
ip
,
conf
->
user
,
conf
->
pass
,
conf
->
db
,
conf
->
port
);
pTmq
->
inWaiting
=
0
;
pTmq
->
status
=
0
;
pTmq
->
pollCnt
=
0
;
pTmq
->
epoch
=
0
;
pTmq
->
waitingRequest
=
0
;
pTmq
->
readyRequest
=
0
;
// set conf
strcpy
(
pTmq
->
clientId
,
conf
->
clientId
);
strcpy
(
pTmq
->
groupId
,
conf
->
groupId
);
pTmq
->
autoCommit
=
conf
->
autoCommit
;
pTmq
->
commit_cb
=
conf
->
commit_cb
;
pTmq
->
resetOffsetCfg
=
conf
->
resetOffset
;
pTmq
->
consumerId
=
generateRequestId
()
&
(((
uint64_t
)
-
1
)
>>
1
);
pTmq
->
clientTopics
=
taosArrayInit
(
0
,
sizeof
(
SMqClientTopic
));
if
(
pTmq
->
clientTopics
==
NULL
)
{
taosMemoryFree
(
pTmq
);
return
NULL
;
}
pTmq
->
mqueue
=
taosOpenQueue
();
pTmq
->
qall
=
taosAllocateQall
();
tsem_init
(
&
pTmq
->
rspSem
,
0
,
0
);
return
pTmq
;
}
...
...
@@ -372,7 +440,8 @@ tmq_resp_err_t tmq_commit(tmq_t* tmq, const tmq_topic_vgroup_list_t* offsets, in
tmq_resp_err_t
tmq_subscribe
(
tmq_t
*
tmq
,
tmq_list_t
*
topic_list
)
{
SRequestObj
*
pRequest
=
NULL
;
int32_t
sz
=
topic_list
->
cnt
;
SArray
*
container
=
&
topic_list
->
container
;
int32_t
sz
=
taosArrayGetSize
(
container
);
// destroy ex
taosArrayDestroy
(
tmq
->
clientTopics
);
tmq
->
clientTopics
=
taosArrayInit
(
sz
,
sizeof
(
SMqClientTopic
));
...
...
@@ -384,7 +453,8 @@ tmq_resp_err_t tmq_subscribe(tmq_t* tmq, tmq_list_t* topic_list) {
req
.
topicNames
=
taosArrayInit
(
sz
,
sizeof
(
void
*
));
for
(
int
i
=
0
;
i
<
sz
;
i
++
)
{
char
*
topicName
=
topic_list
->
elems
[
i
];
/*char* topicName = topic_list->elems[i];*/
char
*
topicName
=
taosArrayGetP
(
container
,
i
);
SName
name
=
{
0
};
char
*
dbName
=
getDbOfConnection
(
tmq
->
pTscObj
);
...
...
source/dnode/mnode/impl/inc/mndDef.h
浏览文件 @
06285a25
...
...
@@ -633,18 +633,19 @@ static FORCE_INLINE void tDeleteSMqSubscribeObj(SMqSubscribeObj* pSub) {
}
typedef
struct
{
char
name
[
TSDB_TOPIC_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int64_t
createTime
;
int64_t
updateTime
;
int64_t
uid
;
int64_t
dbUid
;
int32_t
version
;
SRWLatch
lock
;
int32_t
sqlLen
;
char
*
sql
;
char
*
logicalPlan
;
char
*
physicalPlan
;
char
name
[
TSDB_TOPIC_FNAME_LEN
];
char
db
[
TSDB_DB_FNAME_LEN
];
int64_t
createTime
;
int64_t
updateTime
;
int64_t
uid
;
int64_t
dbUid
;
int32_t
version
;
SRWLatch
lock
;
int32_t
sqlLen
;
char
*
sql
;
char
*
logicalPlan
;
char
*
physicalPlan
;
SSchemaWrapper
schema
;
}
SMqTopicObj
;
typedef
struct
{
...
...
source/dnode/mnode/impl/src/mndStream.c
浏览文件 @
06285a25
...
...
@@ -248,15 +248,16 @@ static int32_t mndStreamGetPlanString(const char *ast, char **pStr) {
int32_t
mndAddStreamToTrans
(
SMnode
*
pMnode
,
SStreamObj
*
pStream
,
const
char
*
ast
,
STrans
*
pTrans
)
{
SNode
*
pAst
=
NULL
;
#if 1 // TODO: remove debug info later
printf
(
"ast = %s
\n
"
,
ast
);
#endif
if
(
nodesStringToNode
(
ast
,
&
pAst
)
<
0
)
{
return
-
1
;
}
#if 1
qExtractResultSchema
(
pAst
,
(
int32_t
*
)
&
pStream
->
outputSchema
.
nCols
,
&
pStream
->
outputSchema
.
pSchema
);
if
(
qExtractResultSchema
(
pAst
,
(
int32_t
*
)
&
pStream
->
outputSchema
.
nCols
,
&
pStream
->
outputSchema
.
pSchema
)
!=
0
)
{
return
-
1
;
}
#if 1
printf
(
"|"
);
for
(
int
i
=
0
;
i
<
pStream
->
outputSchema
.
nCols
;
i
++
)
{
printf
(
" %15s |"
,
(
char
*
)
pStream
->
outputSchema
.
pSchema
[
i
].
name
);
...
...
source/dnode/mnode/impl/src/mndTopic.c
浏览文件 @
06285a25
...
...
@@ -23,6 +23,7 @@
#include "mndTrans.h"
#include "mndUser.h"
#include "mndVgroup.h"
#include "parser.h"
#include "tname.h"
#define MND_TOPIC_VER_NUMBER 1
...
...
@@ -85,6 +86,16 @@ SSdbRaw *mndTopicActionEncode(SMqTopicObj *pTopic) {
SDB_SET_INT32
(
pRaw
,
dataPos
,
physicalPlanLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
physicalPlanLen
,
TOPIC_ENCODE_OVER
);
int32_t
swLen
=
taosEncodeSSchemaWrapper
(
NULL
,
&
pTopic
->
schema
);
void
*
swBuf
=
taosMemoryMalloc
(
swLen
);
if
(
swBuf
==
NULL
)
{
goto
TOPIC_ENCODE_OVER
;
}
void
*
aswBuf
=
swBuf
;
taosEncodeSSchemaWrapper
(
&
aswBuf
,
&
pTopic
->
schema
);
SDB_SET_INT32
(
pRaw
,
dataPos
,
swLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_BINARY
(
pRaw
,
dataPos
,
swBuf
,
swLen
,
TOPIC_ENCODE_OVER
);
SDB_SET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_ENCODE_OVER
);
SDB_SET_DATALEN
(
pRaw
,
dataPos
,
TOPIC_ENCODE_OVER
);
...
...
@@ -149,6 +160,17 @@ SSdbRow *mndTopicActionDecode(SSdbRaw *pRaw) {
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
pTopic
->
physicalPlan
,
len
,
TOPIC_DECODE_OVER
);
SDB_GET_INT32
(
pRaw
,
dataPos
,
&
len
,
TOPIC_DECODE_OVER
);
void
*
buf
=
taosMemoryMalloc
(
len
);
if
(
buf
==
NULL
)
{
terrno
=
TSDB_CODE_OUT_OF_MEMORY
;
goto
TOPIC_DECODE_OVER
;
}
SDB_GET_BINARY
(
pRaw
,
dataPos
,
buf
,
len
,
TOPIC_DECODE_OVER
);
if
(
taosDecodeSSchemaWrapper
(
buf
,
&
pTopic
->
schema
)
==
NULL
)
{
goto
TOPIC_DECODE_OVER
;
}
SDB_GET_RESERVE
(
pRaw
,
dataPos
,
MND_TOPIC_RESERVE_SIZE
,
TOPIC_DECODE_OVER
);
terrno
=
TSDB_CODE_SUCCESS
;
...
...
@@ -283,6 +305,14 @@ static int32_t mndCreateTopic(SMnode *pMnode, SNodeMsg *pReq, SCMCreateTopicReq
topicObj
.
physicalPlan
=
pPlanStr
;
}
SNode
*
pAst
=
NULL
;
if
(
nodesStringToNode
(
pCreate
->
ast
,
&
pAst
)
<
0
)
{
return
-
1
;
}
if
(
qExtractResultSchema
(
pAst
,
&
topicObj
.
schema
.
nCols
,
&
topicObj
.
schema
.
pSchema
)
!=
0
)
{
return
-
1
;
}
STrans
*
pTrans
=
mndTransCreate
(
pMnode
,
TRN_POLICY_ROLLBACK
,
TRN_TYPE_CREATE_TOPIC
,
&
pReq
->
rpcMsg
);
if
(
pTrans
==
NULL
)
{
mError
(
"topic:%s, failed to create since %s"
,
pCreate
->
name
,
terrstr
());
...
...
source/os/src/osMemory.c
浏览文件 @
06285a25
...
...
@@ -131,4 +131,4 @@ int32_t taosMemorySize(void *ptr) {
assert
(
pTdMemoryInfo
->
symbol
==
TD_MEMORY_SYMBOL
);
return
pTdMemoryInfo
->
memorySize
;
}
\ No newline at end of file
}
tests/test/c/tmqDemo.c
浏览文件 @
06285a25
...
...
@@ -694,7 +694,7 @@ int main(int32_t argc, char *argv[]) {
walLogSize
=
getDirectorySize
(
g_stConfInfo
.
vnodeWalPath
);
if
(
walLogSize
<=
0
)
{
printf
(
"vnode2/wal size incorrect!"
);
exit
(
-
1
);
/*exit(-1);*/
}
else
{
if
(
0
==
g_stConfInfo
.
simCase
)
{
pPrint
(
".log file size in vnode2/wal: %.3f MBytes
\n
"
,
(
double
)
walLogSize
/
(
1024
*
1024
.
0
));
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录