Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
d6356319
T
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
d6356319
编写于
12月 07, 2020
作者:
B
Bomin Zhang
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
[TD-2310]<feature>: add dest table into show streams
上级
ffadd16e
变更
11
隐藏空白更改
内联
并排
Showing
11 changed file
with
37 addition
and
9 deletion
+37
-9
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+3
-0
src/client/src/tscProfile.c
src/client/src/tscProfile.c
+5
-0
src/client/src/tscStream.c
src/client/src/tscStream.c
+4
-0
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+9
-4
src/cq/test/cqtest.c
src/cq/test/cqtest.c
+1
-1
src/inc/taosmsg.h
src/inc/taosmsg.h
+1
-0
src/inc/tcq.h
src/inc/tcq.h
+1
-1
src/inc/tsdb.h
src/inc/tsdb.h
+1
-1
src/mnode/src/mnodeProfile.c
src/mnode/src/mnodeProfile.c
+10
-0
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+1
-1
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+1
-1
未找到文件。
src/client/inc/tsclient.h
浏览文件 @
d6356319
...
...
@@ -382,6 +382,7 @@ typedef struct SSqlObj {
typedef
struct
SSqlStream
{
SSqlObj
*
pSql
;
const
char
*
dstTable
;
uint32_t
streamId
;
char
listed
;
bool
isProject
;
...
...
@@ -408,6 +409,8 @@ typedef struct SSqlStream {
struct
SSqlStream
*
prev
,
*
next
;
}
SSqlStream
;
void
tscSetStreamDestTable
(
SSqlStream
*
pStream
,
const
char
*
dstTable
);
int32_t
tscInitRpc
(
const
char
*
user
,
const
char
*
secret
,
void
**
pDnodeConn
);
void
tscInitMsgsFp
();
...
...
src/client/src/tscProfile.c
浏览文件 @
d6356319
...
...
@@ -262,6 +262,11 @@ int tscBuildQueryStreamDesc(void *pMsg, STscObj *pObj) {
SSqlStream
*
pStream
=
pObj
->
streamList
;
while
(
pStream
)
{
tstrncpy
(
pSdesc
->
sql
,
pStream
->
pSql
->
sqlstr
,
sizeof
(
pSdesc
->
sql
));
if
(
pStream
->
dstTable
==
NULL
)
{
pSdesc
->
dstTable
[
0
]
=
0
;
}
else
{
tstrncpy
(
pSdesc
->
dstTable
,
pStream
->
dstTable
,
sizeof
(
pSdesc
->
dstTable
));
}
pSdesc
->
streamId
=
htonl
(
pStream
->
streamId
);
pSdesc
->
num
=
htobe64
(
pStream
->
num
);
...
...
src/client/src/tscStream.c
浏览文件 @
d6356319
...
...
@@ -535,6 +535,10 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pStream
,
pTableMetaInfo
->
name
,
pStream
->
interval
.
interval
,
pStream
->
interval
.
sliding
,
starttime
,
pSql
->
sqlstr
);
}
void
tscSetStreamDestTable
(
SSqlStream
*
pStream
,
const
char
*
dstTable
)
{
pStream
->
dstTable
=
dstTable
;
}
TAOS_STREAM
*
taos_open_stream
(
TAOS
*
taos
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
))
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
...
...
src/cq/src/cqMain.c
浏览文件 @
d6356319
...
...
@@ -57,6 +57,7 @@ typedef struct SCqObj {
uint64_t
uid
;
int32_t
tid
;
// table ID
int32_t
rowSize
;
// bytes of a row
char
*
dstTable
;
char
*
sqlStr
;
// SQL string
STSchema
*
pSchema
;
// pointer to schema array
void
*
pStream
;
...
...
@@ -185,7 +186,7 @@ void cqStop(void *handle) {
pthread_mutex_unlock
(
&
pContext
->
mutex
);
}
void
*
cqCreate
(
void
*
handle
,
uint64_t
uid
,
int32_t
tid
,
char
*
sqlStr
,
STSchema
*
pSchema
)
{
void
*
cqCreate
(
void
*
handle
,
uint64_t
uid
,
int32_t
sid
,
const
char
*
dstTable
,
char
*
sqlStr
,
STSchema
*
pSchema
)
{
if
(
tsEnableStream
==
0
)
{
return
NULL
;
}
...
...
@@ -195,9 +196,11 @@ void *cqCreate(void *handle, uint64_t uid, int32_t tid, char *sqlStr, STSchema *
if
(
pObj
==
NULL
)
return
NULL
;
pObj
->
uid
=
uid
;
pObj
->
tid
=
tid
;
pObj
->
sqlStr
=
malloc
(
strlen
(
sqlStr
)
+
1
);
strcpy
(
pObj
->
sqlStr
,
sqlStr
);
pObj
->
tid
=
sid
;
if
(
dstTable
!=
NULL
)
{
pObj
->
dstTable
=
strdup
(
dstTable
);
}
pObj
->
sqlStr
=
strdup
(
sqlStr
);
pObj
->
pSchema
=
tdDupSchema
(
pSchema
);
pObj
->
rowSize
=
schemaTLen
(
pSchema
);
...
...
@@ -247,6 +250,7 @@ void cqDrop(void *handle) {
cInfo
(
"vgId:%d, id:%d CQ:%s is dropped"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
tdFreeSchema
(
pObj
->
pSchema
);
free
(
pObj
->
dstTable
);
free
(
pObj
->
sqlStr
);
free
(
pObj
);
...
...
@@ -292,6 +296,7 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
if
(
pObj
->
pStream
==
NULL
)
{
pObj
->
pStream
=
taos_open_stream
(
pContext
->
dbConn
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
0
,
pObj
,
NULL
);
if
(
pObj
->
pStream
)
{
tscSetStreamDestTable
(
pObj
->
pStream
,
pObj
->
dstTable
);
pContext
->
num
++
;
cInfo
(
"vgId:%d, id:%d CQ:%s is openned"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
}
else
{
...
...
src/cq/test/cqtest.c
浏览文件 @
d6356319
...
...
@@ -70,7 +70,7 @@ int main(int argc, char *argv[]) {
tdDestroyTSchemaBuilder
(
&
schemaBuilder
);
for
(
int
sid
=
1
;
sid
<
10
;
++
sid
)
{
cqCreate
(
pCq
,
sid
,
sid
,
"select avg(speed) from demo.t1 sliding(1s) interval(5s)"
,
pSchema
);
cqCreate
(
pCq
,
sid
,
sid
,
NULL
,
"select avg(speed) from demo.t1 sliding(1s) interval(5s)"
,
pSchema
);
}
tdFreeSchema
(
pSchema
);
...
...
src/inc/taosmsg.h
浏览文件 @
d6356319
...
...
@@ -790,6 +790,7 @@ typedef struct {
typedef
struct
{
char
sql
[
TSDB_SHOW_SQL_LEN
];
char
dstTable
[
TSDB_TABLE_NAME_LEN
];
uint32_t
streamId
;
int64_t
num
;
// number of computing/cycles
int64_t
useconds
;
...
...
src/inc/tcq.h
浏览文件 @
d6356319
...
...
@@ -42,7 +42,7 @@ void cqStart(void *handle);
void
cqStop
(
void
*
handle
);
// cqCreate is called by TSDB to start an instance of CQ
void
*
cqCreate
(
void
*
handle
,
uint64_t
uid
,
int32_t
sid
,
char
*
sqlStr
,
STSchema
*
pSchema
);
void
*
cqCreate
(
void
*
handle
,
uint64_t
uid
,
int32_t
sid
,
c
onst
char
*
dstTable
,
c
har
*
sqlStr
,
STSchema
*
pSchema
);
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
void
cqDrop
(
void
*
handle
);
...
...
src/inc/tsdb.h
浏览文件 @
d6356319
...
...
@@ -48,7 +48,7 @@ typedef struct {
void
*
cqH
;
int
(
*
notifyStatus
)(
void
*
,
int
status
,
int
eno
);
int
(
*
eventCallBack
)(
void
*
);
void
*
(
*
cqCreateFunc
)(
void
*
handle
,
uint64_t
uid
,
int
sid
,
char
*
sqlStr
,
STSchema
*
pSchema
);
void
*
(
*
cqCreateFunc
)(
void
*
handle
,
uint64_t
uid
,
int
32_t
sid
,
const
char
*
dstTable
,
char
*
sqlStr
,
STSchema
*
pSchema
);
void
(
*
cqDropFunc
)(
void
*
handle
);
}
STsdbAppH
;
...
...
src/mnode/src/mnodeProfile.c
浏览文件 @
d6356319
...
...
@@ -450,6 +450,12 @@ static int32_t mnodeGetStreamMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
TSDB_TABLE_NAME_LEN
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"dest table"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
pShow
->
bytes
[
cols
]
=
TSDB_IPv4ADDR_LEN
+
6
+
VARSTR_HEADER_SIZE
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_BINARY
;
strcpy
(
pSchema
[
cols
].
name
,
"ip:port"
);
...
...
@@ -524,6 +530,10 @@ static int32_t mnodeRetrieveStreams(SShowObj *pShow, char *data, int32_t rows, v
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
pConnObj
->
user
,
pShow
->
bytes
[
cols
]);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
pDesc
->
dstTable
,
pShow
->
bytes
[
cols
]);
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
snprintf
(
ipStr
,
sizeof
(
ipStr
),
"%s:%u"
,
taosIpStr
(
pConnObj
->
ip
),
pConnObj
->
port
);
STR_WITH_MAXSIZE_TO_VARSTR
(
pWrite
,
ipStr
,
pShow
->
bytes
[
cols
]);
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
d6356319
...
...
@@ -872,7 +872,7 @@ static void tsdbStartStream(STsdbRepo *pRepo) {
for
(
int
i
=
0
;
i
<
pMeta
->
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
&&
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
pTable
->
cqhandle
=
(
*
pRepo
->
appH
.
cqCreateFunc
)(
pRepo
->
appH
.
cqH
,
TABLE_UID
(
pTable
),
TABLE_TID
(
pTable
),
pTable
->
sql
,
pTable
->
cqhandle
=
(
*
pRepo
->
appH
.
cqCreateFunc
)(
pRepo
->
appH
.
cqH
,
TABLE_UID
(
pTable
),
TABLE_TID
(
pTable
),
TABLE_NAME
(
pTable
)
->
data
,
pTable
->
sql
,
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
-
1
));
}
}
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
d6356319
...
...
@@ -828,7 +828,7 @@ static int tsdbAddTableToMeta(STsdbRepo *pRepo, STable *pTable, bool addIdx, boo
if
(
lock
&&
tsdbUnlockRepoMeta
(
pRepo
)
<
0
)
return
-
1
;
if
(
TABLE_TYPE
(
pTable
)
==
TSDB_STREAM_TABLE
&&
addIdx
)
{
pTable
->
cqhandle
=
(
*
pRepo
->
appH
.
cqCreateFunc
)(
pRepo
->
appH
.
cqH
,
TABLE_UID
(
pTable
),
TABLE_TID
(
pTable
),
pTable
->
sql
,
pTable
->
cqhandle
=
(
*
pRepo
->
appH
.
cqCreateFunc
)(
pRepo
->
appH
.
cqH
,
TABLE_UID
(
pTable
),
TABLE_TID
(
pTable
),
TABLE_NAME
(
pTable
)
->
data
,
pTable
->
sql
,
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
-
1
));
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录