Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
27fab2cf
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
27fab2cf
编写于
5月 02, 2020
作者:
J
jtao1735
浏览文件
操作
浏览文件
下载
电子邮件补丁
差异文件
pass the unit testing
上级
cdcb0daa
变更
7
显示空白变更内容
内联
并排
Showing
7 changed file
with
79 addition
and
96 deletion
+79
-96
src/cq/CMakeLists.txt
src/cq/CMakeLists.txt
+1
-0
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+38
-68
src/cq/test/cqtest.c
src/cq/test/cqtest.c
+21
-8
src/inc/tcq.h
src/inc/tcq.h
+11
-3
src/inc/tsdb.h
src/inc/tsdb.h
+1
-1
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+7
-6
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+0
-10
未找到文件。
src/cq/CMakeLists.txt
浏览文件 @
27fab2cf
...
...
@@ -4,6 +4,7 @@ PROJECT(TDengine)
INCLUDE_DIRECTORIES
(
${
TD_OS_DIR
}
/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/util/inc
)
INCLUDE_DIRECTORIES
(
${
TD_COMMUNITY_DIR
}
/src/common/inc
)
INCLUDE_DIRECTORIES
(
inc
)
AUX_SOURCE_DIRECTORY
(
${
CMAKE_CURRENT_SOURCE_DIR
}
/src SRC
)
...
...
src/cq/src/cqMain.c
浏览文件 @
27fab2cf
...
...
@@ -20,6 +20,7 @@
#include <pthread.h>
#include "taosdef.h"
#include "taosmsg.h"
#include "tglobal.h"
#include "tlog.h"
#include "twal.h"
#include "tcq.h"
...
...
@@ -32,7 +33,6 @@
typedef
struct
{
int
vgId
;
char
path
[
TSDB_FILENAME_LEN
];
char
user
[
TSDB_USER_LEN
];
char
pass
[
TSDB_PASSWORD_LEN
];
FCqWrite
cqWrite
;
...
...
@@ -44,12 +44,13 @@ typedef struct {
}
SCqContext
;
typedef
struct
SCqObj
{
int
s
id
;
// table ID
int
t
id
;
// table ID
int
rowSize
;
// bytes of a row
char
*
sqlStr
;
// SQL string
int
columns
;
// number of columns
SSchema
*
pSchema
;
// pointer to schema array
void
*
pStream
;
struct
SCqObj
*
prev
;
struct
SCqObj
*
next
;
SCqContext
*
pContext
;
}
SCqObj
;
...
...
@@ -65,30 +66,10 @@ void *cqOpen(void *ahandle, const SCqCfg *pCfg) {
strcpy
(
pContext
->
user
,
pCfg
->
user
);
strcpy
(
pContext
->
pass
,
pCfg
->
pass
);
strcpy
(
pContext
->
path
,
pCfg
->
path
);
pContext
->
vgId
=
pCfg
->
vgId
;
pContext
->
cqWrite
=
pCfg
->
cqWrite
;
pContext
->
ahandle
=
ahandle
;
// open meta data file
// loop each record
while
(
1
)
{
SCqObj
*
pObj
=
calloc
(
sizeof
(
SCqObj
),
1
);
if
(
pObj
==
NULL
)
{
cError
(
"vgId:%d, no memory"
,
pContext
->
vgId
);
continue
;
}
pObj
->
next
=
pContext
->
pHead
;
pContext
->
pHead
=
pObj
;
// assigne each field in SCqObj
// pObj->sid =
// strcpy(pObj->sqlStr, ?? );
// schema, columns
}
pthread_mutex_init
(
&
pContext
->
mutex
,
NULL
);
cTrace
(
"vgId:%d, CQ is opened"
,
pContext
->
vgId
);
...
...
@@ -102,8 +83,6 @@ void cqClose(void *handle) {
// stop all CQs
cqStop
(
pContext
);
// save the meta data
// free all resources
SCqObj
*
pObj
=
pContext
->
pHead
;
while
(
pObj
)
{
...
...
@@ -125,23 +104,23 @@ void cqStart(void *handle) {
pthread_mutex_lock
(
&
pContext
->
mutex
);
tscEmbedded
=
1
;
pContext
->
dbConn
=
taos_connect
(
"localhost"
,
pContext
->
user
,
pContext
->
pass
,
NULL
,
0
);
if
(
pContext
->
dbConn
)
{
if
(
pContext
->
dbConn
==
NULL
)
{
cError
(
"vgId:%d, failed to connect to TDengine(%s)"
,
pContext
->
vgId
,
tstrerror
(
terrno
));
pthread_mutex_unlock
(
&
pContext
->
mutex
);
return
;
}
SCqObj
*
pObj
=
pContext
->
pHead
;
while
(
pObj
)
{
int64_t
lastKey
=
0
;
pObj
->
pStream
=
taos_open_stream
(
pContext
->
dbConn
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
lastKey
,
pObj
,
NULL
);
if
(
pObj
->
pStream
)
{
pContext
->
num
++
;
cTrace
(
"vgId:%d, id:%d CQ:%s is openned"
,
pContext
->
vgId
,
pObj
->
s
id
,
pObj
->
sqlStr
);
cTrace
(
"vgId:%d, id:%d CQ:%s is openned"
,
pContext
->
vgId
,
pObj
->
t
id
,
pObj
->
sqlStr
);
}
else
{
cError
(
"vgId:%d, id:%d CQ:%s, failed to open"
,
pContext
->
vgId
,
pObj
->
sqlStr
);
cError
(
"vgId:%d, id:%d CQ:%s, failed to open"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
}
pObj
=
pObj
->
next
;
}
...
...
@@ -158,9 +137,11 @@ void cqStop(void *handle) {
SCqObj
*
pObj
=
pContext
->
pHead
;
while
(
pObj
)
{
if
(
pObj
->
pStream
)
taos_close_stream
(
pObj
->
pStream
);
if
(
pObj
->
pStream
)
{
taos_close_stream
(
pObj
->
pStream
);
pObj
->
pStream
=
NULL
;
cTrace
(
"vgId:%d, id:%d CQ:%s is closed"
,
pContext
->
vgId
,
pObj
->
sid
,
pObj
->
sqlStr
);
cTrace
(
"vgId:%d, id:%d CQ:%s is closed"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
}
pObj
=
pObj
->
next
;
}
...
...
@@ -171,13 +152,13 @@ void cqStop(void *handle) {
pthread_mutex_unlock
(
&
pContext
->
mutex
);
}
void
cqCreate
(
void
*
handle
,
int
s
id
,
char
*
sqlStr
,
SSchema
*
pSchema
,
int
columns
)
{
void
*
cqCreate
(
void
*
handle
,
int
t
id
,
char
*
sqlStr
,
SSchema
*
pSchema
,
int
columns
)
{
SCqContext
*
pContext
=
handle
;
SCqObj
*
pObj
=
calloc
(
sizeof
(
SCqObj
),
1
);
if
(
pObj
==
NULL
)
return
;
if
(
pObj
==
NULL
)
return
NULL
;
pObj
->
sid
=
s
id
;
pObj
->
tid
=
t
id
;
pObj
->
sqlStr
=
malloc
(
strlen
(
sqlStr
)
+
1
);
strcpy
(
pObj
->
sqlStr
,
sqlStr
);
...
...
@@ -187,11 +168,12 @@ void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns
pObj
->
pSchema
=
malloc
(
size
);
memcpy
(
pObj
->
pSchema
,
pSchema
,
size
);
cTrace
(
"vgId:%d, id:%d CQ:%s is created"
,
pContext
->
vgId
,
pObj
->
s
id
,
pObj
->
sqlStr
);
cTrace
(
"vgId:%d, id:%d CQ:%s is created"
,
pContext
->
vgId
,
pObj
->
t
id
,
pObj
->
sqlStr
);
pthread_mutex_lock
(
&
pContext
->
mutex
);
pObj
->
next
=
pContext
->
pHead
;
if
(
pContext
->
pHead
)
pContext
->
pHead
->
prev
=
pObj
;
pContext
->
pHead
=
pObj
;
if
(
pContext
->
dbConn
)
{
...
...
@@ -199,50 +181,39 @@ void cqCreate(void *handle, int sid, char *sqlStr, SSchema *pSchema, int columns
pObj
->
pStream
=
taos_open_stream
(
pContext
->
dbConn
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
lastKey
,
pObj
,
NULL
);
if
(
pObj
->
pStream
)
{
pContext
->
num
++
;
cTrace
(
"vgId:%d, id:%d CQ:%s is openned"
,
pContext
->
vgId
,
pObj
->
s
id
,
pObj
->
sqlStr
);
cTrace
(
"vgId:%d, id:%d CQ:%s is openned"
,
pContext
->
vgId
,
pObj
->
t
id
,
pObj
->
sqlStr
);
}
else
{
cError
(
"vgId:%d, id:%d CQ:%s, failed to launch"
,
pContext
->
vgId
,
pObj
->
s
id
,
pObj
->
sqlStr
);
cError
(
"vgId:%d, id:%d CQ:%s, failed to launch"
,
pContext
->
vgId
,
pObj
->
t
id
,
pObj
->
sqlStr
);
}
}
pthread_mutex_unlock
(
&
pContext
->
mutex
);
return
pObj
;
}
void
cqDrop
(
void
*
handle
,
int
sid
)
{
SCqContext
*
pContext
=
handle
;
void
cqDrop
(
void
*
handle
)
{
SCqObj
*
pObj
=
handle
;
SCqContext
*
pContext
=
pObj
->
pContext
;
pthread_mutex_lock
(
&
pContext
->
mutex
);
// locate the pObj;
SCqObj
*
prev
=
NULL
;
SCqObj
*
pObj
=
pContext
->
pHead
;
while
(
pObj
)
{
if
(
pObj
->
sid
!=
sid
)
{
prev
=
pObj
;
pObj
=
pObj
->
next
;
continue
;
}
// remove from the linked list
if
(
prev
)
{
prev
->
next
=
pObj
->
next
;
if
(
pObj
->
prev
)
{
pObj
->
prev
->
next
=
pObj
->
next
;
}
else
{
pContext
->
pHead
=
pObj
->
next
;
}
break
;
if
(
pObj
->
next
)
{
pObj
->
next
->
prev
=
pObj
->
prev
;
}
if
(
pObj
)
{
// update the meta data
// free the resources associated
if
(
pObj
->
pStream
)
taos_close_stream
(
pObj
->
pStream
);
pObj
->
pStream
=
NULL
;
cTrace
(
"vgId:%d, id:%d CQ:%s is dropped"
,
pContext
->
vgId
,
pObj
->
s
id
,
pObj
->
sqlStr
);
cTrace
(
"vgId:%d, id:%d CQ:%s is dropped"
,
pContext
->
vgId
,
pObj
->
t
id
,
pObj
->
sqlStr
);
free
(
pObj
);
}
pthread_mutex_lock
(
&
pContext
->
mutex
);
}
...
...
@@ -252,7 +223,7 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SCqContext
*
pContext
=
pObj
->
pContext
;
if
(
pObj
->
pStream
==
NULL
)
return
;
cTrace
(
"vgId:%d, id:%d CQ:%s stream result is ready"
,
pContext
->
vgId
,
pObj
->
s
id
,
pObj
->
sqlStr
);
cTrace
(
"vgId:%d, id:%d CQ:%s stream result is ready"
,
pContext
->
vgId
,
pObj
->
t
id
,
pObj
->
sqlStr
);
// construct data
int
size
=
sizeof
(
SWalHead
)
+
sizeof
(
SSubmitMsg
)
+
sizeof
(
SSubmitBlk
)
+
pObj
->
rowSize
;
...
...
@@ -269,11 +240,10 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row) {
SSubmitBlk
*
pBlk
=
(
SSubmitBlk
*
)
(
buffer
+
sizeof
(
SWalHead
)
+
sizeof
(
SSubmitMsg
));
// to do: fill in the SSubmitBlk strucuture
pBlk
->
tid
=
pObj
->
s
id
;
pBlk
->
tid
=
pObj
->
t
id
;
// write into vnode write queue
pContext
->
cqWrite
(
pContext
->
ahandle
,
pHead
,
TAOS_QTYPE_CQ
);
}
src/cq/test/cqtest.c
浏览文件 @
27fab2cf
...
...
@@ -29,16 +29,16 @@ int writeToQueue(void *pVnode, void *data, int type) {
}
int
main
(
int
argc
,
char
*
argv
[])
{
char
path
[
128
]
=
"~/cq"
;
int
num
=
3
;
for
(
int
i
=
1
;
i
<
argc
;
++
i
)
{
if
(
strcmp
(
argv
[
i
],
"-p"
)
==
0
&&
i
<
argc
-
1
)
{
strcpy
(
path
,
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
if
(
strcmp
(
argv
[
i
],
"-d"
)
==
0
&&
i
<
argc
-
1
)
{
ddebugFlag
=
atoi
(
argv
[
++
i
]);
}
else
if
(
strcmp
(
argv
[
i
],
"-n"
)
==
0
&&
i
<
argc
-
1
)
{
num
=
atoi
(
argv
[
++
i
]);
}
else
{
printf
(
"
\n
usage: %s [options]
\n
"
,
argv
[
0
]);
printf
(
" [-
p path]: wal file path default is:%s
\n
"
,
path
);
printf
(
" [-
n num]: number of streams, default:%d
\n
"
,
num
);
printf
(
" [-d debugFlag]: debug flag, default:%d
\n
"
,
ddebugFlag
);
printf
(
" [-h help]: print out this help
\n\n
"
);
exit
(
0
);
...
...
@@ -50,7 +50,6 @@ int main(int argc, char *argv[]) {
SCqCfg
cqCfg
;
strcpy
(
cqCfg
.
user
,
"root"
);
strcpy
(
cqCfg
.
pass
,
"taosdata"
);
strcpy
(
cqCfg
.
path
,
path
);
cqCfg
.
vgId
=
2
;
cqCfg
.
cqWrite
=
writeToQueue
;
...
...
@@ -60,9 +59,19 @@ int main(int argc, char *argv[]) {
exit
(
-
1
);
}
SSchema
*
pSchema
=
NULL
;
SSchema
schema
[
2
];
schema
[
0
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
schema
[
0
].
name
,
"ts"
);
schema
[
0
].
colId
=
0
;
schema
[
0
].
bytes
=
8
;
schema
[
1
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
schema
[
1
].
name
,
"avgspeed"
);
schema
[
1
].
colId
=
1
;
schema
[
1
].
bytes
=
4
;
for
(
int
sid
=
1
;
sid
<
10
;
++
sid
)
{
cqCreate
(
pCq
,
1
,
"select avg(speed) from t1 sliding(1s) interval(5s)"
,
pS
chema
,
2
);
cqCreate
(
pCq
,
sid
,
"select avg(speed) from demo.t1 sliding(1s) interval(5s)"
,
s
chema
,
2
);
}
while
(
1
)
{
...
...
@@ -83,6 +92,8 @@ int main(int argc, char *argv[]) {
break
;
case
'q'
:
break
;
default:
printf
(
"invalid command:%c"
,
c
);
}
if
(
c
==
'q'
)
break
;
...
...
@@ -90,5 +101,7 @@ int main(int argc, char *argv[]) {
cqClose
(
pCq
);
taosCloseLog
();
return
0
;
}
src/inc/tcq.h
浏览文件 @
27fab2cf
...
...
@@ -24,18 +24,26 @@ typedef int (*FCqWrite)(void *ahandle, void *pHead, int type);
typedef
struct
{
int
vgId
;
char
path
[
TSDB_FILENAME_LEN
];
char
user
[
TSDB_USER_LEN
];
char
pass
[
TSDB_PASSWORD_LEN
];
FCqWrite
cqWrite
;
}
SCqCfg
;
// the following API shall be called by vnode
void
*
cqOpen
(
void
*
ahandle
,
const
SCqCfg
*
pCfg
);
void
cqClose
(
void
*
handle
);
// if vnode is master, vnode call this API to start CQ
void
cqStart
(
void
*
handle
);
// if vnode is slave/unsynced, vnode shall call this API to stop CQ
void
cqStop
(
void
*
handle
);
void
cqCreate
(
void
*
handle
,
int
sid
,
char
*
sqlStr
,
SSchema
*
pSchema
,
int
columns
);
void
cqDrop
(
void
*
handle
,
int
sid
);
// cqCreate is called by TSDB to start an instance of CQ
void
*
cqCreate
(
void
*
handle
,
int
sid
,
char
*
sqlStr
,
SSchema
*
pSchema
,
int
columns
);
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
void
cqDrop
(
void
*
handle
);
extern
int
cqDebugFlag
;
...
...
src/inc/tsdb.h
浏览文件 @
27fab2cf
...
...
@@ -38,9 +38,9 @@ extern "C" {
typedef
struct
{
// WAL handle
void
*
appH
;
void
*
cqH
;
int
(
*
walCallBack
)(
void
*
);
int
(
*
eventCallBack
)(
void
*
);
int
(
*
cqueryCallBack
)(
void
*
);
}
STsdbAppH
;
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
27fab2cf
...
...
@@ -194,9 +194,16 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode
->
wqueue
=
dnodeAllocateWqueue
(
pVnode
);
pVnode
->
rqueue
=
dnodeAllocateRqueue
(
pVnode
);
SCqCfg
cqCfg
;
sprintf
(
cqCfg
.
user
,
"root"
);
strcpy
(
cqCfg
.
pass
,
tsInternalPass
);
cqCfg
.
cqWrite
=
vnodeWriteToQueue
;
pVnode
->
cq
=
cqOpen
(
pVnode
,
&
cqCfg
);
STsdbAppH
appH
=
{
0
};
appH
.
appH
=
(
void
*
)
pVnode
;
appH
.
walCallBack
=
vnodeWalCallback
;
appH
.
cqH
=
pVnode
->
cq
;
sprintf
(
temp
,
"%s/tsdb"
,
rootDir
);
pVnode
->
tsdb
=
tsdbOpenRepo
(
temp
,
&
appH
);
...
...
@@ -210,12 +217,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode
->
wal
=
walOpen
(
temp
,
&
pVnode
->
walCfg
);
walRestore
(
pVnode
->
wal
,
pVnode
,
vnodeWriteToQueue
);
SCqCfg
cqCfg
;
sprintf
(
cqCfg
.
path
,
"%s/cq"
,
rootDir
);
strcpy
(
cqCfg
.
pass
,
tsInternalPass
);
cqCfg
.
cqWrite
=
vnodeWriteToQueue
;
pVnode
->
cq
=
cqOpen
(
pVnode
,
&
cqCfg
);
SSyncInfo
syncInfo
;
syncInfo
.
vgId
=
pVnode
->
vgId
;
syncInfo
.
version
=
pVnode
->
version
;
...
...
src/vnode/src/vnodeWrite.c
浏览文件 @
27fab2cf
...
...
@@ -114,7 +114,6 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
int16_t
numOfColumns
=
htons
(
pTable
->
numOfColumns
);
int16_t
numOfTags
=
htons
(
pTable
->
numOfTags
);
int32_t
sid
=
htonl
(
pTable
->
sid
);
int32_t
sqlDataLen
=
htonl
(
pTable
->
sqlDataLen
);
uint64_t
uid
=
htobe64
(
pTable
->
uid
);
SSchema
*
pSchema
=
(
SSchema
*
)
pTable
->
data
;
...
...
@@ -151,14 +150,6 @@ static int32_t vnodeProcessCreateTableMsg(SVnodeObj *pVnode, void *pCont, SRspRe
}
code
=
tsdbCreateTable
(
pVnode
->
tsdb
,
&
tCfg
);
if
(
code
==
0
&&
sqlDataLen
>
0
)
{
char
*
sqlStr
=
NULL
;
// to do: get the sqlStr
cqCreate
(
pVnode
->
cq
,
sid
,
sqlStr
,
pSchema
,
numOfColumns
);
}
tfree
(
pDestSchema
);
dTrace
(
"pVnode:%p vgId:%d, table:%s is created, result:%x"
,
pVnode
,
pVnode
->
vgId
,
pTable
->
tableId
,
code
);
...
...
@@ -176,7 +167,6 @@ static int32_t vnodeProcessDropTableMsg(SVnodeObj *pVnode, void *pCont, SRspRet
};
code
=
tsdbDropTable
(
pVnode
->
tsdb
,
tableId
);
cqDrop
(
pVnode
->
cq
,
tableId
.
tid
);
return
code
;
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录