Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
6b984059
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
6b984059
编写于
7月 17, 2020
作者:
H
Hongze Cheng
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into feature/2.0tsdb
上级
857788d4
342fa888
变更
33
隐藏空白更改
内联
并排
Showing
33 changed file
with
715 addition
and
212 deletion
+715
-212
src/client/src/tscAsync.c
src/client/src/tscAsync.c
+2
-2
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+16
-13
src/client/src/tscSql.c
src/client/src/tscSql.c
+23
-6
src/client/src/tscStream.c
src/client/src/tscStream.c
+50
-35
src/common/src/tdataformat.c
src/common/src/tdataformat.c
+1
-1
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+34
-8
src/dnode/inc/dnodeModule.h
src/dnode/inc/dnodeModule.h
+0
-1
src/dnode/src/dnodeMain.c
src/dnode/src/dnodeMain.c
+0
-1
src/dnode/src/dnodeMgmt.c
src/dnode/src/dnodeMgmt.c
+3
-19
src/dnode/src/dnodeVRead.c
src/dnode/src/dnodeVRead.c
+2
-16
src/dnode/src/dnodeVWrite.c
src/dnode/src/dnodeVWrite.c
+1
-1
src/inc/dnode.h
src/inc/dnode.h
+0
-1
src/inc/taosmsg.h
src/inc/taosmsg.h
+4
-4
src/inc/tsdb.h
src/inc/tsdb.h
+0
-1
src/inc/tsync.h
src/inc/tsync.h
+1
-1
src/inc/vnode.h
src/inc/vnode.h
+6
-10
src/mnode/src/mnodeDnode.c
src/mnode/src/mnodeDnode.c
+27
-7
src/mnode/src/mnodeVgroup.c
src/mnode/src/mnodeVgroup.c
+9
-2
src/tsdb/src/tsdbMain.c
src/tsdb/src/tsdbMain.c
+29
-14
src/util/inc/tutil.h
src/util/inc/tutil.h
+2
-0
src/util/src/tsocket.c
src/util/src/tsocket.c
+8
-0
src/util/src/tutil.c
src/util/src/tutil.c
+26
-0
src/vnode/inc/vnodeInt.h
src/vnode/inc/vnodeInt.h
+3
-1
src/vnode/src/vnodeMain.c
src/vnode/src/vnodeMain.c
+88
-64
src/vnode/src/vnodeRead.c
src/vnode/src/vnodeRead.c
+18
-2
src/vnode/src/vnodeWrite.c
src/vnode/src/vnodeWrite.c
+6
-1
tests/pytest/fulltest.sh
tests/pytest/fulltest.sh
+1
-0
tests/pytest/query/queryInsertValue.py
tests/pytest/query/queryInsertValue.py
+65
-0
tests/pytest/query/queryNullValueTest.py
tests/pytest/query/queryNullValueTest.py
+181
-0
tests/pytest/regressiontest.sh
tests/pytest/regressiontest.sh
+1
-0
tests/script/sh/deploy.sh
tests/script/sh/deploy.sh
+2
-0
tests/script/sh/exec.sh
tests/script/sh/exec.sh
+3
-1
tests/script/unique/vnode/replica2_a_large.sim
tests/script/unique/vnode/replica2_a_large.sim
+103
-0
未找到文件。
src/client/src/tscAsync.c
浏览文件 @
6b984059
...
...
@@ -54,7 +54,7 @@ void doAsyncQuery(STscObj* pObj, SSqlObj* pSql, void (*fp)(), void* param, const
return
;
}
str
tolower
(
pSql
->
sqlstr
,
sqlstr
);
str
ntolower
(
pSql
->
sqlstr
,
sqlstr
,
sqlLen
);
tscDebugL
(
"%p SQL: %s"
,
pSql
,
pSql
->
sqlstr
);
pSql
->
cmd
.
curSql
=
pSql
->
sqlstr
;
...
...
@@ -564,8 +564,8 @@ void tscTableMetaCallBack(void *param, TAOS_RES *res, int code) {
tscDebug
(
"%p stream:%p meta is updated, start new query, command:%d"
,
pSql
,
pSql
->
pStream
,
pSql
->
cmd
.
command
);
if
(
!
pSql
->
cmd
.
parseFinished
)
{
tsParseSql
(
pSql
,
false
);
sem_post
(
&
pSql
->
rspSem
);
}
(
*
pSql
->
fp
)(
pSql
->
param
,
pSql
,
code
);
return
;
}
...
...
src/client/src/tscSQLParser.c
浏览文件 @
6b984059
...
...
@@ -4642,21 +4642,24 @@ typedef struct SDNodeDynConfOption {
}
SDNodeDynConfOption
;
int32_t
validateEp
(
char
*
ep
)
{
int32_t
validateEp
(
char
*
ep
)
{
char
buf
[
TSDB_EP_LEN
+
1
]
=
{
0
};
tstrncpy
(
buf
,
ep
,
TSDB_EP_LEN
);
char
*
pos
=
strchr
(
buf
,
':'
);
if
(
NULL
==
pos
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
char
*
pos
=
strchr
(
buf
,
':'
);
if
(
NULL
==
pos
)
{
int32_t
val
=
strtol
(
ep
,
NULL
,
10
);
if
(
val
<=
0
||
val
>
65536
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
}
else
{
uint16_t
port
=
atoi
(
pos
+
1
);
if
(
0
==
port
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
}
uint16_t
port
=
atoi
(
pos
+
1
);
if
(
0
==
port
)
{
return
TSDB_CODE_TSC_INVALID_SQL
;
}
return
TSDB_CODE_SUCCESS
;
return
TSDB_CODE_SUCCESS
;
}
int32_t
validateDNodeConfig
(
tDCLSQL
*
pOptions
)
{
...
...
@@ -4664,13 +4667,13 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) {
return
TSDB_CODE_TSC_INVALID_SQL
;
}
const
int
DNODE_DYNAMIC_CFG_OPTIONS_SIZE
=
1
7
;
const
int
DNODE_DYNAMIC_CFG_OPTIONS_SIZE
=
1
9
;
const
SDNodeDynConfOption
DNODE_DYNAMIC_CFG_OPTIONS
[]
=
{
{
"resetLog"
,
8
},
{
"resetQueryCache"
,
15
},
{
"debugFlag"
,
9
},
{
"mDebugFlag"
,
10
},
{
"dDebugFlag"
,
10
},
{
"sdbDebugFlag"
,
12
},
{
"vDebugFlag"
,
10
},
{
"cDebugFlag"
,
10
},
{
"httpDebugFlag"
,
13
},
{
"monitorDebugFlag"
,
16
},
{
"rpcDebugFlag"
,
12
},
{
"uDebugFlag"
,
10
},
{
"tmrDebugFlag"
,
12
},
{
"qDebugflag"
,
10
},
{
"sDebugflag"
,
10
},
{
"tsdbDebugFlag"
,
13
},
{
"monitor"
,
7
}};
{
"m
qttDebugFlag"
,
13
},
{
"wDebugFlag"
,
10
},
{
"m
onitor"
,
7
}};
SSQLToken
*
pOptionToken
=
&
pOptions
->
a
[
1
];
...
...
@@ -4694,7 +4697,7 @@ int32_t validateDNodeConfig(tDCLSQL* pOptions) {
SSQLToken
*
pValToken
=
&
pOptions
->
a
[
2
];
int32_t
val
=
strtol
(
pValToken
->
z
,
NULL
,
10
);
if
(
val
<
131
||
val
>
199
)
{
if
(
val
<
0
||
val
>
256
)
{
/* options value is out of valid range */
return
TSDB_CODE_TSC_INVALID_SQL
;
}
...
...
src/client/src/tscSql.c
浏览文件 @
6b984059
...
...
@@ -263,12 +263,29 @@ TAOS_RES* taos_query(TAOS *taos, const char *sqlstr) {
return
pSql
;
}
TAOS_RES
*
taos_query_c
(
TAOS
*
taos
,
const
char
*
sqlstr
,
uint32_t
sqlLen
)
{
char
*
buf
=
malloc
(
sqlLen
+
1
);
buf
[
sqlLen
]
=
0
;
strncpy
(
buf
,
sqlstr
,
sqlLen
);
TAOS_RES
*
res
=
taos_query
(
taos
,
buf
);
free
(
buf
);
return
res
;
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
{
terrno
=
TSDB_CODE_TSC_DISCONNECTED
;
return
NULL
;
}
if
(
sqlLen
>
tsMaxSQLStringLen
)
{
tscError
(
"sql string exceeds max length:%d"
,
tsMaxSQLStringLen
);
terrno
=
TSDB_CODE_TSC_INVALID_SQL
;
return
NULL
;
}
SSqlObj
*
pSql
=
calloc
(
1
,
sizeof
(
SSqlObj
));
if
(
pSql
==
NULL
)
{
tscError
(
"failed to malloc sqlObj"
);
terrno
=
TSDB_CODE_TSC_OUT_OF_MEMORY
;
return
NULL
;
}
doAsyncQuery
(
pObj
,
pSql
,
waitForQueryRsp
,
taos
,
sqlstr
,
sqlLen
);
tsem_wait
(
&
pSql
->
rspSem
);
return
pSql
;
}
int
taos_result_precision
(
TAOS_RES
*
res
)
{
SSqlObj
*
pSql
=
(
SSqlObj
*
)
res
;
...
...
src/client/src/tscStream.c
浏览文件 @
6b984059
...
...
@@ -70,6 +70,7 @@ static void tscProcessStreamLaunchQuery(SSchedMsg *pMsg) {
SSqlObj
*
pSql
=
pStream
->
pSql
;
pSql
->
fp
=
tscProcessStreamQueryCallback
;
pSql
->
fetchFp
=
tscProcessStreamQueryCallback
;
pSql
->
param
=
pStream
;
pSql
->
res
.
completed
=
false
;
...
...
@@ -471,6 +472,41 @@ static void setErrorInfo(SSqlObj* pSql, int32_t code, char* info) {
}
}
static
void
tscCreateStream
(
void
*
param
,
TAOS_RES
*
res
,
int
code
)
{
SSqlStream
*
pStream
=
(
SSqlStream
*
)
param
;
SSqlObj
*
pSql
=
pStream
->
pSql
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
setErrorInfo
(
pSql
,
code
,
pCmd
->
payload
);
tscError
(
"%p open stream failed, sql:%s, reason:%s, code:0x%08x"
,
pSql
,
pSql
->
sqlstr
,
pCmd
->
payload
,
code
);
pStream
->
fp
(
pStream
->
param
,
NULL
,
NULL
);
return
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
pStream
->
isProject
=
isProjectStream
(
pQueryInfo
);
pStream
->
precision
=
tinfo
.
precision
;
pStream
->
ctime
=
taosGetTimestamp
(
pStream
->
precision
);
pStream
->
etime
=
pQueryInfo
->
window
.
ekey
;
tscAddIntoStreamList
(
pStream
);
tscSetSlidingWindowInfo
(
pSql
,
pStream
);
pStream
->
stime
=
tscGetStreamStartTimestamp
(
pSql
,
pStream
,
pStream
->
stime
);
int64_t
starttime
=
tscGetLaunchTimestamp
(
pStream
);
pCmd
->
command
=
TSDB_SQL_SELECT
;
taosTmrReset
(
tscProcessStreamTimer
,
starttime
,
pStream
,
tscTmr
,
&
pStream
->
pTimer
);
tscDebug
(
"%p stream:%p is opened, query on:%s, interval:%"
PRId64
", sliding:%"
PRId64
", first launched in:%"
PRId64
", sql:%s"
,
pSql
,
pStream
,
pTableMetaInfo
->
name
,
pStream
->
interval
,
pStream
->
slidingTime
,
starttime
,
pSql
->
sqlstr
);
}
TAOS_STREAM
*
taos_open_stream
(
TAOS
*
taos
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
))
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
...
...
@@ -482,7 +518,6 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
}
pSql
->
signature
=
pSql
;
pSql
->
param
=
pSql
;
pSql
->
pTscObj
=
pObj
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
...
...
@@ -494,7 +529,14 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscFreeSqlObj
(
pSql
);
return
NULL
;
}
pStream
->
stime
=
stime
;
pStream
->
fp
=
fp
;
pStream
->
callback
=
callback
;
pStream
->
param
=
param
;
pStream
->
pSql
=
pSql
;
pSql
->
pStream
=
pStream
;
pSql
->
param
=
pStream
;
pSql
->
sqlstr
=
calloc
(
1
,
strlen
(
sqlstr
)
+
1
);
if
(
pSql
->
sqlstr
==
NULL
)
{
...
...
@@ -507,45 +549,18 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscDebugL
(
"%p SQL: %s"
,
pSql
,
pSql
->
sqlstr
);
tsem_init
(
&
pSql
->
rspSem
,
0
,
0
);
pSql
->
fp
=
tscCreateStream
;
pSql
->
fetchFp
=
tscCreateStream
;
int32_t
code
=
tsParseSql
(
pSql
,
true
);
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
sem_wait
(
&
pSql
->
rspSem
);
}
if
(
pRes
->
code
!=
TSDB_CODE_SUCCESS
)
{
setErrorInfo
(
pSql
,
pRes
->
code
,
pCmd
->
payload
);
tscError
(
"%p open stream failed, sql:%s, reason:%s, code:0x%08x"
,
pSql
,
sqlstr
,
pCmd
->
payload
,
pRes
->
code
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
tscCreateStream
(
pStream
,
pSql
,
code
);
}
else
if
(
code
!=
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
tscError
(
"%p open stream failed, sql:%s, code:%s"
,
pSql
,
sqlstr
,
tstrerror
(
pRes
->
code
));
tscFreeSqlObj
(
pSql
);
free
(
pStream
);
return
NULL
;
}
SQueryInfo
*
pQueryInfo
=
tscGetQueryInfoDetail
(
pCmd
,
0
);
STableMetaInfo
*
pTableMetaInfo
=
tscGetMetaInfo
(
pQueryInfo
,
0
);
STableComInfo
tinfo
=
tscGetTableInfo
(
pTableMetaInfo
->
pTableMeta
);
pStream
->
isProject
=
isProjectStream
(
pQueryInfo
);
pStream
->
fp
=
fp
;
pStream
->
callback
=
callback
;
pStream
->
param
=
param
;
pStream
->
pSql
=
pSql
;
pStream
->
precision
=
tinfo
.
precision
;
pStream
->
ctime
=
taosGetTimestamp
(
pStream
->
precision
);
pStream
->
etime
=
pQueryInfo
->
window
.
ekey
;
tscAddIntoStreamList
(
pStream
);
tscSetSlidingWindowInfo
(
pSql
,
pStream
);
pStream
->
stime
=
tscGetStreamStartTimestamp
(
pSql
,
pStream
,
stime
);
int64_t
starttime
=
tscGetLaunchTimestamp
(
pStream
);
pCmd
->
command
=
TSDB_SQL_SELECT
;
taosTmrReset
(
tscProcessStreamTimer
,
starttime
,
pStream
,
tscTmr
,
&
pStream
->
pTimer
);
tscDebug
(
"%p stream:%p is opened, query on:%s, interval:%"
PRId64
", sliding:%"
PRId64
", first launched in:%"
PRId64
", sql:%s"
,
pSql
,
pStream
,
pTableMetaInfo
->
name
,
pStream
->
interval
,
pStream
->
slidingTime
,
starttime
,
sqlstr
);
return
pStream
;
}
...
...
src/common/src/tdataformat.c
浏览文件 @
6b984059
...
...
@@ -318,7 +318,7 @@ SDataCols *tdNewDataCols(int maxRowSize, int maxCols, int maxRows) {
pCols
->
maxPoints
=
maxRows
;
pCols
->
bufSize
=
maxRowSize
*
maxRows
;
pCols
->
buf
=
malloc
(
pCols
->
bufSize
);
pCols
->
buf
=
calloc
(
1
,
pCols
->
bufSize
);
if
(
pCols
->
buf
==
NULL
)
{
free
(
pCols
);
return
NULL
;
...
...
src/cq/src/cqMain.c
浏览文件 @
6b984059
...
...
@@ -23,6 +23,7 @@
#include "taos.h"
#include "taosdef.h"
#include "taosmsg.h"
#include "ttimer.h"
#include "tcq.h"
#include "tdataformat.h"
#include "tglobal.h"
...
...
@@ -45,10 +46,12 @@ typedef struct {
struct
SCqObj
*
pHead
;
void
*
dbConn
;
int
master
;
void
*
tmrCtrl
;
pthread_mutex_t
mutex
;
}
SCqContext
;
typedef
struct
SCqObj
{
tmr_h
tmrId
;
uint64_t
uid
;
int32_t
tid
;
// table ID
int
rowSize
;
// bytes of a row
...
...
@@ -66,13 +69,14 @@ static void cqProcessStreamRes(void *param, TAOS_RES *tres, TAOS_ROW row);
static
void
cqCreateStream
(
SCqContext
*
pContext
,
SCqObj
*
pObj
);
void
*
cqOpen
(
void
*
ahandle
,
const
SCqCfg
*
pCfg
)
{
SCqContext
*
pContext
=
calloc
(
sizeof
(
SCqContext
),
1
);
if
(
pContext
==
NULL
)
{
terrno
=
TAOS_SYSTEM_ERROR
(
errno
);
return
NULL
;
}
pContext
->
tmrCtrl
=
taosTmrInit
(
0
,
0
,
0
,
"CQ"
);
tstrncpy
(
pContext
->
user
,
pCfg
->
user
,
sizeof
(
pContext
->
user
));
tstrncpy
(
pContext
->
pass
,
pCfg
->
pass
,
sizeof
(
pContext
->
pass
));
const
char
*
db
=
pCfg
->
db
;
...
...
@@ -99,6 +103,9 @@ void cqClose(void *handle) {
SCqContext
*
pContext
=
handle
;
if
(
handle
==
NULL
)
return
;
taosTmrCleanUp
(
pContext
->
tmrCtrl
);
pContext
->
tmrCtrl
=
NULL
;
// stop all CQs
cqStop
(
pContext
);
...
...
@@ -154,8 +161,10 @@ void cqStop(void *handle) {
taos_close_stream
(
pObj
->
pStream
);
pObj
->
pStream
=
NULL
;
cTrace
(
"vgId:%d, id:%d CQ:%s is closed"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
}
else
{
taosTmrStop
(
pObj
->
tmrId
);
pObj
->
tmrId
=
0
;
}
pObj
=
pObj
->
next
;
}
...
...
@@ -211,8 +220,13 @@ void cqDrop(void *handle) {
}
// free the resources associated
if
(
pObj
->
pStream
)
taos_close_stream
(
pObj
->
pStream
);
pObj
->
pStream
=
NULL
;
if
(
pObj
->
pStream
)
{
taos_close_stream
(
pObj
->
pStream
);
pObj
->
pStream
=
NULL
;
}
else
{
taosTmrStop
(
pObj
->
tmrId
);
pObj
->
tmrId
=
0
;
}
cTrace
(
"vgId:%d, id:%d CQ:%s is dropped"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
tdFreeSchema
(
pObj
->
pSchema
);
...
...
@@ -222,18 +236,30 @@ void cqDrop(void *handle) {
pthread_mutex_unlock
(
&
pContext
->
mutex
);
}
static
void
cqCreateStream
(
SCqContext
*
pContext
,
SCqObj
*
pObj
)
{
static
void
cqProcessCreateTimer
(
void
*
param
,
void
*
tmrId
)
{
SCqObj
*
pObj
=
(
SCqObj
*
)
param
;
SCqContext
*
pContext
=
pObj
->
pContext
;
if
(
pContext
->
dbConn
==
NULL
)
{
pContext
->
dbConn
=
taos_connect
(
"localhost"
,
pContext
->
user
,
pContext
->
pass
,
pContext
->
db
,
0
);
if
(
pContext
->
dbConn
==
NULL
)
{
cError
(
"vgId:%d, failed to connect to TDengine(%s)"
,
pContext
->
vgId
,
tstrerror
(
terrno
));
return
;
}
}
cqCreateStream
(
pContext
,
pObj
);
}
int64_t
lastKey
=
0
;
static
void
cqCreateStream
(
SCqContext
*
pContext
,
SCqObj
*
pObj
)
{
pObj
->
pContext
=
pContext
;
pObj
->
pStream
=
taos_open_stream
(
pContext
->
dbConn
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
lastKey
,
pObj
,
NULL
);
if
(
pContext
->
dbConn
==
NULL
)
{
pObj
->
tmrId
=
taosTmrStart
(
cqProcessCreateTimer
,
1000
,
pObj
,
pContext
->
tmrCtrl
);
return
;
}
pObj
->
tmrId
=
0
;
pObj
->
pStream
=
taos_open_stream
(
pContext
->
dbConn
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
0
,
pObj
,
NULL
);
if
(
pObj
->
pStream
)
{
pContext
->
num
++
;
cTrace
(
"vgId:%d, id:%d CQ:%s is openned"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
...
...
src/dnode/inc/dnodeModule.h
浏览文件 @
6b984059
...
...
@@ -22,7 +22,6 @@ extern "C" {
int32_t
dnodeInitModules
();
void
dnodeStartModules
();
void
dnodeStartStream
();
void
dnodeCleanupModules
();
void
dnodeProcessModuleStatus
(
uint32_t
moduleStatus
);
...
...
src/dnode/src/dnodeMain.c
浏览文件 @
6b984059
...
...
@@ -123,7 +123,6 @@ int32_t dnodeInitSystem() {
dnodeStartModules
();
dnodeSetRunStatus
(
TSDB_DNODE_RUN_STATUS_RUNING
);
dnodeStartStream
();
dInfo
(
"TDengine is initialized successfully"
);
...
...
src/dnode/src/dnodeMgmt.c
浏览文件 @
6b984059
...
...
@@ -354,23 +354,6 @@ static int32_t dnodeOpenVnodes() {
return
TSDB_CODE_SUCCESS
;
}
void
dnodeStartStream
()
{
int32_t
vnodeList
[
TSDB_MAX_VNODES
]
=
{
0
};
int32_t
numOfVnodes
=
0
;
int32_t
status
=
vnodeGetVnodeList
(
vnodeList
,
&
numOfVnodes
);
if
(
status
!=
TSDB_CODE_SUCCESS
)
{
dInfo
(
"get dnode list failed"
);
return
;
}
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
vnodeStartStream
(
vnodeList
[
i
]);
}
dInfo
(
"streams started"
);
}
static
void
dnodeCloseVnodes
()
{
int32_t
vnodeList
[
TSDB_MAX_VNODES
]
=
{
0
};
int32_t
numOfVnodes
=
0
;
...
...
@@ -416,7 +399,7 @@ static void* dnodeParseVnodeMsg(SRpcMsg *rpcMsg) {
static
int32_t
dnodeProcessCreateVnodeMsg
(
SRpcMsg
*
rpcMsg
)
{
SMDCreateVnodeMsg
*
pCreate
=
dnodeParseVnodeMsg
(
rpcMsg
);
void
*
pVnode
=
vnodeAcquire
Vnode
(
pCreate
->
cfg
.
vgId
);
void
*
pVnode
=
vnodeAcquire
(
pCreate
->
cfg
.
vgId
);
if
(
pVnode
!=
NULL
)
{
dDebug
(
"vgId:%d, already exist, return success"
,
pCreate
->
cfg
.
vgId
);
vnodeRelease
(
pVnode
);
...
...
@@ -430,7 +413,7 @@ static int32_t dnodeProcessCreateVnodeMsg(SRpcMsg *rpcMsg) {
static
int32_t
dnodeProcessAlterVnodeMsg
(
SRpcMsg
*
rpcMsg
)
{
SMDAlterVnodeMsg
*
pAlter
=
dnodeParseVnodeMsg
(
rpcMsg
);
void
*
pVnode
=
vnodeAcquire
Vnode
(
pAlter
->
cfg
.
vgId
);
void
*
pVnode
=
vnodeAcquire
(
pAlter
->
cfg
.
vgId
);
if
(
pVnode
!=
NULL
)
{
dDebug
(
"vgId:%d, alter vnode msg is received"
,
pAlter
->
cfg
.
vgId
);
int32_t
code
=
vnodeAlter
(
pVnode
,
pAlter
);
...
...
@@ -723,6 +706,7 @@ static void dnodeSendStatusMsg(void *handle, void *tmrId) {
// fill cluster cfg parameters
pStatus
->
clusterCfg
.
numOfMnodes
=
htonl
(
tsNumOfMnodes
);
pStatus
->
clusterCfg
.
enableBalance
=
htonl
(
tsEnableBalance
);
pStatus
->
clusterCfg
.
mnodeEqualVnodeNum
=
htonl
(
tsMnodeEqualVnodeNum
);
pStatus
->
clusterCfg
.
offlineThreshold
=
htonl
(
tsOfflineThreshold
);
pStatus
->
clusterCfg
.
statusInterval
=
htonl
(
tsStatusInterval
);
...
...
src/dnode/src/dnodeVRead.c
浏览文件 @
6b984059
...
...
@@ -91,23 +91,21 @@ void dnodeDispatchToVnodeReadQueue(SRpcMsg *pMsg) {
int32_t
queuedMsgNum
=
0
;
int32_t
leftLen
=
pMsg
->
contLen
;
char
*
pCont
=
(
char
*
)
pMsg
->
pCont
;
void
*
pVnode
;
while
(
leftLen
>
0
)
{
SMsgHead
*
pHead
=
(
SMsgHead
*
)
pCont
;
pHead
->
vgId
=
htonl
(
pHead
->
vgId
);
pHead
->
contLen
=
htonl
(
pHead
->
contLen
);
pVnode
=
vnodeAcquireVnod
e
(
pHead
->
vgId
);
taos_queue
queue
=
vnodeAcquireRqueu
e
(
pHead
->
vgId
);
if
(
pVnod
e
==
NULL
)
{
if
(
queu
e
==
NULL
)
{
leftLen
-=
pHead
->
contLen
;
pCont
-=
pHead
->
contLen
;
continue
;
}
// put message into queue
taos_queue
queue
=
vnodeGetRqueue
(
pVnode
);
SReadMsg
*
pRead
=
(
SReadMsg
*
)
taosAllocateQitem
(
sizeof
(
SReadMsg
));
pRead
->
rpcMsg
=
*
pMsg
;
pRead
->
pCont
=
pCont
;
...
...
@@ -175,18 +173,6 @@ void dnodeFreeVnodeRqueue(void *rqueue) {
// dynamically adjust the number of threads
}
void
dnodePutItemIntoReadQueue
(
void
*
pVnode
,
void
*
qhandle
)
{
SReadMsg
*
pRead
=
(
SReadMsg
*
)
taosAllocateQitem
(
sizeof
(
SReadMsg
));
pRead
->
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_QUERY
;
pRead
->
pCont
=
qhandle
;
pRead
->
contLen
=
0
;
assert
(
pVnode
!=
NULL
);
taos_queue
queue
=
vnodeAcquireRqueue
(
pVnode
);
taosWriteQitem
(
queue
,
TAOS_QTYPE_QUERY
,
pRead
);
}
void
dnodeSendRpcReadRsp
(
void
*
pVnode
,
SReadMsg
*
pRead
,
int32_t
code
)
{
SRpcMsg
rpcRsp
=
{
.
handle
=
pRead
->
rpcMsg
.
handle
,
...
...
src/dnode/src/dnodeVWrite.c
浏览文件 @
6b984059
...
...
@@ -104,7 +104,7 @@ void dnodeDispatchToVnodeWriteQueue(SRpcMsg *pMsg) {
pHead
->
vgId
=
htonl
(
pHead
->
vgId
);
pHead
->
contLen
=
htonl
(
pHead
->
contLen
);
taos_queue
queue
=
vnode
Get
Wqueue
(
pHead
->
vgId
);
taos_queue
queue
=
vnode
Acquire
Wqueue
(
pHead
->
vgId
);
if
(
queue
)
{
// put message into queue
SWriteMsg
*
pWrite
=
(
SWriteMsg
*
)
taosAllocateQitem
(
sizeof
(
SWriteMsg
));
...
...
src/inc/dnode.h
浏览文件 @
6b984059
...
...
@@ -53,7 +53,6 @@ void *dnodeAllocateVnodeWqueue(void *pVnode);
void
dnodeFreeVnodeWqueue
(
void
*
queue
);
void
*
dnodeAllocateVnodeRqueue
(
void
*
pVnode
);
void
dnodeFreeVnodeRqueue
(
void
*
rqueue
);
void
dnodePutItemIntoReadQueue
(
void
*
pVnode
,
void
*
qhandle
);
void
dnodeSendRpcVnodeWriteRsp
(
void
*
pVnode
,
void
*
param
,
int32_t
code
);
int32_t
dnodeAllocateMnodePqueue
();
...
...
src/inc/taosmsg.h
浏览文件 @
6b984059
...
...
@@ -54,12 +54,11 @@ TAOS_DEFINE_MESSAGE_TYPE( TSDB_MSG_TYPE_MD_CREATE_TABLE, "create-table" )
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_MD_DROP_TABLE
,
"drop-table"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_MD_ALTER_TABLE
,
"alter-table"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_MD_CREATE_VNODE
,
"create-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_MD_ALTER_VNODE
,
"alter-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_MD_DROP_VNODE
,
"drop-vnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_MD_DROP_STABLE
,
"drop-stable"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_MD_ALTER_STREAM
,
"alter-stream"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_MD_CONFIG_DNODE
,
"config-dnode"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_
DUMMY4
,
"dummy4
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_
MD_ALTER_VNODE
,
"alter-vnode
"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY5
,
"dummy5"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY6
,
"dummy6"
)
TAOS_DEFINE_MESSAGE_TYPE
(
TSDB_MSG_TYPE_DUMMY7
,
"dummy7"
)
...
...
@@ -564,15 +563,16 @@ typedef struct {
typedef
struct
{
int32_t
numOfMnodes
;
// tsNumOfMnodes
int32_t
enableBalance
;
// tsEnableBalance
int32_t
mnodeEqualVnodeNum
;
// tsMnodeEqualVnodeNum
int32_t
offlineThreshold
;
// tsOfflineThreshold
int32_t
statusInterval
;
// tsStatusInterval
int32_t
maxtablesPerVnode
;
int32_t
maxVgroupsPerDb
;
char
arbitrator
[
TSDB_EP_LEN
];
// tsArbitrator
char
timezone
[
64
];
// tsTimezone
char
locale
[
TSDB_LOCALE_LEN
];
// tsLocale
char
charset
[
TSDB_LOCALE_LEN
];
// tsCharset
int32_t
maxtablesPerVnode
;
int32_t
maxVgroupsPerDb
;
}
SClusterCfg
;
typedef
struct
{
...
...
src/inc/tsdb.h
浏览文件 @
6b984059
...
...
@@ -116,7 +116,6 @@ int tsdbCreateTable(TSDB_REPO_T *repo, STableCfg *pCfg);
int
tsdbDropTable
(
TSDB_REPO_T
*
pRepo
,
STableId
tableId
);
int
tsdbUpdateTableTagValue
(
TSDB_REPO_T
*
repo
,
SUpdateTableTagValMsg
*
pMsg
);
TSKEY
tsdbGetTableLastKey
(
TSDB_REPO_T
*
repo
,
uint64_t
uid
);
void
tsdbStartStream
(
TSDB_REPO_T
*
repo
);
uint32_t
tsdbGetFileInfo
(
TSDB_REPO_T
*
repo
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int32_t
*
size
);
...
...
src/inc/tsync.h
浏览文件 @
6b984059
...
...
@@ -79,7 +79,7 @@ typedef void (*FConfirmForward)(void *ahandle, void *mhandle, int32_t code);
typedef
void
(
*
FNotifyRole
)(
void
*
ahandle
,
int8_t
role
);
// when data file is synced successfully, notity app
typedef
void
(
*
FNotifyFileSynced
)(
void
*
ahandle
,
uint64_t
fversion
);
typedef
int
(
*
FNotifyFileSynced
)(
void
*
ahandle
,
uint64_t
fversion
);
typedef
struct
{
int32_t
vgId
;
// vgroup ID
...
...
src/inc/vnode.h
浏览文件 @
6b984059
...
...
@@ -22,10 +22,10 @@ extern "C" {
typedef
enum
_VN_STATUS
{
TAOS_VN_STATUS_INIT
,
TAOS_VN_STATUS_UPDATING
,
TAOS_VN_STATUS_READY
,
TAOS_VN_STATUS_CLOSING
,
TAOS_VN_STATUS_DELETING
,
TAOS_VN_STATUS_UPDATING
,
TAOS_VN_STATUS_RESET
,
}
EVnStatus
;
typedef
struct
{
...
...
@@ -44,17 +44,13 @@ typedef struct {
int32_t
vnodeCreate
(
SMDCreateVnodeMsg
*
pVnodeCfg
);
int32_t
vnodeDrop
(
int32_t
vgId
);
int32_t
vnodeOpen
(
int32_t
vgId
,
char
*
rootDir
);
int32_t
vnodeStartStream
(
int32_t
vgId
);
int32_t
vnodeAlter
(
void
*
pVnode
,
SMDCreateVnodeMsg
*
pVnodeCfg
);
int32_t
vnodeClose
(
int32_t
vgId
);
void
vnodeRelease
(
void
*
pVnode
);
void
*
vnodeAcquireVnode
(
int32_t
vgId
);
// add refcount
void
*
vnodeGetVnode
(
int32_t
vgId
);
// keep refcount unchanged
void
*
vnodeAcquireRqueue
(
void
*
);
void
*
vnodeGetRqueue
(
void
*
);
void
*
vnodeGetWqueue
(
int32_t
vgId
);
void
*
vnodeAcquire
(
int32_t
vgId
);
// add refcount
void
*
vnodeAcquireRqueue
(
int32_t
vgId
);
// add refCount, get read queue
void
*
vnodeAcquireWqueue
(
int32_t
vgId
);
// add recCount, get write queue
void
vnodeRelease
(
void
*
pVnode
);
// dec refCount
void
*
vnodeGetWal
(
void
*
pVnode
);
int32_t
vnodeProcessWrite
(
void
*
pVnode
,
int
qtype
,
void
*
pHead
,
void
*
item
);
...
...
src/mnode/src/mnodeDnode.c
浏览文件 @
6b984059
...
...
@@ -269,18 +269,37 @@ void mnodeUpdateDnode(SDnodeObj *pDnode) {
}
static
int32_t
mnodeProcessCfgDnodeMsg
(
SMnodeMsg
*
pMsg
)
{
if
(
strcmp
(
pMsg
->
pUser
->
user
,
TSDB_DEFAULT_USER
)
!=
0
)
{
mError
(
"failed to cfg dnode, no rights"
);
return
TSDB_CODE_MND_NO_RIGHTS
;
}
SCMCfgDnodeMsg
*
pCmCfgDnode
=
pMsg
->
rpcMsg
.
pCont
;
if
(
pCmCfgDnode
->
ep
[
0
]
==
0
)
{
strcpy
(
pCmCfgDnode
->
ep
,
tsLocalEp
);
}
else
{
// TODO temporary disabled for compiling: strcpy(pCmCfgDnode->ep, pCmCfgDnode->ep);
tstrncpy
(
pCmCfgDnode
->
ep
,
tsLocalEp
,
TSDB_EP_LEN
);
}
int32_t
dnodeId
=
0
;
char
*
pos
=
strchr
(
pCmCfgDnode
->
ep
,
':'
);
if
(
NULL
==
pos
)
{
dnodeId
=
strtol
(
pCmCfgDnode
->
ep
,
NULL
,
10
);
if
(
dnodeId
<=
0
||
dnodeId
>
65536
)
{
mError
(
"failed to cfg dnode, invalid dnodeId:%s"
,
pCmCfgDnode
->
ep
);
return
TSDB_CODE_MND_DNODE_NOT_EXIST
;
}
}
if
(
strcmp
(
pMsg
->
pUser
->
user
,
TSDB_DEFAULT_USER
)
!=
0
)
{
return
TSDB_CODE_MND_NO_RIGHTS
;
SRpcIpSet
ipSet
=
mnodeGetIpSetFromIp
(
pCmCfgDnode
->
ep
);
if
(
dnodeId
!=
0
)
{
SDnodeObj
*
pDnode
=
mnodeGetDnode
(
dnodeId
);
if
(
pDnode
==
NULL
)
{
mError
(
"failed to cfg dnode, invalid dnodeId:%d"
,
dnodeId
);
return
TSDB_CODE_MND_DNODE_NOT_EXIST
;
}
ipSet
=
mnodeGetIpSetFromIp
(
pDnode
->
dnodeEp
);
mnodeDecDnodeRef
(
pDnode
);
}
SRpcIpSet
ipSet
=
mnodeGetIpSetFromIp
(
pCmCfgDnode
->
ep
);
SMDCfgDnodeMsg
*
pMdCfgDnode
=
rpcMallocCont
(
sizeof
(
SMDCfgDnodeMsg
));
strcpy
(
pMdCfgDnode
->
ep
,
pCmCfgDnode
->
ep
);
strcpy
(
pMdCfgDnode
->
config
,
pCmCfgDnode
->
config
);
...
...
@@ -292,9 +311,9 @@ static int32_t mnodeProcessCfgDnodeMsg(SMnodeMsg *pMsg) {
.
pCont
=
pMdCfgDnode
,
.
contLen
=
sizeof
(
SMDCfgDnodeMsg
)
};
dnodeSendMsgToDnode
(
&
ipSet
,
&
rpcMdCfgDnodeMsg
);
mInfo
(
"dnode:%s, is configured by %s"
,
pCmCfgDnode
->
ep
,
pMsg
->
pUser
->
user
);
dnodeSendMsgToDnode
(
&
ipSet
,
&
rpcMdCfgDnodeMsg
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -305,6 +324,7 @@ static void mnodeProcessCfgDnodeMsgRsp(SRpcMsg *rpcMsg) {
static
bool
mnodeCheckClusterCfgPara
(
const
SClusterCfg
*
clusterCfg
)
{
if
(
clusterCfg
->
numOfMnodes
!=
htonl
(
tsNumOfMnodes
))
return
false
;
if
(
clusterCfg
->
enableBalance
!=
htonl
(
tsEnableBalance
))
return
false
;
if
(
clusterCfg
->
mnodeEqualVnodeNum
!=
htonl
(
tsMnodeEqualVnodeNum
))
return
false
;
if
(
clusterCfg
->
offlineThreshold
!=
htonl
(
tsOfflineThreshold
))
return
false
;
if
(
clusterCfg
->
statusInterval
!=
htonl
(
tsStatusInterval
))
return
false
;
...
...
src/mnode/src/mnodeVgroup.c
浏览文件 @
6b984059
...
...
@@ -593,7 +593,7 @@ static int32_t mnodeGetVgroupMeta(STableMetaMsg *pMeta, SShowObj *pShow, void *p
pShow
->
bytes
[
cols
]
=
4
;
pSchema
[
cols
].
type
=
TSDB_DATA_TYPE_INT
;
strcpy
(
pSchema
[
cols
].
name
,
"
maxTabl
es"
);
strcpy
(
pSchema
[
cols
].
name
,
"
onlineVnod
es"
);
pSchema
[
cols
].
bytes
=
htons
(
pShow
->
bytes
[
cols
]);
cols
++
;
...
...
@@ -692,8 +692,15 @@ static int32_t mnodeRetrieveVgroups(SShowObj *pShow, char *data, int32_t rows, v
*
(
int32_t
*
)
pWrite
=
taosIdPoolMaxSize
(
pVgroup
->
idPool
);
cols
++
;
int32_t
onlineVnodes
=
0
;
for
(
int32_t
i
=
0
;
i
<
pShow
->
maxReplica
;
++
i
)
{
if
(
pVgroup
->
vnodeGid
[
i
].
role
==
TAOS_SYNC_ROLE_SLAVE
||
pVgroup
->
vnodeGid
[
i
].
role
==
TAOS_SYNC_ROLE_MASTER
)
{
onlineVnodes
++
;
}
}
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int32_t
*
)
pWrite
=
tsMaxTablePerVnode
;
*
(
int32_t
*
)
pWrite
=
onlineVnodes
;
cols
++
;
for
(
int32_t
i
=
0
;
i
<
pShow
->
maxReplica
;
++
i
)
{
...
...
src/tsdb/src/tsdbMain.c
浏览文件 @
6b984059
...
...
@@ -69,6 +69,8 @@ static int tsdbEncodeCfg(void **buf, STsdbCfg *pCfg);
static
void
*
tsdbDecodeCfg
(
void
*
buf
,
STsdbCfg
*
pCfg
);
static
int
tsdbCheckTableSchema
(
STsdbRepo
*
pRepo
,
SSubmitBlk
*
pBlock
,
STable
*
pTable
);
static
int
tsdbScanAndConvertSubmitMsg
(
STsdbRepo
*
pRepo
,
SSubmitMsg
*
pMsg
);
static
void
tsdbStartStream
(
STsdbRepo
*
pRepo
);
static
void
tsdbStopStream
(
STsdbRepo
*
pRepo
);
// Function declaration
int32_t
tsdbCreateRepo
(
char
*
rootDir
,
STsdbCfg
*
pCfg
)
{
...
...
@@ -127,6 +129,7 @@ TSDB_REPO_T *tsdbOpenRepo(char *rootDir, STsdbAppH *pAppH) {
goto
_err
;
}
tsdbStartStream
(
pRepo
);
// pRepo->state = TSDB_REPO_STATE_ACTIVE;
tsdbDebug
(
"vgId:%d open tsdb repository succeed!"
,
REPO_ID
(
pRepo
));
...
...
@@ -145,6 +148,8 @@ void tsdbCloseRepo(TSDB_REPO_T *repo, int toCommit) {
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
int
vgId
=
REPO_ID
(
pRepo
);
tsdbStopStream
(
repo
);
if
(
toCommit
)
{
tsdbAsyncCommit
(
pRepo
);
if
(
pRepo
->
commit
)
pthread_join
(
pRepo
->
commitThread
,
NULL
);
...
...
@@ -265,19 +270,6 @@ uint32_t tsdbGetFileInfo(TSDB_REPO_T *repo, char *name, uint32_t *index, uint32_
return
magic
;
}
void
tsdbStartStream
(
TSDB_REPO_T
*
repo
)
{
STsdbRepo
*
pRepo
=
(
STsdbRepo
*
)
repo
;
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
for
(
int
i
=
0
;
i
<
pRepo
->
config
.
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
&&
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
pTable
->
cqhandle
=
(
*
pRepo
->
appH
.
cqCreateFunc
)(
pRepo
->
appH
.
cqH
,
TABLE_UID
(
pTable
),
TABLE_TID
(
pTable
),
pTable
->
sql
,
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
-
1
));
}
}
}
STsdbCfg
*
tsdbGetCfg
(
const
TSDB_REPO_T
*
repo
)
{
ASSERT
(
repo
!=
NULL
);
return
&
((
STsdbRepo
*
)
repo
)
->
config
;
...
...
@@ -1120,4 +1112,27 @@ TSKEY tsdbGetTableLastKey(TSDB_REPO_T *repo, uint64_t uid) {
return TSDB_GET_TABLE_LAST_KEY(pTable);
}
#endif
\ No newline at end of file
#endif
static
void
tsdbStartStream
(
STsdbRepo
*
pRepo
)
{
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
for
(
int
i
=
0
;
i
<
pRepo
->
config
.
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
&&
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
pTable
->
cqhandle
=
(
*
pRepo
->
appH
.
cqCreateFunc
)(
pRepo
->
appH
.
cqH
,
TABLE_UID
(
pTable
),
TABLE_TID
(
pTable
),
pTable
->
sql
,
tsdbGetTableSchemaImpl
(
pTable
,
false
,
false
,
-
1
));
}
}
}
static
void
tsdbStopStream
(
STsdbRepo
*
pRepo
)
{
STsdbMeta
*
pMeta
=
pRepo
->
tsdbMeta
;
for
(
int
i
=
0
;
i
<
pRepo
->
config
.
maxTables
;
i
++
)
{
STable
*
pTable
=
pMeta
->
tables
[
i
];
if
(
pTable
&&
pTable
->
type
==
TSDB_STREAM_TABLE
)
{
(
*
pRepo
->
appH
.
cqDropFunc
)(
pTable
->
cqhandle
);
}
}
}
src/util/inc/tutil.h
浏览文件 @
6b984059
...
...
@@ -133,6 +133,8 @@ char **strsplit(char *src, const char *delim, int32_t *num);
char
*
strtolower
(
char
*
dst
,
const
char
*
src
);
char
*
strntolower
(
char
*
dst
,
const
char
*
src
,
int32_t
n
);
int64_t
strnatoi
(
char
*
num
,
int32_t
len
);
//char* strreplace(const char* str, const char* pattern, const char* rep);
...
...
src/util/src/tsocket.c
浏览文件 @
6b984059
...
...
@@ -270,6 +270,14 @@ int taosOpenTcpClientSocket(uint32_t destIp, uint16_t destPort, uint32_t clientI
return
-
1
;
}
/* set REUSEADDR option, so the portnumber can be re-used */
int
reuse
=
1
;
if
(
taosSetSockOpt
(
sockFd
,
SOL_SOCKET
,
SO_REUSEADDR
,
(
void
*
)
&
reuse
,
sizeof
(
reuse
))
<
0
)
{
uError
(
"setsockopt SO_REUSEADDR failed: %d (%s)"
,
errno
,
strerror
(
errno
));
close
(
sockFd
);
return
-
1
;
};
if
(
clientIp
!=
0
)
{
memset
((
char
*
)
&
clientAddr
,
0
,
sizeof
(
clientAddr
));
clientAddr
.
sin_family
=
AF_INET
;
...
...
src/util/src/tutil.c
浏览文件 @
6b984059
...
...
@@ -234,6 +234,32 @@ char* strtolower(char *dst, const char *src) {
*
p
=
0
;
return
dst
;
}
char
*
strntolower
(
char
*
dst
,
const
char
*
src
,
int32_t
n
)
{
int
esc
=
0
;
char
quote
=
0
,
*
p
=
dst
,
c
;
assert
(
dst
!=
NULL
);
for
(
c
=
*
src
++
;
n
--
>
0
;
c
=
*
src
++
)
{
if
(
esc
)
{
esc
=
0
;
}
else
if
(
quote
)
{
if
(
c
==
'\\'
)
{
esc
=
1
;
}
else
if
(
c
==
quote
)
{
quote
=
0
;
}
}
else
if
(
c
>=
'A'
&&
c
<=
'Z'
)
{
c
-=
'A'
-
'a'
;
}
else
if
(
c
==
'\''
||
c
==
'"'
)
{
quote
=
c
;
}
*
p
++
=
c
;
}
*
p
=
0
;
return
dst
;
}
char
*
paGetToken
(
char
*
string
,
char
**
token
,
int32_t
*
tokenLen
)
{
char
quote
=
0
;
...
...
src/vnode/inc/vnodeInt.h
浏览文件 @
6b984059
...
...
@@ -37,7 +37,7 @@ extern int32_t vDebugFlag;
typedef
struct
{
int32_t
vgId
;
// global vnode group ID
int32_t
refCount
;
// reference count
int
status
;
int
8_t
status
;
int8_t
role
;
int8_t
accessState
;
int64_t
version
;
// current version
...
...
@@ -55,6 +55,8 @@ typedef struct {
SWalCfg
walCfg
;
void
*
qMgmt
;
char
*
rootDir
;
tsem_t
sem
;
int8_t
dropped
;
char
db
[
TSDB_DB_NAME_LEN
];
}
SVnodeObj
;
...
...
src/vnode/src/vnodeMain.c
浏览文件 @
6b984059
...
...
@@ -44,7 +44,7 @@ static int vnodeProcessTsdbStatus(void *arg, int status);
static
uint32_t
vnodeGetFileInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
,
uint32_t
eindex
,
int32_t
*
size
,
uint64_t
*
fversion
);
static
int
vnodeGetWalInfo
(
void
*
ahandle
,
char
*
name
,
uint32_t
*
index
);
static
void
vnodeNotifyRole
(
void
*
ahandle
,
int8_t
role
);
static
void
vnodeNotifyFileSynced
(
void
*
ahandle
,
uint64_t
fversion
);
static
int
vnodeNotifyFileSynced
(
void
*
ahandle
,
uint64_t
fversion
);
#ifndef _SYNC
tsync_h
syncStart
(
const
SSyncInfo
*
info
)
{
return
NULL
;
}
...
...
@@ -153,7 +153,7 @@ int32_t vnodeDrop(int32_t vgId) {
SVnodeObj
*
pVnode
=
*
ppVnode
;
vTrace
(
"vgId:%d, vnode will be dropped, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
pVnode
->
status
=
TAOS_VN_STATUS_DELETING
;
pVnode
->
dropped
=
1
;
vnodeCleanUp
(
pVnode
);
return
TSDB_CODE_SUCCESS
;
...
...
@@ -164,18 +164,11 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
// vnode in non-ready state and still needs to return success instead of TSDB_CODE_VND_INVALID_STATUS
// cfgVersion can be corrected by status msg
if
(
pVnode
->
status
!=
TAOS_VN_STATUS_READY
)
{
if
(
atomic_val_compare_exchange_8
(
&
pVnode
->
status
,
TAOS_VN_STATUS_READY
,
TAOS_VN_STATUS_UPDATING
)
!=
TAOS_VN_STATUS_READY
)
{
vDebug
(
"vgId:%d, vnode is not ready, do alter operation later"
,
pVnode
->
vgId
);
return
TSDB_CODE_SUCCESS
;
}
// the vnode may always fail to synchronize because of it in low cfgVersion
// so cannot use the following codes
// if (pVnode->syncCfg.replica > 1 && pVnode->role == TAOS_SYNC_ROLE_UNSYNCED)
// return TSDB_CODE_VND_NOT_SYNCED;
pVnode
->
status
=
TAOS_VN_STATUS_UPDATING
;
int32_t
code
=
vnodeSaveCfg
(
pVnodeCfg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pVnode
->
status
=
TAOS_VN_STATUS_READY
;
...
...
@@ -194,10 +187,12 @@ int32_t vnodeAlter(void *param, SMDCreateVnodeMsg *pVnodeCfg) {
return
code
;
}
code
=
tsdbConfigRepo
(
pVnode
->
tsdb
,
&
pVnode
->
tsdbCfg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pVnode
->
status
=
TAOS_VN_STATUS_READY
;
return
code
;
if
(
pVnode
->
tsdb
)
{
code
=
tsdbConfigRepo
(
pVnode
->
tsdb
,
&
pVnode
->
tsdbCfg
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pVnode
->
status
=
TAOS_VN_STATUS_READY
;
return
code
;
}
}
pVnode
->
status
=
TAOS_VN_STATUS_READY
;
...
...
@@ -223,6 +218,7 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
pVnode
->
tsdbCfg
.
tsdbId
=
pVnode
->
vgId
;
pVnode
->
rootDir
=
strdup
(
rootDir
);
pVnode
->
accessState
=
TSDB_VN_ALL_ACCCESS
;
tsem_init
(
&
pVnode
->
sem
,
0
,
0
);
int32_t
code
=
vnodeReadCfg
(
pVnode
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -302,10 +298,6 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
}
#endif
// start continuous query
if
(
pVnode
->
role
==
TAOS_SYNC_ROLE_MASTER
)
cqStart
(
pVnode
->
cq
);
pVnode
->
qMgmt
=
qOpenQueryMgmt
(
pVnode
->
vgId
);
pVnode
->
events
=
NULL
;
pVnode
->
status
=
TAOS_VN_STATUS_READY
;
...
...
@@ -316,22 +308,12 @@ int32_t vnodeOpen(int32_t vnode, char *rootDir) {
return
TSDB_CODE_SUCCESS
;
}
int32_t
vnodeStartStream
(
int32_t
vnode
)
{
SVnodeObj
*
pVnode
=
vnodeAcquireVnode
(
vnode
);
if
(
pVnode
!=
NULL
)
{
tsdbStartStream
(
pVnode
->
tsdb
);
vnodeRelease
(
pVnode
);
}
return
TSDB_CODE_SUCCESS
;
}
int32_t
vnodeClose
(
int32_t
vgId
)
{
SVnodeObj
**
ppVnode
=
(
SVnodeObj
**
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
vgId
,
sizeof
(
int32_t
));
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
return
0
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
vDebug
(
"vgId:%d, vnode will be closed"
,
pVnode
->
vgId
);
pVnode
->
status
=
TAOS_VN_STATUS_CLOSING
;
vnodeCleanUp
(
pVnode
);
return
0
;
...
...
@@ -346,6 +328,8 @@ void vnodeRelease(void *pVnodeRaw) {
if
(
refCount
>
0
)
{
vDebug
(
"vgId:%d, release vnode, refCount:%d"
,
vgId
,
refCount
);
if
(
pVnode
->
status
==
TAOS_VN_STATUS_RESET
&&
refCount
==
2
)
tsem_post
(
&
pVnode
->
sem
);
return
;
}
...
...
@@ -356,11 +340,6 @@ void vnodeRelease(void *pVnodeRaw) {
tsdbCloseRepo
(
pVnode
->
tsdb
,
1
);
pVnode
->
tsdb
=
NULL
;
// stop continuous query
if
(
pVnode
->
cq
)
cqClose
(
pVnode
->
cq
);
pVnode
->
cq
=
NULL
;
if
(
pVnode
->
wal
)
walClose
(
pVnode
->
wal
);
pVnode
->
wal
=
NULL
;
...
...
@@ -375,20 +354,21 @@ void vnodeRelease(void *pVnodeRaw) {
tfree
(
pVnode
->
rootDir
);
if
(
pVnode
->
status
==
TAOS_VN_STATUS_DELETING
)
{
if
(
pVnode
->
dropped
)
{
char
rootDir
[
TSDB_FILENAME_LEN
]
=
{
0
};
sprintf
(
rootDir
,
"%s/vnode%d"
,
tsVnodeDir
,
vgId
);
taosMvDir
(
tsVnodeBakDir
,
rootDir
);
taosRemoveDir
(
rootDir
);
}
tsem_destroy
(
&
pVnode
->
sem
);
free
(
pVnode
);
int32_t
count
=
taosHashGetSize
(
tsDnodeVnodesHash
);
vDebug
(
"vgId:%d, vnode is released, vnodes:%d"
,
vgId
,
count
);
}
void
*
vnode
GetVnod
e
(
int32_t
vgId
)
{
void
*
vnode
Acquir
e
(
int32_t
vgId
)
{
SVnodeObj
**
ppVnode
=
(
SVnodeObj
**
)
taosHashGet
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
vgId
,
sizeof
(
int32_t
));
if
(
ppVnode
==
NULL
||
*
ppVnode
==
NULL
)
{
terrno
=
TSDB_CODE_VND_INVALID_VGROUP_ID
;
...
...
@@ -396,35 +376,38 @@ void *vnodeGetVnode(int32_t vgId) {
return
NULL
;
}
return
*
ppVnode
;
}
void
*
vnodeAcquireVnode
(
int32_t
vgId
)
{
SVnodeObj
*
pVnode
=
vnodeGetVnode
(
vgId
);
if
(
pVnode
==
NULL
)
return
pVnode
;
SVnodeObj
*
pVnode
=
*
ppVnode
;
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
vDebug
(
"vgId:%d, get vnode, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
return
pVnode
;
}
void
*
vnodeAcquireRqueue
(
void
*
param
)
{
SVnodeObj
*
pVnode
=
param
;
void
*
vnodeAcquireRqueue
(
int32_t
vgId
)
{
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
)
;
if
(
pVnode
==
NULL
)
return
NULL
;
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
vDebug
(
"vgId:%d, get vnode rqueue, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
return
((
SVnodeObj
*
)
pVnode
)
->
rqueue
;
}
if
(
pVnode
->
status
==
TAOS_VN_STATUS_RESET
)
{
terrno
=
TSDB_CODE_VND_INVALID_STATUS
;
vInfo
(
"vgId:%d, status is in reset"
,
vgId
);
vnodeRelease
(
pVnode
);
return
NULL
;
}
void
*
vnodeGetRqueue
(
void
*
pVnode
)
{
return
((
SVnodeObj
*
)
pVnode
)
->
rqueue
;
return
pVnode
->
rqueue
;
}
void
*
vnode
Get
Wqueue
(
int32_t
vgId
)
{
SVnodeObj
*
pVnode
=
vnodeAcquire
Vnode
(
vgId
);
void
*
vnode
Acquire
Wqueue
(
int32_t
vgId
)
{
SVnodeObj
*
pVnode
=
vnodeAcquire
(
vgId
);
if
(
pVnode
==
NULL
)
return
NULL
;
if
(
pVnode
->
status
==
TAOS_VN_STATUS_RESET
)
{
terrno
=
TSDB_CODE_VND_INVALID_STATUS
;
vInfo
(
"vgId:%d, status is in reset"
,
vgId
);
vnodeRelease
(
pVnode
);
return
NULL
;
}
return
pVnode
->
wqueue
;
}
...
...
@@ -496,7 +479,7 @@ void vnodeBuildStatusMsg(void *param) {
void
vnodeSetAccess
(
SDMVgroupAccess
*
pAccess
,
int32_t
numOfVnodes
)
{
for
(
int32_t
i
=
0
;
i
<
numOfVnodes
;
++
i
)
{
pAccess
[
i
].
vgId
=
htonl
(
pAccess
[
i
].
vgId
);
SVnodeObj
*
pVnode
=
vnodeAcquire
Vnode
(
pAccess
[
i
].
vgId
);
SVnodeObj
*
pVnode
=
vnodeAcquire
(
pAccess
[
i
].
vgId
);
if
(
pVnode
!=
NULL
)
{
pVnode
->
accessState
=
pAccess
[
i
].
accessState
;
if
(
pVnode
->
accessState
!=
TSDB_VN_ALL_ACCCESS
)
{
...
...
@@ -510,11 +493,29 @@ void vnodeSetAccess(SDMVgroupAccess *pAccess, int32_t numOfVnodes) {
static
void
vnodeCleanUp
(
SVnodeObj
*
pVnode
)
{
// remove from hash, so new messages wont be consumed
taosHashRemove
(
tsDnodeVnodesHash
,
(
const
char
*
)
&
pVnode
->
vgId
,
sizeof
(
int32_t
));
int
i
=
0
;
if
(
pVnode
->
status
!=
TAOS_VN_STATUS_INIT
)
{
// it may be in updateing or reset state, then it shall wait
while
(
atomic_val_compare_exchange_8
(
&
pVnode
->
status
,
TAOS_VN_STATUS_READY
,
TAOS_VN_STATUS_CLOSING
)
!=
TAOS_VN_STATUS_READY
)
{
if
(
++
i
%
1000
==
0
)
{
sched_yield
();
}
}
}
// stop replication module
if
(
pVnode
->
sync
)
{
syncStop
(
pVnode
->
sync
)
;
void
*
sync
=
pVnode
->
sync
;
pVnode
->
sync
=
NULL
;
syncStop
(
sync
);
}
// stop continuous query
if
(
pVnode
->
cq
)
{
void
*
cq
=
pVnode
->
cq
;
pVnode
->
cq
=
NULL
;
cqClose
(
cq
);
}
vTrace
(
"vgId:%d, vnode will cleanup, refCount:%d"
,
pVnode
->
vgId
,
pVnode
->
refCount
);
...
...
@@ -561,18 +562,25 @@ static void vnodeNotifyRole(void *ahandle, int8_t role) {
cqStop
(
pVnode
->
cq
);
}
static
void
vnodeNotifyFileSynced
(
void
*
ahandle
,
uint64_t
fversion
)
{
SVnodeObj
*
pVnode
=
ahandle
;
vDebug
(
"vgId:%d, data file is synced, fversion:%"
PRId64
,
pVnode
->
vgId
,
fversion
);
pVnode
->
fversion
=
fversion
;
pVnode
->
version
=
fversion
;
vnodeSaveVersion
(
pVnode
);
static
int
vnodeResetTsdb
(
SVnodeObj
*
pVnode
)
{
char
rootDir
[
128
]
=
"
\0
"
;
sprintf
(
rootDir
,
"%s/tsdb"
,
pVnode
->
rootDir
);
// clsoe tsdb, then open tsdb
tsdbCloseRepo
(
pVnode
->
tsdb
,
0
);
if
(
atomic_val_compare_exchange_8
(
&
pVnode
->
status
,
TAOS_VN_STATUS_READY
,
TAOS_VN_STATUS_RESET
)
!=
TAOS_VN_STATUS_READY
)
return
-
1
;
void
*
tsdb
=
pVnode
->
tsdb
;
pVnode
->
tsdb
=
NULL
;
// acquire vnode
int32_t
refCount
=
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
if
(
refCount
>
2
)
tsem_wait
(
&
pVnode
->
sem
);
// close tsdb, then open tsdb
tsdbCloseRepo
(
tsdb
,
0
);
STsdbAppH
appH
=
{
0
};
appH
.
appH
=
(
void
*
)
pVnode
;
appH
.
notifyStatus
=
vnodeProcessTsdbStatus
;
...
...
@@ -580,6 +588,22 @@ static void vnodeNotifyFileSynced(void *ahandle, uint64_t fversion) {
appH
.
cqCreateFunc
=
cqCreate
;
appH
.
cqDropFunc
=
cqDrop
;
pVnode
->
tsdb
=
tsdbOpenRepo
(
rootDir
,
&
appH
);
pVnode
->
status
=
TAOS_VN_STATUS_READY
;
vnodeRelease
(
pVnode
);
return
0
;
}
static
int
vnodeNotifyFileSynced
(
void
*
ahandle
,
uint64_t
fversion
)
{
SVnodeObj
*
pVnode
=
ahandle
;
vDebug
(
"vgId:%d, data file is synced, fversion:%"
PRId64
,
pVnode
->
vgId
,
fversion
);
pVnode
->
fversion
=
fversion
;
pVnode
->
version
=
fversion
;
vnodeSaveVersion
(
pVnode
);
return
vnodeResetTsdb
(
pVnode
);
}
static
int32_t
vnodeSaveCfg
(
SMDCreateVnodeMsg
*
pVnodeCfg
)
{
...
...
src/vnode/src/vnodeRead.c
浏览文件 @
6b984059
...
...
@@ -26,6 +26,7 @@
#include "tsdb.h"
#include "vnode.h"
#include "vnodeInt.h"
#include "tqueue.h"
static
int32_t
(
*
vnodeProcessReadMsgFp
[
TSDB_MSG_TYPE_MAX
])(
SVnodeObj
*
pVnode
,
SReadMsg
*
pReadMsg
);
static
int32_t
vnodeProcessQueryMsg
(
SVnodeObj
*
pVnode
,
SReadMsg
*
pReadMsg
);
...
...
@@ -51,6 +52,11 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return
TSDB_CODE_VND_INVALID_STATUS
;
}
// tsdb may be in reset state
if
(
pVnode
->
tsdb
==
NULL
)
return
TSDB_CODE_RPC_NOT_READY
;
if
(
pVnode
->
status
==
TAOS_VN_STATUS_CLOSING
)
return
TSDB_CODE_RPC_NOT_READY
;
// TODO: Later, let slave to support query
if
(
pVnode
->
syncCfg
.
replica
>
1
&&
pVnode
->
role
!=
TAOS_SYNC_ROLE_MASTER
)
{
vDebug
(
"vgId:%d, msgType:%s not processed, replica:%d role:%d"
,
pVnode
->
vgId
,
taosMsg
[
msgType
],
pVnode
->
syncCfg
.
replica
,
pVnode
->
role
);
...
...
@@ -60,6 +66,16 @@ int32_t vnodeProcessRead(void *param, SReadMsg *pReadMsg) {
return
(
*
vnodeProcessReadMsgFp
[
msgType
])(
pVnode
,
pReadMsg
);
}
static
void
vnodePutItemIntoReadQueue
(
SVnodeObj
*
pVnode
,
void
*
qhandle
)
{
SReadMsg
*
pRead
=
(
SReadMsg
*
)
taosAllocateQitem
(
sizeof
(
SReadMsg
));
pRead
->
rpcMsg
.
msgType
=
TSDB_MSG_TYPE_QUERY
;
pRead
->
pCont
=
qhandle
;
pRead
->
contLen
=
0
;
atomic_add_fetch_32
(
&
pVnode
->
refCount
,
1
);
taosWriteQitem
(
pVnode
->
rqueue
,
TAOS_QTYPE_QUERY
,
pRead
);
}
static
int32_t
vnodeProcessQueryMsg
(
SVnodeObj
*
pVnode
,
SReadMsg
*
pReadMsg
)
{
void
*
pCont
=
pReadMsg
->
pCont
;
int32_t
contLen
=
pReadMsg
->
contLen
;
...
...
@@ -131,7 +147,7 @@ static int32_t vnodeProcessQueryMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
if
(
handle
!=
NULL
)
{
vDebug
(
"vgId:%d, QInfo:%p, dnode query msg disposed, register qhandle and return to app"
,
vgId
,
*
handle
);
d
nodePutItemIntoReadQueue
(
pVnode
,
*
handle
);
v
nodePutItemIntoReadQueue
(
pVnode
,
*
handle
);
qReleaseQInfo
(
pVnode
->
qMgmt
,
(
void
**
)
&
handle
,
false
);
}
...
...
@@ -208,7 +224,7 @@ static int32_t vnodeProcessFetchMsg(SVnodeObj *pVnode, SReadMsg *pReadMsg) {
}
else
{
// if failed to dump result, free qhandle immediately
if
((
code
=
qDumpRetrieveResult
(
*
handle
,
(
SRetrieveTableRsp
**
)
&
pRet
->
rsp
,
&
pRet
->
len
))
==
TSDB_CODE_SUCCESS
)
{
if
(
qHasMoreResultsToRetrieve
(
*
handle
))
{
d
nodePutItemIntoReadQueue
(
pVnode
,
*
handle
);
v
nodePutItemIntoReadQueue
(
pVnode
,
*
handle
);
pRet
->
qhandle
=
*
handle
;
freeHandle
=
false
;
}
else
{
...
...
src/vnode/src/vnodeWrite.c
浏览文件 @
6b984059
...
...
@@ -59,13 +59,18 @@ int32_t vnodeProcessWrite(void *param1, int qtype, void *param2, void *item) {
return
TSDB_CODE_VND_NO_WRITE_AUTH
;
}
// tsdb may be in reset state
if
(
pVnode
->
tsdb
==
NULL
)
return
TSDB_CODE_RPC_NOT_READY
;
if
(
pVnode
->
status
==
TAOS_VN_STATUS_CLOSING
)
return
TSDB_CODE_RPC_NOT_READY
;
if
(
pHead
->
version
==
0
)
{
// from client or CQ
if
(
pVnode
->
status
!=
TAOS_VN_STATUS_READY
)
{
vDebug
(
"vgId:%d, msgType:%s not processed, vnode status is %d"
,
pVnode
->
vgId
,
taosMsg
[
pHead
->
msgType
],
pVnode
->
status
);
return
TSDB_CODE_VND_INVALID_STATUS
;
// it may be in deleting or closing state
}
if
(
pVnode
->
syncCfg
.
replica
>
1
&&
pVnode
->
role
!=
TAOS_SYNC_ROLE_MASTER
)
{
if
(
pVnode
->
role
!=
TAOS_SYNC_ROLE_MASTER
)
{
vDebug
(
"vgId:%d, msgType:%s not processed, replica:%d role:%d"
,
pVnode
->
vgId
,
taosMsg
[
pHead
->
msgType
],
pVnode
->
syncCfg
.
replica
,
pVnode
->
role
);
return
TSDB_CODE_RPC_NOT_READY
;
}
...
...
tests/pytest/fulltest.sh
浏览文件 @
6b984059
...
...
@@ -143,6 +143,7 @@ python3 ./test.py -f query/filterOtherTypes.py
python3 ./test.py
-f
query/querySort.py
python3 ./test.py
-f
query/queryJoin.py
python3 ./test.py
-f
query/select_last_crash.py
python3 ./test.py
-f
query/queryNullValueTest.py
#stream
python3 ./test.py
-f
stream/metric_1.py
...
...
tests/pytest/query/queryInsertValue.py
0 → 100644
浏览文件 @
6b984059
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import
sys
import
taos
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
import
numpy
as
np
from
util.dnodes
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
(),
logSql
)
self
.
numOfRecords
=
10
self
.
ts
=
1537146000000
def
restartTaosd
(
self
):
tdDnodes
.
stop
(
1
)
tdDnodes
.
start
(
1
)
tdSql
.
execute
(
"use db"
)
def
run
(
self
):
tdSql
.
prepare
()
print
(
"==============step1"
)
tdSql
.
execute
(
"create table st (ts timestamp, speed int) tags(areaid int, loc nchar(20))"
)
tdSql
.
execute
(
"create table t1 using st tags(1, 'beijing')"
)
tdSql
.
execute
(
"insert into t1 values(now, 1)"
)
tdSql
.
query
(
"select * from st"
)
tdSql
.
checkRows
(
1
)
tdSql
.
execute
(
"alter table st add column length int"
)
tdSql
.
execute
(
"insert into t1 values(now, 1, 2)"
)
tdSql
.
query
(
"select last(*) from st"
)
tdSql
.
checkData
(
0
,
2
,
2
);
self
.
restartTaosd
();
tdSql
.
query
(
"select last(*) from st"
)
tdSql
.
checkData
(
0
,
2
,
2
);
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tests/pytest/query/queryNullValueTest.py
0 → 100644
浏览文件 @
6b984059
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import
sys
import
taos
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
import
numpy
as
np
from
util.dnodes
import
*
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
(),
logSql
)
self
.
numOfRecords
=
10
self
.
ts
=
1537146000000
def
checkNullValue
(
self
,
result
):
mx
=
np
.
array
(
result
)
[
rows
,
cols
]
=
mx
.
shape
for
i
in
range
(
rows
):
for
j
in
range
(
cols
):
if
j
+
1
<
cols
and
mx
[
i
,
j
+
1
]
is
not
None
:
print
(
mx
[
i
,
j
+
1
])
return
False
return
True
def
restartTaosd
(
self
):
tdDnodes
.
stop
(
1
)
tdDnodes
.
start
(
1
)
tdSql
.
execute
(
"use db"
)
def
run
(
self
):
tdSql
.
prepare
()
print
(
"==============step1"
)
tdSql
.
execute
(
"create table meters (ts timestamp, col1 int) tags(tgcol1 int)"
)
tdSql
.
execute
(
"create table t0 using meters tags(NULL)"
)
for
i
in
range
(
self
.
numOfRecords
):
tdSql
.
execute
(
"insert into t0 values (%d, %d)"
%
(
self
.
ts
+
i
,
i
));
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
execute
(
"alter table meters add column col2 tinyint"
)
tdSql
.
execute
(
"alter table meters drop column col1"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select col2 from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
execute
(
"alter table meters add column col1 int"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select col1 from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
execute
(
"alter table meters add column col3 smallint"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select col3 from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
execute
(
"alter table meters add column col4 bigint"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select col4 from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
execute
(
"alter table meters add column col5 float"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select col5 from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
execute
(
"alter table meters add column col6 double"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select col6 from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
execute
(
"alter table meters add column col7 bool"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select col7 from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
execute
(
"alter table meters add column col8 binary(20)"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select col8 from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
execute
(
"alter table meters add column col9 nchar(20)"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select col9 from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
execute
(
"alter table meters add tag tgcol2 tinyint"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select tgcol2 from meters"
)
tdSql
.
checkRows
(
1
)
tdSql
.
execute
(
"alter table meters add tag tgcol3 smallint"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select tgcol3 from meters"
)
tdSql
.
checkRows
(
1
)
tdSql
.
execute
(
"alter table meters add tag tgcol4 bigint"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select tgcol4 from meters"
)
tdSql
.
checkRows
(
1
)
tdSql
.
execute
(
"alter table meters add tag tgcol5 float"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select tgcol5 from meters"
)
tdSql
.
checkRows
(
1
)
tdSql
.
execute
(
"alter table meters add tag tgcol6 double"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select tgcol6 from meters"
)
tdSql
.
checkRows
(
1
)
tdSql
.
execute
(
"alter table meters add tag tgcol7 bool"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select tgcol7 from meters"
)
tdSql
.
checkRows
(
1
)
tdSql
.
execute
(
"alter table meters add tag tgcol8 binary(20)"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select tgcol8 from meters"
)
tdSql
.
checkRows
(
1
)
tdSql
.
execute
(
"alter table meters add tag tgcol9 nchar(20)"
)
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
tdSql
.
query
(
"select tgcol9 from meters"
)
tdSql
.
checkRows
(
1
)
self
.
restartTaosd
()
tdSql
.
query
(
"select * from meters"
)
tdSql
.
checkRows
(
10
)
if
self
.
checkNullValue
(
tdSql
.
queryResult
)
is
False
:
tdLog
.
exit
(
"non None value is detected"
)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tests/pytest/regressiontest.sh
浏览文件 @
6b984059
...
...
@@ -140,6 +140,7 @@ python3 ./test.py -f query/queryJoin.py
python3 ./test.py
-f
query/filterCombo.py
python3 ./test.py
-f
query/queryNormal.py
python3 ./test.py
-f
query/select_last_crash.py
python3 ./test.py
-f
query/queryNullValueTest.py
#stream
python3 ./test.py
-f
stream/stream1.py
...
...
tests/script/sh/deploy.sh
浏览文件 @
6b984059
...
...
@@ -136,6 +136,8 @@ echo "defaultPass taosdata" >> $TAOS_CFG
echo
"numOfLogLines 20000000"
>>
$TAOS_CFG
echo
"mnodeEqualVnodeNum 0"
>>
$TAOS_CFG
echo
"clog 2"
>>
$TAOS_CFG
#echo "cache 1" >> $TAOS_CFG
#echo "block 2" >> $TAOS_CFG
echo
"statusInterval 1"
>>
$TAOS_CFG
echo
"numOfTotalVnodes 4"
>>
$TAOS_CFG
echo
"maxVgroupsPerDb 4"
>>
$TAOS_CFG
...
...
tests/script/sh/exec.sh
浏览文件 @
6b984059
...
...
@@ -88,7 +88,9 @@ if [ "$EXEC_OPTON" = "start" ]; then
echo
"ExcuteCmd:"
$EXE_DIR
/taosd
-c
$CFG_DIR
if
[
"
$SHELL_OPTION
"
=
"true"
]
;
then
nohup
valgrind
--log-file
=
${
LOG_DIR
}
/valgrind.log
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
$EXE_DIR
/taosd
-c
$CFG_DIR
>
/dev/null 2>&1 &
TT
=
`
date
+%s
`
mkdir
${
LOG_DIR
}
/
${
TT
}
nohup
valgrind
--log-file
=
${
LOG_DIR
}
/
${
TT
}
/valgrind.log
--tool
=
memcheck
--leak-check
=
full
--show-reachable
=
no
--track-origins
=
yes
--show-leak-kinds
=
all
-v
--workaround-gcc296-bugs
=
yes
$EXE_DIR
/taosd
-c
$CFG_DIR
>
/dev/null 2>&1 &
else
nohup
$EXE_DIR
/taosd
-c
$CFG_DIR
>
/dev/null 2>&1 &
fi
...
...
tests/script/unique/vnode/replica2_a_large.sim
0 → 100644
浏览文件 @
6b984059
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/deploy.sh -n dnode2 -i 2
system sh/deploy.sh -n dnode3 -i 3
system sh/deploy.sh -n dnode4 -i 4
system sh/cfg.sh -n dnode1 -c wallevel -v 2
system sh/cfg.sh -n dnode2 -c wallevel -v 2
system sh/cfg.sh -n dnode3 -c wallevel -v 2
system sh/cfg.sh -n dnode4 -c wallevel -v 2
system sh/cfg.sh -n dnode1 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode2 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode3 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode4 -c numOfMnodes -v 1
system sh/cfg.sh -n dnode1 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode2 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode3 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode4 -c mnodeEqualVnodeNum -v 4
system sh/cfg.sh -n dnode1 -c debugFlag -v 131
system sh/cfg.sh -n dnode2 -c debugFlag -v 131
system sh/cfg.sh -n dnode3 -c debugFlag -v 131
system sh/cfg.sh -n dnode4 -c debugFlag -v 131
system sh/cfg.sh -n dnode1 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode2 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode3 -c arbitrator -v $arbitrator
system sh/cfg.sh -n dnode4 -c arbitrator -v $arbitrator
print ============== step0: start tarbitrator
system sh/exec_tarbitrator.sh -s start
system sh/exec.sh -n dnode1 -s start
sleep 3000
sql connect
sql create dnode $hostname2
sql create dnode $hostname3
system sh/exec.sh -n dnode2 -s start -t
system sh/exec.sh -n dnode3 -s start -t
sleep 3000
print ========= step1
sql create database db replica 2
#sql create table db.tb1 (ts timestamp, i int)
#sql create table db.tb2 (ts timestamp, i int)
#sql create table db.tb3 (ts timestamp, i int)
#sql create table db.tb4 (ts timestamp, i int)
#sql insert into db.tb1 values(now, 1)
#sql select count(*) from db.tb1
sql create database db replica 2
sql create table db.tb (ts timestamp, i int)
sql insert into db.tb values(now, 1)
sql select count(*) from db.tb
$lastRows = $rows
print ======== step2
#run_back unique/vnode/back_insert_many.sim
run_back unique/vnode/back_insert.sim
sleep 3000
print ======== step3
$x = 0
loop:
print ======== step4
system sh/exec.sh -n dnode2 -s stop -x SIGINT
sleep 10000
system sh/exec.sh -n dnode2 -s start -t
sleep 10000
print ======== step5
system sh/exec.sh -n dnode3 -s stop -x SIGINT
sleep 10000
system sh/exec.sh -n dnode3 -s start -t
sleep 10000
print ======== step6
#sql select count(*) from db.tb1
#print select count(*) from db.tb1 ==> $data00 $lastRows
sql select count(*) from db.tb
print select count(*) from db.tb ==> $data00 $lastRows
if $data00 <= $lastRows then
return -1
endi
print ======== step7
$lastRows = $data00
print ======== loop Times $x
if $x < 10 then
$x = $x + 1
goto loop
endi
system sh/exec.sh -n dnode1 -s stop -x SIGINT
system sh/exec.sh -n dnode2 -s stop -x SIGINT
system sh/exec.sh -n dnode3 -s stop -x SIGINT
system sh/exec.sh -n dnode4 -s stop -x SIGINT
system sh/exec.sh -n dnode5 -s stop -x SIGINT
system sh/exec.sh -n dnode6 -s stop -x SIGINT
system sh/exec.sh -n dnode7 -s stop -x SIGINT
system sh/exec.sh -n dnode8 -s stop -x SIGINT
\ No newline at end of file
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录