Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
81ea4a8a
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
81ea4a8a
编写于
6月 03, 2020
作者:
B
Bomin Zhang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
first test case passed
上级
40c1d665
变更
5
隐藏空白更改
内联
并排
Showing
5 changed file
with
166 addition
and
51 deletion
+166
-51
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+1
-0
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+20
-17
src/client/src/tscStream.c
src/client/src/tscStream.c
+6
-33
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+8
-1
tests/pytest/stream/stream1.py
tests/pytest/stream/stream1.py
+131
-0
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
81ea4a8a
...
...
@@ -404,6 +404,7 @@ TAOS *taos_connect_a(char *ip, char *user, char *pass, char *db, uint16_t port,
void
*
param
,
void
**
taos
);
void
waitForQueryRsp
(
void
*
param
,
TAOS_RES
*
tres
,
int
code
)
;
int
doAsyncParseSql
(
SSqlObj
*
pSql
,
const
char
*
sqlstr
,
size_t
sqlLen
);
void
doAsyncQuery
(
STscObj
*
pObj
,
SSqlObj
*
pSql
,
void
(
*
fp
)(),
void
*
param
,
const
char
*
sqlstr
,
size_t
sqlLen
);
void
tscProcessMultiVnodesInsertFromFile
(
SSqlObj
*
pSql
);
...
...
src/client/src/tscAsync.c
浏览文件 @
81ea4a8a
...
...
@@ -40,30 +40,23 @@ static void tscProcessAsyncRetrieveImpl(void *param, TAOS_RES *tres, int numOfRo
static
void
tscAsyncFetchRowsProxy
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
);
static
void
tscAsyncFetchSingleRowProxy
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
);
void
doAsyncQuery
(
STscObj
*
pObj
,
SSqlObj
*
pSql
,
void
(
*
fp
)(),
void
*
param
,
const
char
*
sqlstr
,
size_t
sqlLen
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
pSql
->
signature
=
pSql
;
pSql
->
param
=
param
;
pSql
->
pTscObj
=
pObj
;
pSql
->
maxRetry
=
TSDB_MAX_REPLICA_NUM
;
pSql
->
fp
=
fp
;
sem_init
(
&
pSql
->
rspSem
,
0
,
0
);
if
(
TSDB_CODE_SUCCESS
!=
tscAllocPayload
(
pCmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
))
{
int
doAsyncParseSql
(
SSqlObj
*
pSql
,
const
char
*
sqlstr
,
size_t
sqlLen
)
{
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
int32_t
code
=
tscAllocPayload
(
pCmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscError
(
"failed to malloc payload"
);
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_CLI_OUT_OF_MEMORY
);
return
;
tscQueueAsyncError
(
pSql
->
fp
,
pSql
->
param
,
TSDB_CODE_CLI_OUT_OF_MEMORY
);
return
code
;
}
// todo check for OOM problem
pSql
->
sqlstr
=
calloc
(
1
,
sqlLen
+
1
);
if
(
pSql
->
sqlstr
==
NULL
)
{
tscError
(
"%p failed to malloc sql string buffer"
,
pSql
);
tscQueueAsyncError
(
fp
,
param
,
TSDB_CODE_CLI_OUT_OF_MEMORY
);
tscQueueAsyncError
(
pSql
->
fp
,
pSql
->
param
,
TSDB_CODE_CLI_OUT_OF_MEMORY
);
free
(
pCmd
->
payload
);
return
;
return
TSDB_CODE_CLI_OUT_OF_MEMORY
;
}
pRes
->
qhandle
=
0
;
...
...
@@ -72,7 +65,17 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
strtolower
(
pSql
->
sqlstr
,
sqlstr
);
tscDump
(
"%p SQL: %s"
,
pSql
,
pSql
->
sqlstr
);
int32_t
code
=
tsParseSql
(
pSql
,
true
);
return
tsParseSql
(
pSql
,
true
);
}
void
doAsyncQuery
(
STscObj
*
pObj
,
SSqlObj
*
pSql
,
void
(
*
fp
)(),
void
*
param
,
const
char
*
sqlstr
,
size_t
sqlLen
)
{
pSql
->
signature
=
pSql
;
pSql
->
param
=
param
;
pSql
->
pTscObj
=
pObj
;
pSql
->
maxRetry
=
TSDB_MAX_REPLICA_NUM
;
pSql
->
fp
=
fp
;
int32_t
code
=
doAsyncParseSql
(
pSql
,
sqlstr
,
sqlLen
);
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
return
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
src/client/src/tscStream.c
浏览文件 @
81ea4a8a
...
...
@@ -497,46 +497,18 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
}
pSql
->
signature
=
pSql
;
pSql
->
param
=
pSql
;
pSql
->
pTscObj
=
pObj
;
pSql
->
fp
=
asyncCallback
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlRes
*
pRes
=
&
pSql
->
res
;
int
ret
=
tscAllocPayload
(
pCmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
);
if
(
TSDB_CODE_SUCCESS
!=
ret
)
{
setErrorInfo
(
pSql
,
ret
,
NULL
);
free
(
pSql
);
return
NULL
;
}
pSql
->
sqlstr
=
strdup
(
sqlstr
);
if
(
pSql
->
sqlstr
==
NULL
)
{
setErrorInfo
(
pSql
,
TSDB_CODE_CLI_OUT_OF_MEMORY
,
NULL
);
tfree
(
pSql
);
return
NULL
;
}
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
SSqlInfo
SQLInfo
=
{
0
};
tSQLParse
(
&
SQLInfo
,
pSql
->
sqlstr
);
tscResetSqlCmdObj
(
&
pSql
->
cmd
);
ret
=
tscAllocPayload
(
&
pSql
->
cmd
,
TSDB_DEFAULT_PAYLOAD_SIZE
);
if
(
TSDB_CODE_SUCCESS
!=
ret
)
{
setErrorInfo
(
pSql
,
ret
,
NULL
);
tscError
(
"%p open stream failed, sql:%s, code:%d"
,
pSql
,
sqlstr
,
TSDB_CODE_CLI_OUT_OF_MEMORY
);
tscFreeSqlObj
(
pSql
);
return
NULL
;
}
pSql
->
param
=
pSql
;
pSql
->
fp
=
asyncCallback
;
pRes
->
code
=
tscToSQLCmd
(
pSql
,
&
SQLInfo
);
if
(
pRes
->
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
int32_t
code
=
doAsyncParseSql
(
pSql
,
sqlstr
,
strlen
(
sqlstr
));
if
(
code
==
TSDB_CODE_ACTION_IN_PROGRESS
)
{
sem_wait
(
&
pSql
->
rspSem
);
}
SQLInfoDestroy
(
&
SQLInfo
);
if
(
pRes
->
code
!=
TSDB_CODE_SUCCESS
)
{
setErrorInfo
(
pSql
,
pRes
->
code
,
pCmd
->
payload
);
...
...
@@ -575,6 +547,7 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
pStream
->
stime
=
tscGetStreamStartTimestamp
(
pSql
,
pStream
,
stime
);
int64_t
starttime
=
tscGetLaunchTimestamp
(
pStream
);
pCmd
->
command
=
TSDB_SQL_SELECT
;
taosTmrReset
(
tscProcessStreamTimer
,
starttime
,
pStream
,
tscTmr
,
&
pStream
->
pTimer
);
tscTrace
(
"%p stream:%p is opened, query on:%s, interval:%"
PRId64
", sliding:%"
PRId64
", first launched in:%"
PRId64
", sql:%s"
,
pSql
,
...
...
src/cq/src/cqMain.c
浏览文件 @
81ea4a8a
...
...
@@ -75,7 +75,14 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
strcpy
(
pContext
->
user
,
pCfg
->
user
);
strcpy
(
pContext
->
pass
,
pCfg
->
pass
);
strcpy
(
pContext
->
db
,
pCfg
->
db
);
const
char
*
db
=
pCfg
->
db
;
for
(
const
char
*
p
=
db
;
*
p
!=
0
;
p
++
)
{
if
(
*
p
==
'.'
)
{
db
=
p
+
1
;
break
;
}
}
strcpy
(
pContext
->
db
,
db
);
pContext
->
vgId
=
pCfg
->
vgId
;
pContext
->
cqWrite
=
pCfg
->
cqWrite
;
pContext
->
ahandle
=
ahandle
;
...
...
tests/pytest/stream/stream1.py
0 → 100644
浏览文件 @
81ea4a8a
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import
sys
import
time
import
taos
from
util.log
import
tdLog
from
util.cases
import
tdCases
from
util.sql
import
tdSql
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
(),
logSql
)
def
run
(
self
):
tbNum
=
10
rowNum
=
20
tdSql
.
prepare
()
tdLog
.
info
(
"===== step1 ====="
)
tdSql
.
execute
(
"create table stb0(ts timestamp, col1 int, col2 float) tags(tgcol int)"
)
for
i
in
range
(
tbNum
):
tdSql
.
execute
(
"create table tb%d using stb0 tags(%d)"
%
(
i
,
i
))
for
j
in
range
(
rowNum
):
tdSql
.
execute
(
"insert into tb%d values (now - %dm, %d, %d)"
%
(
i
,
1440
-
j
,
j
,
j
))
time
.
sleep
(
0.1
)
tdLog
.
info
(
"===== step2 ====="
)
tdSql
.
query
(
"select count(*), count(col1), count(col2) from tb0 interval(1d)"
)
tdSql
.
checkData
(
0
,
1
,
rowNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
tbNum
)
tdSql
.
execute
(
"create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)"
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
tbNum
+
1
)
tdLog
.
info
(
"===== step3 ====="
)
tdLog
.
info
(
"sleeping 120 seconds"
)
time
.
sleep
(
120
)
tdSql
.
query
(
"select * from s0"
)
tdSql
.
checkData
(
0
,
1
,
rowNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
)
tdLog
.
info
(
"===== step4 ====="
)
tdSql
.
execute
(
"drop table s0"
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
tbNum
)
tdLog
.
info
(
"===== step5 ====="
)
tdSql
.
error
(
"select * from s0"
)
tdLog
.
info
(
"===== step6 ====="
)
time
.
sleep
(
0.1
)
tdSql
.
execute
(
"create table s0 as select count(*), count(col1), count(col2) from tb0 interval(1d)"
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
tbNum
+
1
)
tdLog
.
info
(
"===== step7 ====="
)
tdLog
.
info
(
"sleeping 120 seconds"
)
time
.
sleep
(
120
)
tdSql
.
query
(
"select * from s0"
)
tdSql
.
checkData
(
0
,
1
,
rowNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
)
tdLog
.
info
(
"===== step8 ====="
)
tdSql
.
query
(
"select count(*), count(col1), count(col2) from stb0 interval(1d)"
)
tdSql
.
checkData
(
0
,
1
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
*
tbNum
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
tbNum
+
1
)
tdSql
.
execute
(
"create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)"
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
tbNum
+
2
)
tdLog
.
info
(
"===== step9 ====="
)
tdLog
.
info
(
"sleeping 120 seconds"
)
time
.
sleep
(
120
)
tdSql
.
query
(
"select * from s1"
)
tdSql
.
checkData
(
0
,
1
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
*
tbNum
)
tdLog
.
info
(
"===== step10 ====="
)
tdSql
.
execute
(
"drop table s1"
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
tbNum
+
1
)
tdLog
.
info
(
"===== step11 ====="
)
tdSql
.
error
(
"select * from s1"
)
tdLog
.
info
(
"===== step12 ====="
)
tdSql
.
execute
(
"create table s1 as select count(*), count(col1), count(col2) from stb0 interval(1d)"
)
tdSql
.
query
(
"show tables"
)
tdSql
.
checkRows
(
tbNum
+
2
)
tdLog
.
info
(
"===== step13 ====="
)
tdLog
.
info
(
"sleeping 120 seconds"
)
time
.
sleep
(
120
)
tdSql
.
query
(
"select * from s1"
)
tdSql
.
checkData
(
0
,
1
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
2
,
rowNum
*
tbNum
)
tdSql
.
checkData
(
0
,
3
,
rowNum
*
tbNum
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录