Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
3c29ec92
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
3c29ec92
编写于
3月 29, 2022
作者:
H
Haojun Liao
浏览文件
操作
浏览文件
下载
差异文件
Merge remote-tracking branch 'origin/feature/3.0_liaohj' into feature/3.0_liaohj
上级
f6591305
37021822
变更
9
隐藏空白更改
内联
并排
Showing
9 changed file
with
109 addition
and
156 deletion
+109
-156
include/client/taos.h
include/client/taos.h
+6
-3
source/client/src/clientMain.c
source/client/src/clientMain.c
+22
-8
source/dnode/vnode/src/tq/tq.c
source/dnode/vnode/src/tq/tq.c
+2
-0
source/dnode/vnode/src/tq/tqRead.c
source/dnode/vnode/src/tq/tqRead.c
+2
-57
tests/script/jenkins/basic.txt
tests/script/jenkins/basic.txt
+1
-1
tests/script/tsim/parser/groupby-basic.sim
tests/script/tsim/parser/groupby-basic.sim
+25
-51
tests/script/tsim/testCaseSuite.sim
tests/script/tsim/testCaseSuite.sim
+1
-1
tests/script/tsim/tmq/basic1.sim
tests/script/tsim/tmq/basic1.sim
+18
-17
tests/test/c/tmqSim.c
tests/test/c/tmqSim.c
+32
-18
未找到文件。
include/client/taos.h
浏览文件 @
3c29ec92
...
...
@@ -29,8 +29,8 @@ typedef void TAOS_RES;
typedef
void
**
TAOS_ROW
;
#if 0
typedef void TAOS_STREAM;
typedef void TAOS_SUB;
#endif
typedef
void
TAOS_SUB
;
// Data type definition
#define TSDB_DATA_TYPE_NULL 0 // 1 bytes
...
...
@@ -182,13 +182,16 @@ DLL_EXPORT int taos_errno(TAOS_RES *tres);
DLL_EXPORT
void
taos_query_a
(
TAOS
*
taos
,
const
char
*
sql
,
__taos_async_fn_t
fp
,
void
*
param
);
DLL_EXPORT
void
taos_fetch_rows_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
);
#if 0
// Shuduo: temporary enable for app build
#if 1
typedef
void
(
*
__taos_sub_fn_t
)(
TAOS_SUB
*
tsub
,
TAOS_RES
*
res
,
void
*
param
,
int
code
);
DLL_EXPORT
TAOS_SUB
*
taos_subscribe
(
TAOS
*
taos
,
int
restart
,
const
char
*
topic
,
const
char
*
sql
,
__taos_sub_fn_t
fp
,
void
*
param
,
int
interval
);
DLL_EXPORT
TAOS_RES
*
taos_consume
(
TAOS_SUB
*
tsub
);
DLL_EXPORT
void
taos_unsubscribe
(
TAOS_SUB
*
tsub
,
int
keepProgress
);
#endif
#if 0
DLL_EXPORT TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sql, void (*fp)(void *param, TAOS_RES *, TAOS_ROW row),
int64_t stime, void *param, void (*callback)(void *));
DLL_EXPORT void taos_close_stream(TAOS_STREAM *tstr);
...
...
@@ -281,7 +284,7 @@ DLL_EXPORT TAOS_RES *tmq_create_topic(TAOS *taos, const char *name, const char *
DLL_EXPORT
TAOS_RES
*
tmq_create_stream
(
TAOS
*
taos
,
const
char
*
streamName
,
const
char
*
tbName
,
const
char
*
sql
);
/* ------------------------------ TMQ END -------------------------------- */
#if
0
#if
1 // Shuduo: temporary enable for app build
typedef
void
(
*
TAOS_SUBSCRIBE_CALLBACK
)(
TAOS_SUB
*
tsub
,
TAOS_RES
*
res
,
void
*
param
,
int
code
);
#endif
...
...
source/client/src/clientMain.c
浏览文件 @
3c29ec92
...
...
@@ -392,7 +392,7 @@ void taos_query_a(TAOS *taos, const char *sql, __taos_async_fn_t fp, void *param
void
taos_fetch_rows_a
(
TAOS_RES
*
res
,
__taos_async_fn_t
fp
,
void
*
param
)
{
// TODO
}
#if 0
TAOS_SUB
*
taos_subscribe
(
TAOS
*
taos
,
int
restart
,
const
char
*
topic
,
const
char
*
sql
,
TAOS_SUBSCRIBE_CALLBACK
fp
,
void
*
param
,
int
interval
)
{
// TODO
return
NULL
;
...
...
@@ -406,7 +406,6 @@ TAOS_RES *taos_consume(TAOS_SUB *tsub) {
void
taos_unsubscribe
(
TAOS_SUB
*
tsub
,
int
keepProgress
)
{
// TODO
}
#endif
TAOS_STMT
*
taos_stmt_init
(
TAOS
*
taos
)
{
// TODO
...
...
@@ -458,12 +457,27 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
return
-
1
;
}
int
taos_stmt_
add_batch
(
TAOS_STMT
*
stm
t
)
{
// TODO
return
-
1
;
int
taos_stmt_
is_insert
(
TAOS_STMT
*
stmt
,
int
*
inser
t
)
{
// TODO
return
-
1
;
}
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
)
{
// TODO
return
-
1
;
int
taos_stmt_num_params
(
TAOS_STMT
*
stmt
,
int
*
nums
)
{
// TODO
return
-
1
;
}
int
taos_stmt_add_batch
(
TAOS_STMT
*
stmt
)
{
// TODO
return
-
1
;
}
TAOS_RES
*
taos_stmt_use_result
(
TAOS_STMT
*
stmt
)
{
// TODO
return
NULL
;
}
int
taos_stmt_bind_param_batch
(
TAOS_STMT
*
stmt
,
TAOS_MULTI_BIND
*
bind
)
{
// TODO
return
-
1
;
}
source/dnode/vnode/src/tq/tq.c
浏览文件 @
3c29ec92
...
...
@@ -275,6 +275,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
pMsg
->
pCont
=
NULL
;
pMsg
->
contLen
=
0
;
pMsg
->
code
=
-
1
;
ASSERT
(
0
);
rpcSendResponse
(
pMsg
);
return
0
;
}
...
...
@@ -356,6 +357,7 @@ int32_t tqProcessPollReq(STQ* pTq, SRpcMsg* pMsg) {
void
*
buf
=
rpcMallocCont
(
tlen
);
if
(
buf
==
NULL
)
{
pMsg
->
code
=
-
1
;
ASSERT
(
0
);
return
-
1
;
}
((
SMqRspHead
*
)
buf
)
->
mqMsgType
=
TMQ_MSG_TYPE__POLL_RSP
;
...
...
source/dnode/vnode/src/tq/tqRead.c
浏览文件 @
3c29ec92
...
...
@@ -142,15 +142,6 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
colInfo
.
info
.
colId
=
pColSchema
->
colId
;
colInfo
.
info
.
type
=
pColSchema
->
type
;
#if 0
colInfo.pData = taosMemoryCalloc(1, sz);
if (colInfo.pData == NULL) {
// TODO free
taosArrayDestroy(pArray);
return NULL;
}
#endif
if
(
blockDataEnsureColumnCapacity
(
&
colInfo
,
numOfRows
)
<
0
)
{
taosArrayDestroyEx
(
pArray
,
(
void
(
*
)(
void
*
))
tDeleteSSDataBlock
);
return
NULL
;
...
...
@@ -161,39 +152,9 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
}
}
#if 0
int j = 0;
for (int32_t i = 0; i < colNumNeed; i++) {
col_id_t colId = *(col_id_t*)taosArrayGet(pHandle->pColIdList, i);
while (j < pSchemaWrapper->nCols && pSchemaWrapper->pSchema[j].colId < colId) {
j++;
}
if (j >= pSchemaWrapper->nCols) {
continue;
}
SSchema* pColSchema = &pSchemaWrapper->pSchema[j];
SColumnInfoData colInfo = {0};
int sz = numOfRows * pColSchema->bytes;
colInfo.info.bytes = pColSchema->bytes;
colInfo.info.colId = colId;
colInfo.info.type = pColSchema->type;
colInfo.pData = taosMemoryCalloc(1, sz);
if (colInfo.pData == NULL) {
// TODO free
taosArrayDestroy(pArray);
return NULL;
}
blockDataEnsureColumnCapacity(&colInfo, numOfRows);
taosArrayPush(pArray, &colInfo);
}
#endif
STSRowIter
iter
=
{
0
};
tdSTSRowIterInit
(
&
iter
,
pTschema
);
STSRow
*
row
;
// int32_t kvIdx = 0;
int32_t
curRow
=
0
;
tInitSubmitBlkIter
(
pHandle
->
pBlock
,
&
pHandle
->
blkIter
);
while
((
row
=
tGetSubmitBlkNext
(
&
pHandle
->
blkIter
))
!=
NULL
)
{
...
...
@@ -206,25 +167,9 @@ SArray* tqRetrieveDataBlock(STqReadHandle* pHandle) {
if
(
!
tdSTSRowIterNext
(
&
iter
,
pColData
->
info
.
colId
,
pColData
->
info
.
type
,
&
sVal
))
{
break
;
}
memcpy
(
POINTER_SHIFT
(
pColData
->
pData
,
curRow
*
pColData
->
info
.
bytes
),
sVal
.
val
,
pColData
->
info
.
bytes
);
}
#if 0
for (int32_t i = 0; i < colNumNeed; i++) {
SColumnInfoData* pColData = taosArrayGet(pArray, i);
STColumn* pCol = schemaColAt(pTschema, i);
// TODO
if(pCol->colId != pColData->info.colId) {
continue;
}
// void* val = tdGetMemRowDataOfColEx(row, pCol->colId, pCol->type, TD_DATA_ROW_HEAD_SIZE + pCol->offset, &kvIdx);
SCellVal sVal = {0};
if (!tdSTSRowIterNext(&iter, pCol->colId, pCol->type, &sVal)) {
// TODO: reach end
break;
}
memcpy(POINTER_SHIFT(pColData->pData, curRow * pCol->bytes), sVal.val, pCol->bytes);
// TODO handle null
colDataAppend
(
pColData
,
curRow
,
sVal
.
val
,
sVal
.
valType
==
TD_VTYPE_NULL
);
}
#endif
curRow
++
;
}
return
pArray
;
...
...
tests/script/jenkins/basic.txt
浏览文件 @
3c29ec92
...
...
@@ -37,6 +37,6 @@
# ---- tmq
./test.sh -f tsim/tmq/basic.sim
#
./test.sh -f tsim/tmq/basic1.sim
./test.sh -f tsim/tmq/basic1.sim
#======================b1-end===============
tests/script/tsim/parser/groupby-basic.sim
浏览文件 @
3c29ec92
...
...
@@ -107,80 +107,54 @@ print rows: $rows
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
if $row != 20 then
return -1
endi
if $data00 != 100 then
return -1
print $data80 $data81 $data82 $data83
print $data90 $data91 $data92 $data93
if $rows != 10 then
return -1
endi
#if $data00 != 10 then
# return -1
#endi
if $data01 != 0 then
return -1
endi
if $data10 != 100 then
#if $data10 != 10 then
# return -1
#endi
if $data11 != 1 then
return -1
endi
if $data11 != 1 then
#if $data90 != 10 then
# return -1
#endi
if $data91 != 9 then
return -1
endi
sql select first(ts),c1 from group_tb0 where c1<20 group by c1;
if $row != 20 then
sql select first(ts),c1 from group_tb0 group by c1;
print rows: $rows
print $data00 $data01 $data02 $data03
print $data10 $data11 $data12 $data13
print $data20 $data21 $data22 $data23
print $data80 $data81 $data82 $data83
print $data90 $data91 $data92 $data93
if $row != 10 then
return -1
endi
if $data00 != @
70-01-01 08:01:4
0.000@ then
if $data00 != @
2022-01-01 00:00:0
0.000@ then
return -1
endi
if $data01 != 0 then
return -1
endi
if $data90 != @70-01-01 08:01:40.009@ then
if $data90 != @2022-01-01 00:00:00.009@ then
return -1
endi
if $data91 != 9 then
return -1
endi
sql select first(ts), ts, c1 from group_tb0 where c1 < 20 group by c1;
print $row
if $row != 20 then
return -1
endi
if $data00 != $data01 then
return -1
endi
if $data10 != $data11 then
return -1
endi
if $data20 != $data21 then
return -1
endi
if $data90 != $data91 then
return -1
endi
if $data02 != 0 then
return -1
endi
if $data12 != 1 then
return -1
endi
if $data92 != 9 then
return -1
endi
sql select sum(c1), c1, avg(c1), min(c1), max(c2) from group_tb0 where c1 < 20 group by c1;
if $row != 20 then
...
...
tests/script/tsim/testCaseSuite.sim
浏览文件 @
3c29ec92
...
...
@@ -26,4 +26,4 @@ run tsim/show/basic.sim
run tsim/table/basic1.sim
run tsim/tmq/basic.sim
#
run tsim/tmq/basic1.sim
run tsim/tmq/basic1.sim
tests/script/tsim/tmq/basic1.sim
浏览文件 @
3c29ec92
...
...
@@ -3,9 +3,10 @@
# vgroups=1, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# vgroups=4, one topic for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# vgroups=4, multi topics for one consumer, include: columns from stb/ctb/ntb, * from stb/ctb/ntb, Scalar function from stb/ctb/ntb
# notes1: Scalar function: ABS/ACOS/ASIN/ATAN/CEIL/COS/FLOOR/LOG/POW/ROUND/SIN/SQRT/TAN
# The above use cases are combined with where filter conditions, such as: where ts > "2017-08-12 18:25:58.128Z" and sin(a) > 0.5;
#
# notes: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
# notes
2
: not support aggregate functions(such as sum/count/min/max) and time-windows(interval).
#
system sh/stop_dnodes.sh
...
...
@@ -135,44 +136,44 @@ print inserted totalMsgCnt: $totalMsgCnt
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_column" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 20}@ then
if $system_content != @{consume success: 20
, 0
}@ then
return -1
endi
#print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
#system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_stb_all" -k "group.id:tg2"
#print cmd result----> $system_content
#if $system_content != @{consume success: 20}@ then
#if $system_content != @{consume success: 20
, 0
}@ then
# return -1
#endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_column" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 10}@ then
if $system_content != @{consume success: 10
, 0
}@ then
return -1
endi
#
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
#
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
#
print cmd result----> $system_content
#if $system_content != @{consume success: 1
0}@ then
#
return -1
#
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ctb_all" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 10,
0}@ then
return -1
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_column" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 20}@ then
if $system_content != @{consume success: 20
, 0
}@ then
return -1
endi
#
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
#
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
#
print cmd result----> $system_content
#if $system_content != @{consume success: 2
0}@ then
#
return -1
#
endi
print cmd===> system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
system_content ../../debug/tests/test/c/tmq_sim -c ../../sim/tsim/cfg -d $dbNamme -t "topic_ntb_all" -k "group.id:tg2"
print cmd result----> $system_content
if $system_content != @{consume success: 20,
0}@ then
return -1
endi
print =============== create database , vgroup 4
$dbNamme = d1
...
...
tests/test/c/tmqSim.c
浏览文件 @
3c29ec92
...
...
@@ -219,33 +219,33 @@ tmq_list_t* build_topic_list() {
return
topic_list
;
}
void
perf_loop
(
tmq_t
*
tmq
,
tmq_list_t
*
topics
)
{
void
loop_consume
(
tmq_t
*
tmq
)
{
tmq_resp_err_t
err
;
if
((
err
=
tmq_subscribe
(
tmq
,
topics
)))
{
printf
(
"tmq_subscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
}
int32_t
totalMsgs
=
0
;
int32_t
totalRows
=
0
;
int32_t
skipLogNum
=
0
;
//int64_t startTime = taosGetTimestampUs();
while
(
running
)
{
tmq_message_t
*
tmqmessage
=
tmq_consumer_poll
(
tmq
,
1
);
if
(
tmqmessage
)
{
totalMsgs
++
;
skipLogNum
+=
tmqGetSkipLogNum
(
tmqmessage
);
tmq_message_t
*
tmqMsg
=
tmq_consumer_poll
(
tmq
,
1
);
if
(
tmqMsg
)
{
totalMsgs
++
;
#if 0
TAOS_ROW row;
while (NULL != (row = tmq_get_row(tmqMsg))) {
totalRows++;
}
#endif
skipLogNum
+=
tmqGetSkipLogNum
(
tmqMsg
);
if
(
0
!=
g_stConfInfo
.
showMsgFlag
)
{
msg_process
(
tmq
message
);
msg_process
(
tmq
Msg
);
}
tmq_message_destroy
(
tmq
message
);
tmq_message_destroy
(
tmq
Msg
);
}
else
{
break
;
}
}
//int64_t endTime = taosGetTimestampUs();
//double consumeTime = (double)(endTime - startTime) / 1000000;
err
=
tmq_consumer_close
(
tmq
);
if
(
err
)
{
...
...
@@ -253,7 +253,7 @@ void perf_loop(tmq_t* tmq, tmq_list_t* topics) {
exit
(
-
1
);
}
printf
(
"{consume success: %d
}"
,
totalMsg
s
);
printf
(
"{consume success: %d
, %d}"
,
totalMsgs
,
totalRow
s
);
}
int
main
(
int32_t
argc
,
char
*
argv
[])
{
...
...
@@ -266,7 +266,21 @@ int main(int32_t argc, char *argv[]) {
return
-
1
;
}
perf_loop
(
tmq
,
topic_list
);
tmq_resp_err_t
err
=
tmq_subscribe
(
tmq
,
topic_list
);
if
(
err
)
{
printf
(
"tmq_subscribe() fail, reason: %s
\n
"
,
tmq_err2str
(
err
));
exit
(
-
1
);
}
loop_consume
(
tmq
);
#if 0
err = tmq_unsubscribe(tmq);
if (err) {
printf("tmq_unsubscribe() fail, reason: %s\n", tmq_err2str(err));
exit(-1);
}
#endif
return
0
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录