Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
e1980381
TDengine
项目概览
taosdata
/
TDengine
1 年多 前同步成功
通知
1185
Star
22016
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
e1980381
编写于
5月 29, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
5月 29, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2068 from taosdata/feature/2.0tsdb
TD-354
上级
6a7da09d
1acff35e
变更
8
显示空白变更内容
内联
并排
Showing
8 changed file
with
58 addition
and
44 deletion
+58
-44
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+16
-19
src/cq/test/cqtest.c
src/cq/test/cqtest.c
+6
-11
src/inc/tcq.h
src/inc/tcq.h
+2
-1
src/inc/tsdb.h
src/inc/tsdb.h
+3
-1
src/tsdb/inc/tsdbMain.h
src/tsdb/inc/tsdbMain.h
+3
-1
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+4
-4
src/tsdb/src/tsdbMeta.c
src/tsdb/src/tsdbMeta.c
+16
-3
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+8
-4
未找到文件。
src/cq/src/cqMain.c
浏览文件 @
e1980381
...
...
@@ -15,17 +15,19 @@
#define _DEFAULT_SOURCE
#include <errno.h>
#include <pthread.h>
#include <stdlib.h>
#include <string.h>
#include <pthread.h>
#include
<errno.h>
#include
"taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "tcq.h"
#include "tdataformat.h"
#include "tglobal.h"
#include "tlog.h"
#include "twal.h"
#include "tcq.h"
#include "taos.h"
#define cError(...) { if (cqDebugFlag & DEBUG_ERROR) { taosPrintLog("ERROR CQ ", cqDebugFlag, __VA_ARGS__); }}
#define cWarn(...) { if (cqDebugFlag & DEBUG_WARN) { taosPrintLog("WARN CQ ", cqDebugFlag, __VA_ARGS__); }}
...
...
@@ -48,13 +50,12 @@ typedef struct {
typedef
struct
SCqObj
{
int
tid
;
// table ID
int
rowSize
;
// bytes of a row
char
*
sqlStr
;
// SQL string
int
columns
;
// number of columns
SSchema
*
pSchema
;
// pointer to schema array
void
*
pStream
;
char
*
sqlStr
;
// SQL string
STSchema
*
pSchema
;
// pointer to schema array
void
*
pStream
;
struct
SCqObj
*
prev
;
struct
SCqObj
*
next
;
SCqContext
*
pContext
;
SCqContext
*
pContext
;
}
SCqObj
;
int
cqDebugFlag
=
135
;
...
...
@@ -152,7 +153,7 @@ void cqStop(void *handle) {
pthread_mutex_unlock
(
&
pContext
->
mutex
);
}
void
*
cqCreate
(
void
*
handle
,
int
tid
,
char
*
sqlStr
,
S
Schema
*
pSchema
,
int
columns
)
{
void
*
cqCreate
(
void
*
handle
,
int
tid
,
char
*
sqlStr
,
S
TSchema
*
pSchema
)
{
SCqContext
*
pContext
=
handle
;
SCqObj
*
pObj
=
calloc
(
sizeof
(
SCqObj
),
1
);
...
...
@@ -162,11 +163,7 @@ void *cqCreate(void *handle, int tid, char *sqlStr, SSchema *pSchema, int column
pObj
->
sqlStr
=
malloc
(
strlen
(
sqlStr
)
+
1
);
strcpy
(
pObj
->
sqlStr
,
sqlStr
);
pObj
->
columns
=
columns
;
int
size
=
sizeof
(
SSchema
)
*
columns
;
pObj
->
pSchema
=
malloc
(
size
);
memcpy
(
pObj
->
pSchema
,
pSchema
,
size
);
pObj
->
pSchema
=
tdDupSchema
(
pSchema
);
cTrace
(
"vgId:%d, id:%d CQ:%s is created"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
...
...
src/cq/test/cqtest.c
浏览文件 @
e1980381
...
...
@@ -59,21 +59,16 @@ int main(int argc, char *argv[]) {
exit
(
-
1
);
}
SSchema
schema
[
2
];
schema
[
0
].
type
=
TSDB_DATA_TYPE_TIMESTAMP
;
strcpy
(
schema
[
0
].
name
,
"ts"
);
schema
[
0
].
colId
=
0
;
schema
[
0
].
bytes
=
8
;
schema
[
1
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
schema
[
1
].
name
,
"avgspeed"
);
schema
[
1
].
colId
=
1
;
schema
[
1
].
bytes
=
4
;
STSchema
*
pSchema
=
tdNewSchema
(
2
);
tdSchemaAddCol
(
pSchema
,
TSDB_DATA_TYPE_TIMESTAMP
,
0
,
8
);
tdSchemaAddCol
(
pSchema
,
TSDB_DATA_TYPE_INT
,
1
,
4
);
for
(
int
sid
=
1
;
sid
<
10
;
++
sid
)
{
cqCreate
(
pCq
,
sid
,
"select avg(speed) from demo.t1 sliding(1s) interval(5s)"
,
schema
,
2
);
cqCreate
(
pCq
,
sid
,
"select avg(speed) from demo.t1 sliding(1s) interval(5s)"
,
pSchema
);
}
tdFreeSchema
(
pSchema
);
while
(
1
)
{
char
c
=
getchar
();
...
...
src/inc/tcq.h
浏览文件 @
e1980381
...
...
@@ -19,6 +19,7 @@
extern
"C"
{
#endif
#include "tdataformat.h"
typedef
int
(
*
FCqWrite
)(
void
*
ahandle
,
void
*
pHead
,
int
type
);
...
...
@@ -40,7 +41,7 @@ void cqStart(void *handle);
void
cqStop
(
void
*
handle
);
// cqCreate is called by TSDB to start an instance of CQ
void
*
cqCreate
(
void
*
handle
,
int
sid
,
char
*
sqlStr
,
S
Schema
*
pSchema
,
int
columns
);
void
*
cqCreate
(
void
*
handle
,
int
sid
,
char
*
sqlStr
,
S
TSchema
*
pSchema
);
// cqDrop is called by TSDB to stop an instance of CQ, handle is the return value of cqCreate
void
cqDrop
(
void
*
handle
);
...
...
src/inc/tsdb.h
浏览文件 @
e1980381
...
...
@@ -43,6 +43,8 @@ typedef struct {
void
*
cqH
;
int
(
*
notifyStatus
)(
void
*
,
int
status
);
int
(
*
eventCallBack
)(
void
*
);
void
*
(
*
cqCreateFunc
)(
void
*
handle
,
int
sid
,
char
*
sqlStr
,
STSchema
*
pSchema
);
void
(
*
cqDropFunc
)(
void
*
handle
);
}
STsdbAppH
;
// --------- TSDB REPOSITORY CONFIGURATION DEFINITION
...
...
@@ -71,7 +73,7 @@ typedef void TsdbRepoT; // use void to hide implementation details from outside
int
tsdbCreateRepo
(
char
*
rootDir
,
STsdbCfg
*
pCfg
,
void
*
limiter
);
int32_t
tsdbDropRepo
(
TsdbRepoT
*
repo
);
TsdbRepoT
*
tsdbOpenRepo
(
char
*
tsdb
Dir
,
STsdbAppH
*
pAppH
);
TsdbRepoT
*
tsdbOpenRepo
(
char
*
root
Dir
,
STsdbAppH
*
pAppH
);
int32_t
tsdbCloseRepo
(
TsdbRepoT
*
repo
,
int
toCommit
);
int32_t
tsdbConfigRepo
(
TsdbRepoT
*
repo
,
STsdbCfg
*
pCfg
);
...
...
src/tsdb/inc/tsdbMain.h
浏览文件 @
e1980381
...
...
@@ -87,6 +87,7 @@ typedef struct STable {
struct
STable
*
prev
;
tstr
*
name
;
// NOTE: there a flexible string here
char
*
sql
;
void
*
cqhandle
;
}
STable
;
#define TSDB_GET_TABLE_LAST_KEY(tb) ((tb)->lastKey)
...
...
@@ -110,6 +111,7 @@ typedef struct {
SMetaFile
*
mfh
;
// meta file handle
int
maxRowBytes
;
int
maxCols
;
void
*
pRepo
;
}
STsdbMeta
;
// element put in skiplist for each table
...
...
@@ -118,7 +120,7 @@ typedef struct STableIndexElem {
STable
*
pTable
;
}
STableIndexElem
;
STsdbMeta
*
tsdbInitMeta
(
char
*
rootDir
,
int32_t
maxTables
);
STsdbMeta
*
tsdbInitMeta
(
char
*
rootDir
,
int32_t
maxTables
,
void
*
pRepo
);
int32_t
tsdbFreeMeta
(
STsdbMeta
*
pMeta
);
STSchema
*
tsdbGetTableSchema
(
STsdbMeta
*
pMeta
,
STable
*
pTable
);
STSchema
*
tsdbGetTableTagSchema
(
STsdbMeta
*
pMeta
,
STable
*
pTable
);
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
e1980381
...
...
@@ -189,9 +189,9 @@ _err:
*
* @return a TSDB repository handle on success, NULL for failure and the error number is set
*/
TsdbRepoT
*
tsdbOpenRepo
(
char
*
tsdb
Dir
,
STsdbAppH
*
pAppH
)
{
TsdbRepoT
*
tsdbOpenRepo
(
char
*
root
Dir
,
STsdbAppH
*
pAppH
)
{
char
dataDir
[
128
]
=
"
\0
"
;
if
(
access
(
tsdb
Dir
,
F_OK
|
W_OK
|
R_OK
)
<
0
)
{
if
(
access
(
root
Dir
,
F_OK
|
W_OK
|
R_OK
)
<
0
)
{
return
NULL
;
}
...
...
@@ -200,12 +200,12 @@ TsdbRepoT *tsdbOpenRepo(char *tsdbDir, STsdbAppH *pAppH) {
return
NULL
;
}
pRepo
->
rootDir
=
strdup
(
tsdb
Dir
);
pRepo
->
rootDir
=
strdup
(
root
Dir
);
tsdbRestoreCfg
(
pRepo
,
&
(
pRepo
->
config
));
if
(
pAppH
)
pRepo
->
appH
=
*
pAppH
;
pRepo
->
tsdbMeta
=
tsdbInitMeta
(
tsdbDir
,
pRepo
->
config
.
maxTables
);
pRepo
->
tsdbMeta
=
tsdbInitMeta
(
rootDir
,
pRepo
->
config
.
maxTables
,
pRepo
);
if
(
pRepo
->
tsdbMeta
==
NULL
)
{
free
(
pRepo
->
rootDir
);
free
(
pRepo
);
...
...
src/tsdb/src/tsdbMeta.c
浏览文件 @
e1980381
...
...
@@ -142,6 +142,7 @@ int tsdbRestoreTable(void *pHandle, void *cont, int contLen) {
void
tsdbOrgMeta
(
void
*
pHandle
)
{
STsdbMeta
*
pMeta
=
(
STsdbMeta
*
)
pHandle
;
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
pMeta
->
pRepo
;
for
(
int
i
=
1
;
i
<
pMeta
->
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
...
...
@@ -149,13 +150,20 @@ void tsdbOrgMeta(void *pHandle) {
tsdbAddTableIntoIndex
(
pMeta
,
pTable
);
}
}
for
(
int
i
=
0
;
i
<
pMeta
->
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
&&
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
pTable
->
cqhandle
=
(
*
pRepo
->
appH
.
cqCreateFunc
)(
pRepo
->
appH
.
cqH
,
i
,
pTable
->
sql
,
tsdbGetTableSchema
(
pMeta
,
pTable
));
}
}
}
/**
* Initialize the meta handle
* ASSUMPTIONS: VALID PARAMETER
*/
STsdbMeta
*
tsdbInitMeta
(
char
*
rootDir
,
int32_t
maxTables
)
{
STsdbMeta
*
tsdbInitMeta
(
char
*
rootDir
,
int32_t
maxTables
,
void
*
pRepo
)
{
STsdbMeta
*
pMeta
=
(
STsdbMeta
*
)
malloc
(
sizeof
(
STsdbMeta
));
if
(
pMeta
==
NULL
)
return
NULL
;
...
...
@@ -165,6 +173,7 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) {
pMeta
->
tables
=
(
STable
**
)
calloc
(
maxTables
,
sizeof
(
STable
*
));
pMeta
->
maxRowBytes
=
0
;
pMeta
->
maxCols
=
0
;
pMeta
->
pRepo
=
pRepo
;
if
(
pMeta
->
tables
==
NULL
)
{
free
(
pMeta
);
return
NULL
;
...
...
@@ -189,13 +198,16 @@ STsdbMeta *tsdbInitMeta(char *rootDir, int32_t maxTables) {
}
int32_t
tsdbFreeMeta
(
STsdbMeta
*
pMeta
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
pMeta
->
pRepo
;
if
(
pMeta
==
NULL
)
return
0
;
tsdbCloseMetaFile
(
pMeta
->
mfh
);
for
(
int
i
=
1
;
i
<
pMeta
->
maxTables
;
i
++
)
{
if
(
pMeta
->
tables
[
i
]
!=
NULL
)
{
tsdbFreeTable
(
pMeta
->
tables
[
i
]);
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
->
type
==
TSDB_STREAM_TABLE
)
(
*
pRepo
->
appH
.
cqDropFunc
)(
pTable
->
cqhandle
);
tsdbFreeTable
(
pTable
);
}
}
...
...
@@ -512,6 +524,7 @@ STable *tsdbGetTableByUid(STsdbMeta *pMeta, uint64_t uid) {
}
static
int
tsdbAddTableToMeta
(
STsdbMeta
*
pMeta
,
STable
*
pTable
,
bool
addIdx
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
pMeta
->
pRepo
;
if
(
pTable
->
type
==
TSDB_SUPER_TABLE
)
{
// add super table to the linked list
if
(
pMeta
->
superList
==
NULL
)
{
...
...
@@ -531,7 +544,7 @@ static int tsdbAddTableToMeta(STsdbMeta *pMeta, STable *pTable, bool addIdx) {
tsdbAddTableIntoIndex
(
pMeta
,
pTable
);
}
if
(
pTable
->
type
==
TSDB_STREAM_TABLE
&&
addIdx
)
{
// TODO
pTable
->
cqhandle
=
(
*
pRepo
->
appH
.
cqCreateFunc
)(
pRepo
->
appH
.
cqH
,
pTable
->
tableId
.
tid
,
pTable
->
sql
,
tsdbGetTableSchema
(
pMeta
,
pTable
));
}
pMeta
->
nTables
++
;
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
e1980381
...
...
@@ -220,6 +220,8 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
appH
.
appH
=
(
void
*
)
pVnode
;
appH
.
notifyStatus
=
vnodeProcessTsdbStatus
;
appH
.
cqH
=
pVnode
->
cq
;
appH
.
cqCreateFunc
=
cqCreate
;
appH
.
cqDropFunc
=
cqDrop
;
sprintf
(
temp
,
"%s/tsdb"
,
rootDir
);
pVnode
->
tsdb
=
tsdbOpenRepo
(
temp
,
&
appH
);
if
(
pVnode
->
tsdb
==
NULL
)
{
...
...
@@ -391,14 +393,14 @@ static void vnodeCleanUp(SVnodeObj *pVnode) {
pVnode
->
sync
=
NULL
;
}
if
(
pVnode
->
wal
)
walClose
(
pVnode
->
wal
);
pVnode
->
wal
=
NULL
;
if
(
pVnode
->
tsdb
)
tsdbCloseRepo
(
pVnode
->
tsdb
,
1
);
pVnode
->
tsdb
=
NULL
;
if
(
pVnode
->
wal
)
walClose
(
pVnode
->
wal
);
pVnode
->
wal
=
NULL
;
if
(
pVnode
->
cq
)
cqClose
(
pVnode
->
cq
);
pVnode
->
cq
=
NULL
;
...
...
@@ -467,6 +469,8 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
appH
.
appH
=
(
void
*
)
pVnode
;
appH
.
notifyStatus
=
vnodeProcessTsdbStatus
;
appH
.
cqH
=
pVnode
->
cq
;
appH
.
cqCreateFunc
=
cqCreate
;
appH
.
cqDropFunc
=
cqDrop
;
pVnode
->
tsdb
=
tsdbOpenRepo
(
rootDir
,
&
appH
);
}
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录