Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
4c9c2662
T
TDengine
项目概览
taosdata
/
TDengine
大约 1 年 前同步成功
通知
1185
Star
22015
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
4c9c2662
编写于
7月 16, 2020
作者:
S
Shengliang Guan
提交者:
GitHub
7月 16, 2020
浏览文件
操作
浏览文件
下载
差异文件
Merge pull request #2684 from taosdata/patch/cqstart
defer cq creation
上级
5931be60
b87f9ae0
变更
10
隐藏空白更改
内联
并排
Showing
10 changed file
with
114 addition
and
92 deletion
+114
-92
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+1
-1
src/client/src/tscStream.c
src/client/src/tscStream.c
+50
-35
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+34
-8
src/dnode/inc/dnodeModule.h
src/dnode/inc/dnodeModule.h
+0
-1
src/dnode/src/dnodeMain.c
src/dnode/src/dnodeMain.c
+0
-1
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+0
-17
src/inc/tsdb.h
src/inc/tsdb.h
+0
-1
src/inc/vnode.h
src/inc/vnode.h
+0
-1
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+29
-14
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+0
-13
未找到文件。
src/client/src/tscAsync.c
浏览文件 @
4c9c2662
...
@@ -564,8 +564,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
...
@@ -564,8 +564,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug
(
"%p stream:%p meta is updated, start new query, command:%d"
,
pSql
,
pSql
->
pStream
,
pSql
->
cmd
.
command
);
tscDebug
(
"%p stream:%p meta is updated, start new query, command:%d"
,
pSql
,
pSql
->
pStream
,
pSql
->
cmd
.
command
);
if
(
!
pSql
->
cmd
.
parseFinished
)
{
if
(
!
pSql
->
cmd
.
parseFinished
)
{
tsParseSql
(
pSql
,
false
);
tsParseSql
(
pSql
,
false
);
sem_post
(
&
pSql
->
rspSem
);
}
}
(
*
pSql
->
fp
)(
pSql
->
param
,
pSql
,
code
);
return
;
return
;
}
}
...
...
src/client/src/tscStream.c
浏览文件 @
4c9c2662
...
@@ -70,6 +70,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
...
@@ -70,6 +70,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
SSqlObj
*
pSql
=
pStream
->
pSql
;
SSqlObj
*
pSql
=
pStream
->
pSql
;
pSql
->
fp
=
tscProcessStreamQueryCallback
;
pSql
->
fp
=
tscProcessStreamQueryCallback
;
pSql
->
fetchFp
=
tscProcessStreamQueryCallback
;
pSql
->
param
=
pStream
;
pSql
->
param
=
pStream
;
pSql
->
res
.
completed
=
false
;
pSql
->
res
.
completed
=
false
;
...
@@ -471,6 +472,41 @@ static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) {
...
@@ -471,6 +472,41 @@ static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) {
}
}
}
}
static
void
tscCreateStream
(
void
*
param
,
TAOS_RES
*
res
,
int
code
)
{
SSqlStream
*
pStream
=
(
SSqlStream
*
)
param
;
SSqlObj
*
pSql
=
pStream
->
pSql
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
setErrorInfo
(
pSql
,
code
,
pCmd
->
payload
);
tscError
(
"%p open stream failed, sql:%s, reason:%s, code:0x%08x"
,
pSql
,
pSql
->
sqlstr
,
pCmd
->
payload
,
code
);
pStream
->
fp
(
pStream
->
param
,
NULL
,
NULL
);
return
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
pStream
->
isProject
=
isProjectStream
(
pQueryInfo
);
pStream
->
precision
=
tinfo
.
precision
;
pStream
->
ctime
=
taosGetTimestamp
(
pStream
->
precision
);
pStream
->
etime
=
pQueryInfo
->
window
.
ekey
;
tscAddIntoStreamList
(
pStream
);
tscSetSlidingWindowInfo
(
pSql
,
pStream
);
pStream
->
stime
=
tscGetStreamStartTimestamp
(
pSql
,
pStream
,
pStream
->
stime
);
int64_t
starttime
=
tscGetLaunchTimestamp
(
pStream
);
pCmd
->
command
=
TSDB_SQL_SELECT
;
taosTmrReset
(
tscProcessStreamTimer
,
starttime
,
pStream
,
tscTmr
,
&
pStream
->
pTimer
);
tscDebug
(
"%p stream:%p is opened, query on:%s, interval:%"
PRId64
", sliding:%"
PRId64
", first launched in:%"
PRId64
", sql:%s"
,
pSql
,
pStream
,
pTableMetaInfo
->
name
,
pStream
->
interval
,
pStream
->
slidingTime
,
starttime
,
pSql
->
sqlstr
);
}
TAOS_STREAM
*
taos_open_stream
(
TAOS
*
taos
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
TAOS_STREAM
*
taos_open_stream
(
TAOS
*
taos
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
))
{
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
))
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
...
@@ -482,7 +518,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
...
@@ -482,7 +518,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
}
}
pSql
->
signature
=
pSql
;
pSql
->
signature
=
pSql
;
pSql
->
param
=
pSql
;
pSql
->
pTscObj
=
pObj
;
pSql
->
pTscObj
=
pObj
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
@@ -494,7 +529,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
...
@@ -494,7 +529,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscFreeSqlObj
(
pSql
);
tscFreeSqlObj
(
pSql
);
return
NULL
;
return
NULL
;
}
}
pStream
->
stime
=
stime
;
pStream
->
fp
=
fp
;
pStream
->
callback
=
callback
;
pStream
->
param
=
param
;
pStream
->
pSql
=
pSql
;
pSql
->
pStream
=
pStream
;
pSql
->
pStream
=
pStream
;
pSql
->
param
=
pStream
;
pSql
->
sqlstr
=
calloc
(
1
,
strlen
(
sqlstr
)
+
1
);
pSql
->
sqlstr
=
calloc
(
1
,
strlen
(
sqlstr
)
+
1
);
if
(
pSql
->
sqlstr
==
NULL
)
{
if
(
pSql
->
sqlstr
==
NULL
)
{
...
@@ -507,45 +549,18 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
...
@@ -507,45 +549,18 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscDebugL
(
"%p SQL: %s"
,
pSql
,
pSql
->
sqlstr
);
tscDebugL
(
"%p SQL: %s"
,
pSql
,
pSql
->
sqlstr
);
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
pSql
->
fp
=
tscCreateStream
;
pSql
->
fetchFp
=
tscCreateStream
;
int32_t
code
=
tsParseSql
(
pSql
,
true
);
int32_t
code
=
tsParseSql
(
pSql
,
true
);
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
if
(
code
==
TSDB_CODE_SUCCESS
)
{
sem_wait
(
&
pSql
->
rspSem
);
tscCreateStream
(
pStream
,
pSql
,
code
);
}
}
else
if
(
code
!=
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
tscError
(
"%p open stream failed, sql:%s, code:%s"
,
pSql
,
sqlstr
,
tstrerror
(
pRes
->
code
));
if
(
pRes
->
code
!=
TSDB_CODE_SUCCESS
)
{
setErrorInfo
(
pSql
,
pRes
->
code
,
pCmd
->
payload
);
tscError
(
"%p open stream failed, sql:%s, reason:%s, code:0x%08x"
,
pSql
,
sqlstr
,
pCmd
->
payload
,
pRes
->
code
);
tscFreeSqlObj
(
pSql
);
tscFreeSqlObj
(
pSql
);
free
(
pStream
);
return
NULL
;
return
NULL
;
}
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
pStream
->
isProject
=
isProjectStream
(
pQueryInfo
);
pStream
->
fp
=
fp
;
pStream
->
callback
=
callback
;
pStream
->
param
=
param
;
pStream
->
pSql
=
pSql
;
pStream
->
precision
=
tinfo
.
precision
;
pStream
->
ctime
=
taosGetTimestamp
(
pStream
->
precision
);
pStream
->
etime
=
pQueryInfo
->
window
.
ekey
;
tscAddIntoStreamList
(
pStream
);
tscSetSlidingWindowInfo
(
pSql
,
pStream
);
pStream
->
stime
=
tscGetStreamStartTimestamp
(
pSql
,
pStream
,
stime
);
int64_t
starttime
=
tscGetLaunchTimestamp
(
pStream
);
pCmd
->
command
=
TSDB_SQL_SELECT
;
taosTmrReset
(
tscProcessStreamTimer
,
starttime
,
pStream
,
tscTmr
,
&
pStream
->
pTimer
);
tscDebug
(
"%p stream:%p is opened, query on:%s, interval:%"
PRId64
", sliding:%"
PRId64
", first launched in:%"
PRId64
", sql:%s"
,
pSql
,
pStream
,
pTableMetaInfo
->
name
,
pStream
->
interval
,
pStream
->
slidingTime
,
starttime
,
sqlstr
);
return
pStream
;
return
pStream
;
}
}
...
...
src/cq/src/cqMain.c
浏览文件 @
4c9c2662
...
@@ -23,6 +23,7 @@
...
@@ -23,6 +23,7 @@
#include "taos.h"
#include "taos.h"
#include "taosdef.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "taosmsg.h"
#include "ttimer.h"
#include "tcq.h"
#include "tcq.h"
#include "tdataformat.h"
#include "tdataformat.h"
#include "tglobal.h"
#include "tglobal.h"
...
@@ -45,10 +46,12 @@ typedef struct {
...
@@ -45,10 +46,12 @@ typedef struct {
struct
SCqObj
*
pHead
;
struct
SCqObj
*
pHead
;
void
*
dbConn
;
void
*
dbConn
;
int
master
;
int
master
;
void
*
tmrCtrl
;
pthread_mutex_t
mutex
;
pthread_mutex_t
mutex
;
}
SCqContext
;
}
SCqContext
;
typedef
struct
SCqObj
{
typedef
struct
SCqObj
{
tmr_h
tmrId
;
uint64_t
uid
;
uint64_t
uid
;
int32_t
tid
;
// table ID
int32_t
tid
;
// table ID
int
rowSize
;
// bytes of a row
int
rowSize
;
// bytes of a row
...
@@ -66,13 +69,14 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
...
@@ -66,13 +69,14 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
static
void
cqCreateStream
(
SCqContext
*
pContext
,
SCqObj
*
pObj
);
static
void
cqCreateStream
(
SCqContext
*
pContext
,
SCqObj
*
pObj
);
void
*
cqOpen
(
void
*
ahandle
,
const
SCqCfg
*
pCfg
)
{
void
*
cqOpen
(
void
*
ahandle
,
const
SCqCfg
*
pCfg
)
{
SCqContext
*
pContext
=
calloc
(
sizeof
(
SCqContext
),
1
);
SCqContext
*
pContext
=
calloc
(
sizeof
(
SCqContext
),
1
);
if
(
pContext
==
NULL
)
{
if
(
pContext
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
return
NULL
;
}
}
pContext
->
tmrCtrl
=
taosTmrInit
(
0
,
0
,
0
,
"CQ"
);
tstrncpy
(
pContext
->
user
,
pCfg
->
user
,
sizeof
(
pContext
->
user
));
tstrncpy
(
pContext
->
user
,
pCfg
->
user
,
sizeof
(
pContext
->
user
));
tstrncpy
(
pContext
->
pass
,
pCfg
->
pass
,
sizeof
(
pContext
->
pass
));
tstrncpy
(
pContext
->
pass
,
pCfg
->
pass
,
sizeof
(
pContext
->
pass
));
const
char
*
db
=
pCfg
->
db
;
const
char
*
db
=
pCfg
->
db
;
...
@@ -99,6 +103,9 @@ void cqClose(void *handle) {
...
@@ -99,6 +103,9 @@ void cqClose(void *handle) {
SCqContext
*
pContext
=
handle
;
SCqContext
*
pContext
=
handle
;
if
(
handle
==
NULL
)
return
;
if
(
handle
==
NULL
)
return
;
taosTmrCleanUp
(
pContext
->
tmrCtrl
);
pContext
->
tmrCtrl
=
NULL
;
// stop all CQs
// stop all CQs
cqStop
(
pContext
);
cqStop
(
pContext
);
...
@@ -154,8 +161,10 @@ void cqStop(void *handle) {
...
@@ -154,8 +161,10 @@ void cqStop(void *handle) {
taos_close_stream
(
pObj
->
pStream
);
taos_close_stream
(
pObj
->
pStream
);
pObj
->
pStream
=
NULL
;
pObj
->
pStream
=
NULL
;
cTrace
(
"vgId:%d, id:%d CQ:%s is closed"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
cTrace
(
"vgId:%d, id:%d CQ:%s is closed"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
}
else
{
taosTmrStop
(
pObj
->
tmrId
);
pObj
->
tmrId
=
0
;
}
}
pObj
=
pObj
->
next
;
pObj
=
pObj
->
next
;
}
}
...
@@ -211,8 +220,13 @@ void cqDrop(void *handle) {
...
@@ -211,8 +220,13 @@ void cqDrop(void *handle) {
}
}
// free the resources associated
// free the resources associated
if
(
pObj
->
pStream
)
taos_close_stream
(
pObj
->
pStream
);
if
(
pObj
->
pStream
)
{
pObj
->
pStream
=
NULL
;
taos_close_stream
(
pObj
->
pStream
);
pObj
->
pStream
=
NULL
;
}
else
{
taosTmrStop
(
pObj
->
tmrId
);
pObj
->
tmrId
=
0
;
}
cTrace
(
"vgId:%d, id:%d CQ:%s is dropped"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
cTrace
(
"vgId:%d, id:%d CQ:%s is dropped"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
tdFreeSchema
(
pObj
->
pSchema
);
tdFreeSchema
(
pObj
->
pSchema
);
...
@@ -222,18 +236,30 @@ void cqDrop(void *handle) {
...
@@ -222,18 +236,30 @@ void cqDrop(void *handle) {
pthread_mutex_unlock
(
&
pContext
->
mutex
);
pthread_mutex_unlock
(
&
pContext
->
mutex
);
}
}
static
void
cqCreateStream
(
SCqContext
*
pContext
,
SCqObj
*
pObj
)
{
static
void
cqProcessCreateTimer
(
void
*
param
,
void
*
tmrId
)
{
SCqObj
*
pObj
=
(
SCqObj
*
)
param
;
SCqContext
*
pContext
=
pObj
->
pContext
;
if
(
pContext
->
dbConn
==
NULL
)
{
if
(
pContext
->
dbConn
==
NULL
)
{
pContext
->
dbConn
=
taos_connect
(
"localhost"
,
pContext
->
user
,
pContext
->
pass
,
pContext
->
db
,
0
);
pContext
->
dbConn
=
taos_connect
(
"localhost"
,
pContext
->
user
,
pContext
->
pass
,
pContext
->
db
,
0
);
if
(
pContext
->
dbConn
==
NULL
)
{
if
(
pContext
->
dbConn
==
NULL
)
{
cError
(
"vgId:%d, failed to connect to TDengine(%s)"
,
pContext
->
vgId
,
tstrerror
(
terrno
));
cError
(
"vgId:%d, failed to connect to TDengine(%s)"
,
pContext
->
vgId
,
tstrerror
(
terrno
));
return
;
}
}
}
}
cqCreateStream
(
pContext
,
pObj
);
}
int64_t
lastKey
=
0
;
static
void
cqCreateStream
(
SCqContext
*
pContext
,
SCqObj
*
pObj
)
{
pObj
->
pContext
=
pContext
;
pObj
->
pContext
=
pContext
;
pObj
->
pStream
=
taos_open_stream
(
pContext
->
dbConn
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
lastKey
,
pObj
,
NULL
);
if
(
pContext
->
dbConn
==
NULL
)
{
pObj
->
tmrId
=
taosTmrStart
(
cqProcessCreateTimer
,
1000
,
pObj
,
pContext
->
tmrCtrl
);
return
;
}
pObj
->
tmrId
=
0
;
pObj
->
pStream
=
taos_open_stream
(
pContext
->
dbConn
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
0
,
pObj
,
NULL
);
if
(
pObj
->
pStream
)
{
if
(
pObj
->
pStream
)
{
pContext
->
num
++
;
pContext
->
num
++
;
cTrace
(
"vgId:%d, id:%d CQ:%s is openned"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
cTrace
(
"vgId:%d, id:%d CQ:%s is openned"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
...
...
src/dnode/inc/dnodeModule.h
浏览文件 @
4c9c2662
...
@@ -22,7 +22,6 @@ extern "C" {
...
@@ -22,7 +22,6 @@ extern "C" {
int32_t
dnodeInitModules
();
int32_t
dnodeInitModules
();
void
dnodeStartModules
();
void
dnodeStartModules
();
void
dnodeStartStream
();
void
dnodeCleanupModules
();
void
dnodeCleanupModules
();
void
dnodeProcessModuleStatus
(
uint32_t
moduleStatus
);
void
dnodeProcessModuleStatus
(
uint32_t
moduleStatus
);
...
...
src/dnode/src/dnodeMain.c
浏览文件 @
4c9c2662
...
@@ -123,7 +123,6 @@ int32_t dnodeInitSystem() {
...
@@ -123,7 +123,6 @@ int32_t dnodeInitSystem() {
dnodeStartModules
();
dnodeStartModules
();
dnodeSetRunStatus
(
TSDB_DNODE_RUN_STATUS_RUNING
);
dnodeSetRunStatus
(
TSDB_DNODE_RUN_STATUS_RUNING
);
dnodeStartStream
();
dInfo
(
"TDengine is initialized successfully"
);
dInfo
(
"TDengine is initialized successfully"
);
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
4c9c2662
...
@@ -354,23 +354,6 @@ static int32_t dnodeOpenVnodes() {
...
@@ -354,23 +354,6 @@ static int32_t dnodeOpenVnodes() {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
void
dnodeStartStream
()
{
int32_t
vnodeList
[
TSDB_MAX_VNODES
]
=
{
0
};
int32_t
numOfVnodes
=
0
;
int32_t
status
=
vnodeGetVnodeList
(
vnodeList
,
&
numOfVnodes
);
if
(
status
!=
TSDB_CODE_SUCCESS
)
{
dInfo
(
"get dnode list failed"
);
return
;
}
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
vnodeStartStream
(
vnodeList
[
i
]);
}
dInfo
(
"streams started"
);
}
static
void
dnodeCloseVnodes
()
{
static
void
dnodeCloseVnodes
()
{
int32_t
vnodeList
[
TSDB_MAX_VNODES
]
=
{
0
};
int32_t
vnodeList
[
TSDB_MAX_VNODES
]
=
{
0
};
int32_t
numOfVnodes
=
0
;
int32_t
numOfVnodes
=
0
;
...
...
src/inc/tsdb.h
浏览文件 @
4c9c2662
...
@@ -117,7 +117,6 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
...
@@ -117,7 +117,6 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
int
tsdbDropTable
(
TSDB_REPO_T
*
pRepo
,
STableId
tableId
);
int
tsdbDropTable
(
TSDB_REPO_T
*
pRepo
,
STableId
tableId
);
int
tsdbUpdateTableTagValue
(
TSDB_REPO_T
*
repo
,
SUpdateTableTagValMsg
*
pMsg
);
int
tsdbUpdateTableTagValue
(
TSDB_REPO_T
*
repo
,
SUpdateTableTagValMsg
*
pMsg
);
TSKEY
tsdbGetTableLastKey
(
TSDB_REPO_T
*
repo
,
uint64_t
uid
);
TSKEY
tsdbGetTableLastKey
(
TSDB_REPO_T
*
repo
,
uint64_t
uid
);
void
tsdbStartStream
(
TSDB_REPO_T
*
repo
);
uint32_t
tsdbGetFileInfo
(
TSDB_REPO_T
*
repo
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int32_t
*
size
);
uint32_t
tsdbGetFileInfo
(
TSDB_REPO_T
*
repo
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int32_t
*
size
);
...
...
src/inc/vnode.h
浏览文件 @
4c9c2662
...
@@ -44,7 +44,6 @@ typedef struct {
...
@@ -44,7 +44,6 @@ typedef struct {
int32_t
vnodeCreate
(
SMDCreateVnodeMsg
*
pVnodeCfg
);
int32_t
vnodeCreate
(
SMDCreateVnodeMsg
*
pVnodeCfg
);
int32_t
vnodeDrop
(
int32_t
vgId
);
int32_t
vnodeDrop
(
int32_t
vgId
);
int32_t
vnodeOpen
(
int32_t
vgId
,
char
*
rootDir
);
int32_t
vnodeOpen
(
int32_t
vgId
,
char
*
rootDir
);
int32_t
vnodeStartStream
(
int32_t
vgId
);
int32_t
vnodeAlter
(
void
*
pVnode
,
SMDCreateVnodeMsg
*
pVnodeCfg
);
int32_t
vnodeAlter
(
void
*
pVnode
,
SMDCreateVnodeMsg
*
pVnodeCfg
);
int32_t
vnodeClose
(
int32_t
vgId
);
int32_t
vnodeClose
(
int32_t
vgId
);
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
4c9c2662
...
@@ -69,6 +69,8 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
...
@@ -69,6 +69,8 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
static
void
*
tsdbDecodeCfg
(
void
*
buf
,
STsdbCfg
*
pCfg
);
static
void
*
tsdbDecodeCfg
(
void
*
buf
,
STsdbCfg
*
pCfg
);
static
int
tsdbCheckTableSchema
(
STsdbRepo
*
pRepo
,
SSubmitBlk
*
pBlock
,
STable
*
pTable
);
static
int
tsdbCheckTableSchema
(
STsdbRepo
*
pRepo
,
SSubmitBlk
*
pBlock
,
STable
*
pTable
);
static
int
tsdbScanAndConvertSubmitMsg
(
STsdbRepo
*
pRepo
,
SSubmitMsg
*
pMsg
);
static
int
tsdbScanAndConvertSubmitMsg
(
STsdbRepo
*
pRepo
,
SSubmitMsg
*
pMsg
);
static
void
tsdbStartStream
(
STsdbRepo
*
pRepo
);
static
void
tsdbStopStream
(
STsdbRepo
*
pRepo
);
// Function declaration
// Function declaration
int32_t
tsdbCreateRepo
(
char
*
rootDir
,
STsdbCfg
*
pCfg
)
{
int32_t
tsdbCreateRepo
(
char
*
rootDir
,
STsdbCfg
*
pCfg
)
{
...
@@ -127,6 +129,7 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
...
@@ -127,6 +129,7 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
goto
_err
;
goto
_err
;
}
}
tsdbStartStream
(
pRepo
);
// pRepo->state = TSDB_REPO_STATE_ACTIVE;
// pRepo->state = TSDB_REPO_STATE_ACTIVE;
tsdbDebug
(
"vgId:%d open tsdb repository succeed!"
,
REPO_ID
(
pRepo
));
tsdbDebug
(
"vgId:%d open tsdb repository succeed!"
,
REPO_ID
(
pRepo
));
...
@@ -145,6 +148,8 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
...
@@ -145,6 +148,8 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
int
vgId
=
REPO_ID
(
pRepo
);
int
vgId
=
REPO_ID
(
pRepo
);
tsdbStopStream
(
repo
);
if
(
toCommit
)
{
if
(
toCommit
)
{
tsdbAsyncCommit
(
pRepo
);
tsdbAsyncCommit
(
pRepo
);
if
(
pRepo
->
commit
)
pthread_join
(
pRepo
->
commitThread
,
NULL
);
if
(
pRepo
->
commit
)
pthread_join
(
pRepo
->
commitThread
,
NULL
);
...
@@ -265,19 +270,6 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
...
@@ -265,19 +270,6 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
return
magic
;
return
magic
;
}
}
void
tsdbStartStream
(
TSDB_REPO_T
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
for
(
int
i
=
0
;
i
<
pRepo
->
config
.
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
&&
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
pTable
->
cqhandle
=
(
*
pRepo
->
appH
.
cqCreateFunc
)(
pRepo
->
appH
.
cqH
,
TABLE_UID
(
pTable
),
TABLE_TID
(
pTable
),
pTable
->
sql
,
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
-
1
));
}
}
}
STsdbCfg
*
tsdbGetCfg
(
const
TSDB_REPO_T
*
repo
)
{
STsdbCfg
*
tsdbGetCfg
(
const
TSDB_REPO_T
*
repo
)
{
ASSERT
(
repo
!=
NULL
);
ASSERT
(
repo
!=
NULL
);
return
&
((
STsdbRepo
*
)
repo
)
->
config
;
return
&
((
STsdbRepo
*
)
repo
)
->
config
;
...
@@ -1120,4 +1112,27 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
...
@@ -1120,4 +1112,27 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
return TSDB_GET_TABLE_LAST_KEY(pTable);
return TSDB_GET_TABLE_LAST_KEY(pTable);
}
}
#endif
#endif
\ No newline at end of file
static
void
tsdbStartStream
(
STsdbRepo
*
pRepo
)
{
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
for
(
int
i
=
0
;
i
<
pRepo
->
config
.
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
&&
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
pTable
->
cqhandle
=
(
*
pRepo
->
appH
.
cqCreateFunc
)(
pRepo
->
appH
.
cqH
,
TABLE_UID
(
pTable
),
TABLE_TID
(
pTable
),
pTable
->
sql
,
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
-
1
));
}
}
}
static
void
tsdbStopStream
(
STsdbRepo
*
pRepo
)
{
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
for
(
int
i
=
0
;
i
<
pRepo
->
config
.
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
&&
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
(
*
pRepo
->
appH
.
cqDropFunc
)(
pTable
->
cqhandle
);
}
}
}
src/vnode/src/vnodeMain.c
浏览文件 @
4c9c2662
...
@@ -303,10 +303,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
...
@@ -303,10 +303,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
}
}
#endif
#endif
// start continuous query
if
(
pVnode
->
role
==
TAOS_SYNC_ROLE_MASTER
)
cqStart
(
pVnode
->
cq
);
pVnode
->
qMgmt
=
qOpenQueryMgmt
(
pVnode
->
vgId
);
pVnode
->
qMgmt
=
qOpenQueryMgmt
(
pVnode
->
vgId
);
pVnode
->
events
=
NULL
;
pVnode
->
events
=
NULL
;
pVnode
->
status
=
TAOS_VN_STATUS_READY
;
pVnode
->
status
=
TAOS_VN_STATUS_READY
;
...
@@ -317,15 +313,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
...
@@ -317,15 +313,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
}
int32_t
vnodeStartStream
(
int32_t
vnode
)
{
SVnodeObj
*
pVnode
=
vnodeAcquireVnode
(
vnode
);
if
(
pVnode
!=
NULL
)
{
tsdbStartStream
(
pVnode
->
tsdb
);
vnodeRelease
(
pVnode
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
vnodeClose
(
int32_t
vgId
)
{
int32_t
vnodeClose
(
int32_t
vgId
)
{
SVnodeObj
**
ppVnode
=
(
SVnodeObj
**
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
vgId
,
sizeof
(
int32_t
));
SVnodeObj
**
ppVnode
=
(
SVnodeObj
**
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
vgId
,
sizeof
(
int32_t
));
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
return
0
;
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
return
0
;
...
...
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录