Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
c4df4431
T
TDengine
项目概览
taosdata
/
TDengine
接近 2 年 前同步成功
通知
1192
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
c4df4431
编写于
8月 15, 2022
作者:
L
Liu Jicong
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
refactor(tmq): put write logic into taosx.c
上级
b9c9ea59
变更
8
展开全部
隐藏空白更改
内联
并排
Showing
8 changed file
with
1853 addition
and
1770 deletion
+1853
-1770
examples/c/tmq.c
examples/c/tmq.c
+11
-23
include/client/taos.h
include/client/taos.h
+22
-18
include/util/taoserror.h
include/util/taoserror.h
+1
-0
include/util/tref.h
include/util/tref.h
+2
-2
source/client/src/clientMain.c
source/client/src/clientMain.c
+1
-0
source/client/src/taosx.c
source/client/src/taosx.c
+1628
-0
source/client/src/tmq.c
source/client/src/tmq.c
+187
-1727
source/util/src/terror.c
source/util/src/terror.c
+1
-0
未找到文件。
examples/c/tmq.c
浏览文件 @
c4df4431
...
...
@@ -45,10 +45,9 @@ static int32_t msg_process(TAOS_RES* msg) {
int32_t
numOfFields
=
taos_field_count
(
msg
);
int32_t
*
length
=
taos_fetch_lengths
(
msg
);
int32_t
precision
=
taos_result_precision
(
msg
);
const
char
*
tbName
=
tmq_get_table_name
(
msg
);
rows
++
;
taos_print_row
(
buf
,
row
,
fields
,
numOfFields
);
printf
(
"row content
from %s: %s
\n
"
,
(
tbName
!=
NULL
?
tbName
:
"table null"
)
,
buf
);
printf
(
"row content
: %s
\n
"
,
buf
);
}
return
rows
;
...
...
@@ -167,7 +166,7 @@ int32_t create_topic() {
}
taos_free_result
(
pRes
);
pRes
=
taos_query
(
pConn
,
"create topic topicname as select ts, c1, c2, c3 from tmqdb.stb where c1 > 1"
);
pRes
=
taos_query
(
pConn
,
"create topic topicname as select ts, c1, c2, c3
, tbname
from tmqdb.stb where c1 > 1"
);
if
(
taos_errno
(
pRes
)
!=
0
)
{
printf
(
"failed to create topic topicname, reason:%s
\n
"
,
taos_errstr
(
pRes
));
return
-
1
;
...
...
@@ -199,9 +198,7 @@ tmq_t* build_consumer() {
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
code
=
tmq_conf_set
(
conf
,
"auto.offset.reset"
,
"earliest"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
code
=
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"true"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
code
=
tmq_conf_set
(
conf
,
"msg.with.table.name"
,
"true"
);
code
=
tmq_conf_set
(
conf
,
"experimental.snapshot.enable"
,
"false"
);
if
(
TMQ_CONF_OK
!=
code
)
return
NULL
;
tmq_conf_set_auto_commit_cb
(
conf
,
tmq_commit_cb_print
,
NULL
);
...
...
@@ -220,14 +217,7 @@ tmq_list_t* build_topic_list() {
return
topicList
;
}
void
basic_consume_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topicList
)
{
int32_t
code
;
if
((
code
=
tmq_subscribe
(
tmq
,
topicList
)))
{
fprintf
(
stderr
,
"%% Failed to tmq_subscribe(): %s
\n
"
,
tmq_err2str
(
code
));
return
;
}
void
basic_consume_loop
(
tmq_t
*
tmq
)
{
int32_t
totalRows
=
0
;
int32_t
msgCnt
=
0
;
int32_t
timeout
=
5000
;
...
...
@@ -237,8 +227,8 @@ void basic_consume_loop(tmq_t* tmq, tmq_list_t* topicList) {
msgCnt
++
;
totalRows
+=
msg_process
(
tmqmsg
);
taos_free_result
(
tmqmsg
);
/*} else {*/
/*break;*/
}
else
{
break
;
}
}
...
...
@@ -267,14 +257,12 @@ int main(int argc, char* argv[]) {
return
-
1
;
}
basic_consume_loop
(
tmq
,
topic_list
);
code
=
tmq_unsubscribe
(
tmq
);
if
(
code
)
{
fprintf
(
stderr
,
"%% Failed to unsubscribe: %s
\n
"
,
tmq_err2str
(
code
));
}
else
{
fprintf
(
stderr
,
"%% unsubscribe
\n
"
);
if
((
code
=
tmq_subscribe
(
tmq
,
topic_list
)))
{
fprintf
(
stderr
,
"%% Failed to tmq_subscribe(): %s
\n
"
,
tmq_err2str
(
code
));
}
tmq_list_destroy
(
topic_list
);
basic_consume_loop
(
tmq
);
code
=
tmq_consumer_close
(
tmq
);
if
(
code
)
{
...
...
include/client/taos.h
浏览文件 @
c4df4431
...
...
@@ -131,10 +131,10 @@ DLL_EXPORT int taos_options(TSDB_OPTION option, const void *arg, ...);
DLL_EXPORT
setConfRet
taos_set_config
(
const
char
*
config
);
DLL_EXPORT
int
taos_init
(
void
);
DLL_EXPORT
TAOS
*
taos_connect
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
pass
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
TAOS
*
taos_connect_auth
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
void
taos_close
(
TAOS
*
taos
);
DLL_EXPORT
TAOS
*
taos_connect_auth
(
const
char
*
ip
,
const
char
*
user
,
const
char
*
auth
,
const
char
*
db
,
uint16_t
port
);
DLL_EXPORT
void
taos_close
(
TAOS
*
taos
);
const
char
*
taos_data_type
(
int
type
);
const
char
*
taos_data_type
(
int
type
);
DLL_EXPORT
TAOS_STMT
*
taos_stmt_init
(
TAOS
*
taos
);
DLL_EXPORT
int
taos_stmt_prepare
(
TAOS_STMT
*
stmt
,
const
char
*
sql
,
unsigned
long
length
);
...
...
@@ -244,33 +244,37 @@ DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_comm
/* -------------------------TMQ MSG HANDLE INTERFACE---------------------- */
DLL_EXPORT
const
char
*
tmq_get_topic_name
(
TAOS_RES
*
res
);
DLL_EXPORT
const
char
*
tmq_get_db_name
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_vgroup_id
(
TAOS_RES
*
res
);
/* ------------------------------ TAOSX -----------------------------------*/
// note: following apis are unstable
enum
tmq_res_t
{
TMQ_RES_INVALID
=
-
1
,
TMQ_RES_DATA
=
1
,
TMQ_RES_TABLE_META
=
2
,
};
typedef
struct
tmq_raw_data
{
void
*
raw
;
typedef
struct
tmq_raw_data
{
void
*
raw
;
uint32_t
raw_len
;
uint16_t
raw_type
;
}
tmq_raw_data
;
typedef
enum
tmq_res_t
tmq_res_t
;
DLL_EXPORT
tmq_res_t
tmq_get_res_type
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_raw
(
TAOS_RES
*
res
,
tmq_raw_data
*
raw
);
DLL_EXPORT
int32_t
tmq_write_raw
(
TAOS
*
taos
,
tmq_raw_data
raw
);
DLL_EXPORT
int
taos_write_raw_block
(
TAOS
*
taos
,
int
numOfRows
,
char
*
pData
,
const
char
*
tbname
);
DLL_EXPORT
void
tmq_free_raw
(
tmq_raw_data
raw
);
DLL_EXPORT
char
*
tmq_get_json_meta
(
TAOS_RES
*
res
);
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT
void
tmq_free_json_meta
(
char
*
jsonMeta
);
DLL_EXPORT
const
char
*
tmq_get_topic_name
(
TAOS_RES
*
res
);
DLL_EXPORT
const
char
*
tmq_get_db_name
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_vgroup_id
(
TAOS_RES
*
res
);
DLL_EXPORT
const
char
*
tmq_get_table_name
(
TAOS_RES
*
res
);
/* ------------------------------ TMQ END -------------------------------- */
DLL_EXPORT
const
char
*
tmq_get_table_name
(
TAOS_RES
*
res
);
DLL_EXPORT
tmq_res_t
tmq_get_res_type
(
TAOS_RES
*
res
);
DLL_EXPORT
int32_t
tmq_get_raw
(
TAOS_RES
*
res
,
tmq_raw_data
*
raw
);
DLL_EXPORT
int32_t
tmq_write_raw
(
TAOS
*
taos
,
tmq_raw_data
raw
);
DLL_EXPORT
int
taos_write_raw_block
(
TAOS
*
taos
,
int
numOfRows
,
char
*
pData
,
const
char
*
tbname
);
DLL_EXPORT
void
tmq_free_raw
(
tmq_raw_data
raw
);
// Returning null means error. Returned result need to be freed by tmq_free_json_meta
DLL_EXPORT
char
*
tmq_get_json_meta
(
TAOS_RES
*
res
);
DLL_EXPORT
void
tmq_free_json_meta
(
char
*
jsonMeta
);
/* ---------------------------- TAOSX END -------------------------------- */
typedef
enum
{
TSDB_SRV_STATUS_UNAVAILABLE
=
0
,
...
...
include/util/taoserror.h
浏览文件 @
c4df4431
...
...
@@ -622,6 +622,7 @@ int32_t* taosGetErrno();
//tmq
#define TSDB_CODE_TMQ_INVALID_MSG TAOS_DEF_ERROR_CODE(0, 0x4000)
#define TSDB_CODE_TMQ_CONSUMER_MISMATCH TAOS_DEF_ERROR_CODE(0, 0x4001)
#define TSDB_CODE_TMQ_CONSUMER_CLOSED TAOS_DEF_ERROR_CODE(0, 0x4002)
#ifdef __cplusplus
}
...
...
include/util/tref.h
浏览文件 @
c4df4431
...
...
@@ -29,11 +29,11 @@ int32_t taosOpenRef(int32_t max, void (*fp)(void *));
// close the reference set, refId is the return value by taosOpenRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
int32_t
taosCloseRef
(
int32_t
r
ef
Id
);
int32_t
taosCloseRef
(
int32_t
r
set
Id
);
// add ref, p is the pointer to resource or pointer ID
// return Reference ID(rid) allocated. On error, -1 is returned, and terrno is set appropriately
int64_t
taosAddRef
(
int32_t
r
ef
Id
,
void
*
p
);
int64_t
taosAddRef
(
int32_t
r
set
Id
,
void
*
p
);
// remove ref, rid is the reference ID returned by taosAddRef
// return 0 if success. On error, -1 is returned, and terrno is set appropriately
...
...
source/client/src/clientMain.c
浏览文件 @
c4df4431
...
...
@@ -192,6 +192,7 @@ void taos_free_result(TAOS_RES *res) {
if
(
pRsp
->
rsp
.
withSchema
)
taosArrayDestroyP
(
pRsp
->
rsp
.
blockSchema
,
(
FDelete
)
tDeleteSSchemaWrapper
);
pRsp
->
resInfo
.
pRspMsg
=
NULL
;
doFreeReqResultInfo
(
&
pRsp
->
resInfo
);
taosMemoryFree
(
pRsp
);
}
else
if
(
TD_RES_TMQ_META
(
res
))
{
SMqMetaRspObj
*
pRspObj
=
(
SMqMetaRspObj
*
)
res
;
taosMemoryFree
(
pRspObj
->
metaRsp
.
metaRsp
);
...
...
source/client/src/taosx.c
0 → 100644
浏览文件 @
c4df4431
此差异已折叠。
点击以展开。
source/client/src/tmq.c
浏览文件 @
c4df4431
此差异已折叠。
点击以展开。
source/util/src/terror.c
浏览文件 @
c4df4431
...
...
@@ -624,6 +624,7 @@ TAOS_DEFINE_ERROR(TSDB_CODE_INDEX_REBUILDING, "Invalid index file"
//tmq
TAOS_DEFINE_ERROR
(
TSDB_CODE_TMQ_INVALID_MSG
,
"Invalid message"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TMQ_CONSUMER_MISMATCH
,
"Consumer mismatch"
)
TAOS_DEFINE_ERROR
(
TSDB_CODE_TMQ_CONSUMER_CLOSED
,
"Consumer closed"
)
#ifdef TAOS_ERROR_C
};
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录