Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
慢慢CG
TDengine
提交
cc9759f3
T
TDengine
项目概览
慢慢CG
/
TDengine
与 Fork 源项目一致
Fork自
taosdata / TDengine
通知
1
Star
0
Fork
0
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
0
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
0
Issue
0
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
提交
cc9759f3
编写于
6月 03, 2021
作者:
B
bryanchang0603
浏览文件
操作
浏览文件
下载
差异文件
Merge branch 'develop' into test/TD-4463
上级
b5b17291
b141d319
变更
39
展开全部
隐藏空白更改
内联
并排
Showing
39 changed file
with
2017 addition
and
977 deletion
+2017
-977
.appveyor.yml
.appveyor.yml
+49
-49
cmake/env.inc
cmake/env.inc
+8
-6
cmake/input.inc
cmake/input.inc
+6
-0
src/client/inc/tsclient.h
src/client/inc/tsclient.h
+1
-0
src/client/src/tscSQLParser.c
src/client/src/tscSQLParser.c
+89
-5
src/client/src/tscStream.c
src/client/src/tscStream.c
+94
-9
src/common/inc/tdataformat.h
src/common/inc/tdataformat.h
+1
-1
src/common/inc/tglobal.h
src/common/inc/tglobal.h
+2
-1
src/common/src/tdataformat.c
src/common/src/tdataformat.c
+10
-5
src/common/src/tglobal.c
src/common/src/tglobal.c
+3
-2
src/cq/src/cqMain.c
src/cq/src/cqMain.c
+5
-2
src/inc/taosdef.h
src/inc/taosdef.h
+2
-0
src/inc/taosmsg.h
src/inc/taosmsg.h
+1
-0
src/inc/tfs.h
src/inc/tfs.h
+2
-0
src/inc/tsdb.h
src/inc/tsdb.h
+3
-0
src/inc/ttokendef.h
src/inc/ttokendef.h
+56
-48
src/mnode/src/mnodeDnode.c
src/mnode/src/mnodeDnode.c
+1
-1
src/mnode/src/mnodeTable.c
src/mnode/src/mnodeTable.c
+14
-3
src/query/inc/sql.y
src/query/inc/sql.y
+34
-0
src/query/src/qSqlParser.c
src/query/src/qSqlParser.c
+1
-1
src/query/src/sql.c
src/query/src/sql.c
+1062
-670
src/sync/src/syncMain.c
src/sync/src/syncMain.c
+6
-1
src/tsdb/CMakeLists.txt
src/tsdb/CMakeLists.txt
+4
-0
src/tsdb/inc/tsdbCommit.h
src/tsdb/inc/tsdbCommit.h
+7
-0
src/tsdb/inc/tsdbCommitQueue.h
src/tsdb/inc/tsdbCommitQueue.h
+3
-1
src/tsdb/inc/tsdbCompact.h
src/tsdb/inc/tsdbCompact.h
+28
-0
src/tsdb/inc/tsdbint.h
src/tsdb/inc/tsdbint.h
+2
-0
src/tsdb/src/tsdbCommit.c
src/tsdb/src/tsdbCommit.c
+172
-156
src/tsdb/src/tsdbCommitQueue.c
src/tsdb/src/tsdbCommitQueue.c
+18
-7
src/tsdb/src/tsdbCompact.c
src/tsdb/src/tsdbCompact.c
+9
-1
src/tsdb/src/tsdbMemTable.c
src/tsdb/src/tsdbMemTable.c
+1
-1
src/tsdb/src/tsdbReadImpl.c
src/tsdb/src/tsdbReadImpl.c
+2
-2
src/util/src/ttokenizer.c
src/util/src/ttokenizer.c
+2
-1
tests/pytest/fulltest.sh
tests/pytest/fulltest.sh
+2
-0
tests/pytest/functions/function_session.py
tests/pytest/functions/function_session.py
+86
-0
tests/pytest/functions/function_stateWindow.py
tests/pytest/functions/function_stateWindow.py
+109
-0
tests/pytest/query/queryInsertValue.py
tests/pytest/query/queryInsertValue.py
+1
-1
tests/pytest/util/dnodes.py
tests/pytest/util/dnodes.py
+3
-3
tests/script/general/parser/alter_column.sim
tests/script/general/parser/alter_column.sim
+118
-0
未找到文件。
.appveyor.yml
浏览文件 @
cc9759f3
version
:
1.0.{build}
image
:
-
Visual Studio
2015
-
macos
environment
:
matrix
:
-
ARCH
:
amd64
-
ARCH
:
x86
matrix
:
exclude
:
-
image
:
macos
ARCH
:
x86
for
:
-
matrix
:
only
:
-
image
:
Visual Studio
2015
clone_folder
:
c:\dev\TDengine
clone_depth
:
1
init
:
-
call "C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\vcvarsall.bat" %ARCH%
before_build
:
-
cd c:\dev\TDengine
-
md build
build_script
:
-
cd build
-
cmake -G "NMake Makefiles" ..
-
nmake install
-
matrix
:
only
:
- image
:
macos
clone_depth
:
1
build_script
:
-
mkdir debug
-
cd debug
-
cmake .. > /dev/null
-
make > /dev/null
notifications
:
-
provider
:
Email
to
:
-
sangshuduo@gmail.com
on_build_success
:
true
on_build_failure
:
true
on_build_status_changed
:
true
version
:
1.0.{build}
image
:
-
Visual Studio
2015
-
macos
environment
:
matrix
:
-
ARCH
:
amd64
-
ARCH
:
x86
matrix
:
exclude
:
-
image
:
macos
ARCH
:
x86
for
:
-
matrix
:
only
:
-
image
:
Visual Studio
2015
clone_folder
:
c:\dev\TDengine
clone_depth
:
1
init
:
-
call "C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\vcvarsall.bat" %ARCH%
before_build
:
-
cd c:\dev\TDengine
-
md build
build_script
:
-
cd build
-
cmake -G "NMake Makefiles" ..
-DBUILD_JDBC=false
-
nmake install
-
matrix
:
only
:
-
image
:
macos
clone_depth
:
1
build_script
:
-
mkdir debug
-
cd debug
-
cmake .. > /dev/null
-
make > /dev/null
notifications
:
-
provider
:
Email
to
:
-
sangshuduo@gmail.com
on_build_success
:
true
on_build_failure
:
true
on_build_status_changed
:
true
cmake/env.inc
浏览文件 @
cc9759f3
...
...
@@ -14,11 +14,13 @@ MESSAGE(STATUS "Project binary files output path: " ${PROJECT_BINARY_DIR})
MESSAGE
(
STATUS
"Project executable files output path: "
$
{
EXECUTABLE_OUTPUT_PATH
})
MESSAGE
(
STATUS
"Project library files output path: "
$
{
LIBRARY_OUTPUT_PATH
})
FIND_PROGRAM
(
TD_MVN_INSTALLED
mvn
)
IF
(
TD_MVN_INSTALLED
)
MESSAGE
(
STATUS
"MVN is installed and JDBC will be compiled"
)
ELSE
()
MESSAGE
(
STATUS
"MVN is not installed and JDBC is not compiled"
)
IF
(
TD_BUILD_JDBC
)
FIND_PROGRAM
(
TD_MVN_INSTALLED
mvn
)
IF
(
TD_MVN_INSTALLED
)
MESSAGE
(
STATUS
"MVN is installed and JDBC will be compiled"
)
ELSE
()
MESSAGE
(
STATUS
"MVN is not installed and JDBC is not compiled"
)
ENDIF
()
ENDIF
()
#
...
...
@@ -55,4 +57,4 @@ ELSE ()
SET
(
CMAKE_BUILD_TYPE
"Debug"
)
MESSAGE
(
STATUS
"Build Debug Version as default"
)
ENDIF
()
ENDIF
()
\ No newline at end of file
ENDIF
()
cmake/input.inc
浏览文件 @
cc9759f3
...
...
@@ -77,3 +77,9 @@ IF (${JEMALLOC_ENABLED} MATCHES "true")
SET
(
TD_JEMALLOC_ENABLED
TRUE
)
MESSAGE
(
STATUS
"build with jemalloc enabled"
)
ENDIF
()
SET
(
TD_BUILD_JDBC
TRUE
)
IF
(
$
{
BUILD_JDBC
}
MATCHES
"false"
)
SET
(
TD_BUILD_JDBC
FALSE
)
ENDIF
()
src/client/inc/tsclient.h
浏览文件 @
cc9759f3
...
...
@@ -283,6 +283,7 @@ typedef struct SSqlStream {
int64_t
ctime
;
// stream created time
int64_t
stime
;
// stream next executed time
int64_t
etime
;
// stream end query time, when time is larger then etime, the stream will be closed
int64_t
ltime
;
// stream last row time in stream table
SInterval
interval
;
void
*
pTimer
;
...
...
src/client/src/tscSQLParser.c
浏览文件 @
cc9759f3
...
...
@@ -5142,6 +5142,10 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
const
char
*
msg18
=
"primary timestamp column cannot be dropped"
;
const
char
*
msg19
=
"invalid new tag name"
;
const
char
*
msg20
=
"table is not super table"
;
const
char
*
msg21
=
"only binary/nchar column length could be modified"
;
const
char
*
msg22
=
"new column length should be bigger than old one"
;
const
char
*
msg23
=
"only column length coulbe be modified"
;
const
char
*
msg24
=
"invalid binary/nchar column length"
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
...
...
@@ -5172,13 +5176,13 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
}
if
(
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_ADD_TAG_COLUMN
||
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_DROP_TAG_COLUMN
||
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN
)
{
if
(
UTIL_TABLE_IS_NORMAL
_TABLE
(
pTableMetaInfo
))
{
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_CHANGE_TAG_COLUMN
||
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN
)
{
if
(
!
UTIL_TABLE_IS_SUPER
_TABLE
(
pTableMetaInfo
))
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg3
);
}
}
else
if
((
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_UPDATE_TAG_VAL
)
&&
(
UTIL_TABLE_IS_SUPER_TABLE
(
pTableMetaInfo
)))
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg4
);
}
else
if
((
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_ADD_COLUMN
||
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_DROP_COLUMN
)
&&
}
else
if
((
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_ADD_COLUMN
||
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_DROP_COLUMN
||
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_CHANGE_COLUMN
)
&&
UTIL_TABLE_IS_CHILD_TABLE
(
pTableMetaInfo
))
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg6
);
}
...
...
@@ -5394,6 +5398,85 @@ int32_t setAlterTableInfo(SSqlObj* pSql, struct SSqlInfo* pInfo) {
tstrncpy
(
name1
,
pItem
->
pVar
.
pz
,
sizeof
(
name1
));
TAOS_FIELD
f
=
tscCreateField
(
TSDB_DATA_TYPE_INT
,
name1
,
tDataTypes
[
TSDB_DATA_TYPE_INT
].
bytes
);
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
}
else
if
(
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_CHANGE_COLUMN
)
{
if
(
taosArrayGetSize
(
pAlterSQL
->
pAddColumns
)
>=
2
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg16
);
}
TAOS_FIELD
*
pItem
=
taosArrayGet
(
pAlterSQL
->
pAddColumns
,
0
);
if
(
pItem
->
type
!=
TSDB_DATA_TYPE_BINARY
&&
pItem
->
type
!=
TSDB_DATA_TYPE_NCHAR
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg21
);
}
SColumnIndex
columnIndex
=
COLUMN_INDEX_INITIALIZER
;
SStrToken
name
=
{.
type
=
TK_STRING
,
.
z
=
pItem
->
name
,
.
n
=
(
uint32_t
)
strlen
(
pItem
->
name
)};
if
(
getColumnIndexByName
(
pCmd
,
&
name
,
pQueryInfo
,
&
columnIndex
)
!=
TSDB_CODE_SUCCESS
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg17
);
}
SSchema
*
pColSchema
=
tscGetTableColumnSchema
(
pTableMetaInfo
->
pTableMeta
,
columnIndex
.
columnIndex
);
if
(
pColSchema
->
type
!=
TSDB_DATA_TYPE_BINARY
&&
pColSchema
->
type
!=
TSDB_DATA_TYPE_NCHAR
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg21
);
}
if
(
pItem
->
type
!=
pColSchema
->
type
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg23
);
}
if
((
pItem
->
type
==
TSDB_DATA_TYPE_BINARY
&&
(
pItem
->
bytes
<=
0
||
pItem
->
bytes
>
TSDB_MAX_BINARY_LEN
))
||
(
pItem
->
type
==
TSDB_DATA_TYPE_NCHAR
&&
(
pItem
->
bytes
<=
0
||
pItem
->
bytes
>
TSDB_MAX_NCHAR_LEN
)))
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg24
);
}
if
(
pItem
->
bytes
<=
pColSchema
->
bytes
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg22
);
}
TAOS_FIELD
f
=
tscCreateField
(
pColSchema
->
type
,
name
.
z
,
pItem
->
bytes
);
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
}
else
if
(
pAlterSQL
->
type
==
TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN
)
{
if
(
taosArrayGetSize
(
pAlterSQL
->
pAddColumns
)
>=
2
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg16
);
}
TAOS_FIELD
*
pItem
=
taosArrayGet
(
pAlterSQL
->
pAddColumns
,
0
);
if
(
pItem
->
type
!=
TSDB_DATA_TYPE_BINARY
&&
pItem
->
type
!=
TSDB_DATA_TYPE_NCHAR
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg21
);
}
SColumnIndex
columnIndex
=
COLUMN_INDEX_INITIALIZER
;
SStrToken
name
=
{.
type
=
TK_STRING
,
.
z
=
pItem
->
name
,
.
n
=
(
uint32_t
)
strlen
(
pItem
->
name
)};
if
(
getColumnIndexByName
(
pCmd
,
&
name
,
pQueryInfo
,
&
columnIndex
)
!=
TSDB_CODE_SUCCESS
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg17
);
}
SSchema
*
pColSchema
=
tscGetTableColumnSchema
(
pTableMetaInfo
->
pTableMeta
,
columnIndex
.
columnIndex
);
if
(
columnIndex
.
columnIndex
<
tscGetNumOfColumns
(
pTableMetaInfo
->
pTableMeta
))
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg10
);
}
if
(
pColSchema
->
type
!=
TSDB_DATA_TYPE_BINARY
&&
pColSchema
->
type
!=
TSDB_DATA_TYPE_NCHAR
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg21
);
}
if
(
pItem
->
type
!=
pColSchema
->
type
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg23
);
}
if
((
pItem
->
type
==
TSDB_DATA_TYPE_BINARY
&&
(
pItem
->
bytes
<=
0
||
pItem
->
bytes
>
TSDB_MAX_BINARY_LEN
))
||
(
pItem
->
type
==
TSDB_DATA_TYPE_NCHAR
&&
(
pItem
->
bytes
<=
0
||
pItem
->
bytes
>
TSDB_MAX_NCHAR_LEN
)))
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg24
);
}
if
(
pItem
->
bytes
<=
pColSchema
->
bytes
)
{
return
invalidOperationMsg
(
tscGetErrorMsgPayload
(
pCmd
),
msg22
);
}
TAOS_FIELD
f
=
tscCreateField
(
pColSchema
->
type
,
name
.
z
,
pItem
->
bytes
);
tscFieldInfoAppend
(
&
pQueryInfo
->
fieldsInfo
,
&
f
);
}
return
TSDB_CODE_SUCCESS
;
...
...
@@ -7186,8 +7269,9 @@ static int32_t getTableNameFromSqlNode(SSqlNode* pSqlNode, SArray* tableNameList
}
SName
name
=
{
0
};
if
(
tscSetTableFullName
(
&
name
,
t
,
pSql
)
!=
TSDB_CODE_SUCCESS
)
{
return
invalidOperationMsg
(
msgBuf
,
msg1
);
int32_t
code
=
tscSetTableFullName
(
&
name
,
t
,
pSql
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
return
code
;
}
taosArrayPush
(
tableNameList
,
&
name
);
...
...
src/client/src/tscStream.c
浏览文件 @
cc9759f3
...
...
@@ -24,6 +24,7 @@
#include "tutil.h"
#include "tscProfile.h"
#include "tscSubquery.h"
static
void
tscProcessStreamQueryCallback
(
void
*
param
,
TAOS_RES
*
tres
,
int
numOfRows
);
static
void
tscProcessStreamRetrieveResult
(
void
*
param
,
TAOS_RES
*
res
,
int
numOfRows
);
...
...
@@ -47,8 +48,8 @@ static bool isProjectStream(SQueryInfo* pQueryInfo) {
static
int64_t
tscGetRetryDelayTime
(
SSqlStream
*
pStream
,
int64_t
slidingTime
,
int16_t
prec
)
{
float
retryRangeFactor
=
0
.
3
f
;
int64_t
retryDelta
=
(
int64_t
)(
ts
StreamCompRetry
Delay
*
retryRangeFactor
);
retryDelta
=
((
rand
()
%
retryDelta
)
+
ts
StreamCompRetry
Delay
)
*
1000L
;
int64_t
retryDelta
=
(
int64_t
)(
ts
RetryStreamComp
Delay
*
retryRangeFactor
);
retryDelta
=
((
rand
()
%
retryDelta
)
+
ts
RetryStreamComp
Delay
)
*
1000L
;
if
(
pStream
->
interval
.
intervalUnit
!=
'n'
&&
pStream
->
interval
.
intervalUnit
!=
'y'
)
{
// change to ms
...
...
@@ -575,6 +576,14 @@ static void tscCreateStream(void *param, TAOS_RES *res, int code) {
pStream
->
stime
=
tscGetStreamStartTimestamp
(
pSql
,
pStream
,
pStream
->
stime
);
// set stime with ltime if ltime > stime
const
char
*
dstTable
=
pStream
->
dstTable
?
pStream
->
dstTable
:
""
;
tscDebug
(
" CQ table=%s ltime is %"
PRId64
,
dstTable
,
pStream
->
ltime
);
if
(
pStream
->
ltime
!=
INT64_MIN
&&
pStream
->
ltime
>
pStream
->
stime
)
{
tscWarn
(
" CQ set stream %s stime=%"
PRId64
" replace with ltime=%"
PRId64
" if ltime>0 "
,
dstTable
,
pStream
->
stime
,
pStream
->
ltime
);
pStream
->
stime
=
pStream
->
ltime
;
}
int64_t
starttime
=
tscGetLaunchTimestamp
(
pStream
);
pCmd
->
command
=
TSDB_SQL_SELECT
;
...
...
@@ -590,7 +599,66 @@ void tscSetStreamDestTable(SSqlStream* pStream, const char* dstTable) {
pStream
->
dstTable
=
dstTable
;
}
TAOS_STREAM
*
taos_open_stream
(
TAOS
*
taos
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
// fetchFp call back
void
fetchFpStreamLastRow
(
void
*
param
,
TAOS_RES
*
res
,
int
num
)
{
SSqlStream
*
pStream
=
(
SSqlStream
*
)
param
;
SSqlObj
*
pSql
=
res
;
// get row data set to ltime
tscSetSqlOwner
(
pSql
);
TAOS_ROW
row
=
doSetResultRowData
(
pSql
);
if
(
row
&&
row
[
0
]
)
{
pStream
->
ltime
=
*
((
int64_t
*
)
row
[
0
]);
const
char
*
dstTable
=
pStream
->
dstTable
?
pStream
->
dstTable
:
""
;
tscDebug
(
" CQ stream table=%s last row time=%"
PRId64
" ."
,
dstTable
,
pStream
->
ltime
);
}
tscClearSqlOwner
(
pSql
);
// no condition call
tscCreateStream
(
param
,
pStream
->
pSql
,
TSDB_CODE_SUCCESS
);
taos_free_result
(
res
);
}
// fp callback
void
fpStreamLastRow
(
void
*
param
,
TAOS_RES
*
res
,
int
code
)
{
// check result successful
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
tscCreateStream
(
param
,
res
,
TSDB_CODE_SUCCESS
);
taos_free_result
(
res
);
return
;
}
// asynchronous fetch last row data
taos_fetch_rows_a
(
res
,
fetchFpStreamLastRow
,
param
);
}
void
cbParseSql
(
void
*
param
,
TAOS_RES
*
res
,
int
code
)
{
// check result successful
SSqlStream
*
pStream
=
(
SSqlStream
*
)
param
;
SSqlObj
*
pSql
=
pStream
->
pSql
;
SSqlCmd
*
pCmd
=
&
pSql
->
cmd
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
pSql
->
res
.
code
=
code
;
tscDebug
(
"0x%"
PRIx64
" open stream parse sql failed, sql:%s, reason:%s, code:%s"
,
pSql
->
self
,
pSql
->
sqlstr
,
pCmd
->
payload
,
tstrerror
(
code
));
pStream
->
fp
(
pStream
->
param
,
NULL
,
NULL
);
return
;
}
// check dstTable valid
if
(
pStream
->
dstTable
==
NULL
||
strlen
(
pStream
->
dstTable
)
==
0
)
{
tscDebug
(
" cbParseSql dstTable is empty."
);
tscCreateStream
(
param
,
res
,
code
);
return
;
}
// query stream last row time async
char
sql
[
128
]
=
""
;
sprintf
(
sql
,
"select last_row(*) from %s;"
,
pStream
->
dstTable
);
taos_query_a
(
pSql
->
pTscObj
,
sql
,
fpStreamLastRow
,
param
);
return
;
}
TAOS_STREAM
*
taos_open_stream_withname
(
TAOS
*
taos
,
const
char
*
dstTable
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
))
{
STscObj
*
pObj
=
(
STscObj
*
)
taos
;
if
(
pObj
==
NULL
||
pObj
->
signature
!=
pObj
)
return
NULL
;
...
...
@@ -613,11 +681,16 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
return
NULL
;
}
pStream
->
stime
=
stime
;
pStream
->
fp
=
fp
;
pStream
->
ltime
=
INT64_MIN
;
pStream
->
stime
=
stime
;
pStream
->
fp
=
fp
;
pStream
->
callback
=
callback
;
pStream
->
param
=
param
;
pStream
->
pSql
=
pSql
;
pStream
->
param
=
param
;
pStream
->
pSql
=
pSql
;
pSql
->
pStream
=
pStream
;
pSql
->
param
=
pStream
;
pSql
->
maxRetry
=
TSDB_MAX_REPLICA
;
tscSetStreamDestTable
(
pStream
,
dstTable
);
pSql
->
pStream
=
pStream
;
pSql
->
param
=
pStream
;
...
...
@@ -640,10 +713,17 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
tscDebugL
(
"0x%"
PRIx64
" SQL: %s"
,
pSql
->
self
,
pSql
->
sqlstr
);
pSql
->
fp
=
cbParseSql
;
pSql
->
fetchFp
=
cbParseSql
;
registerSqlObj
(
pSql
);
int32_t
code
=
tsParseSql
(
pSql
,
true
);
if
(
code
==
TSDB_CODE_SUCCESS
)
{
tscCreateStream
(
pStream
,
pSql
,
code
);
}
else
if
(
code
!=
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
cbParseSql
(
pStream
,
pSql
,
code
);
}
else
if
(
code
==
TSDB_CODE_TSC_ACTION_IN_PROGRESS
)
{
tscDebug
(
" CQ taso_open_stream IN Process. sql=%s"
,
sqlstr
);
}
else
{
tscError
(
"0x%"
PRIx64
" open stream failed, sql:%s, code:%s"
,
pSql
->
self
,
sqlstr
,
tstrerror
(
code
));
taosReleaseRef
(
tscObjRef
,
pSql
->
self
);
free
(
pStream
);
...
...
@@ -653,6 +733,11 @@ TAOS_STREAM *taos_open_stream(TAOS *taos, const char *sqlstr, void (*fp)(void *p
return
pStream
;
}
TAOS_STREAM
*
taos_open_stream
(
TAOS
*
taos
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
))
{
return
taos_open_stream_withname
(
taos
,
""
,
sqlstr
,
fp
,
stime
,
param
,
callback
);
}
void
taos_close_stream
(
TAOS_STREAM
*
handle
)
{
SSqlStream
*
pStream
=
(
SSqlStream
*
)
handle
;
...
...
src/common/inc/tdataformat.h
浏览文件 @
cc9759f3
...
...
@@ -319,7 +319,7 @@ int tdInitDataCols(SDataCols *pCols, STSchema *pSchema);
SDataCols
*
tdDupDataCols
(
SDataCols
*
pCols
,
bool
keepData
);
SDataCols
*
tdFreeDataCols
(
SDataCols
*
pCols
);
void
tdAppendDataRowToDataCol
(
SDataRow
row
,
STSchema
*
pSchema
,
SDataCols
*
pCols
);
int
tdMergeDataCols
(
SDataCols
*
target
,
SDataCols
*
s
rc
,
int
rowsToMerge
);
int
tdMergeDataCols
(
SDataCols
*
target
,
SDataCols
*
s
ource
,
int
rowsToMerge
,
int
*
pOffset
);
// ----------------- K-V data row structure
/*
...
...
src/common/inc/tglobal.h
浏览文件 @
cc9759f3
...
...
@@ -39,6 +39,7 @@ extern int8_t tsEnableTelemetryReporting;
extern
char
tsEmail
[];
extern
char
tsArbitrator
[];
extern
int8_t
tsArbOnline
;
extern
int64_t
tsArbOnlineTimestamp
;
extern
int32_t
tsDnodeId
;
// common
...
...
@@ -75,7 +76,7 @@ extern int32_t tsMinSlidingTime;
extern
int32_t
tsMinIntervalTime
;
extern
int32_t
tsMaxStreamComputDelay
;
extern
int32_t
tsStreamCompStartDelay
;
extern
int32_t
ts
StreamCompRetry
Delay
;
extern
int32_t
ts
RetryStreamComp
Delay
;
extern
float
tsStreamComputDelayRatio
;
// the delayed computing ration of the whole time window
extern
int32_t
tsProjectExecInterval
;
extern
int64_t
tsMaxRetentWindow
;
...
...
src/common/src/tdataformat.c
浏览文件 @
cc9759f3
...
...
@@ -441,30 +441,35 @@ void tdAppendDataRowToDataCol(SDataRow row, STSchema *pSchema, SDataCols *pCols)
pCols
->
numOfRows
++
;
}
int
tdMergeDataCols
(
SDataCols
*
target
,
SDataCols
*
source
,
int
rowsToMerge
)
{
int
tdMergeDataCols
(
SDataCols
*
target
,
SDataCols
*
source
,
int
rowsToMerge
,
int
*
pOffset
)
{
ASSERT
(
rowsToMerge
>
0
&&
rowsToMerge
<=
source
->
numOfRows
);
ASSERT
(
target
->
numOfCols
==
source
->
numOfCols
);
int
offset
=
0
;
if
(
pOffset
==
NULL
)
{
pOffset
=
&
offset
;
}
SDataCols
*
pTarget
=
NULL
;
if
(
dataColsKeyLast
(
target
)
<
dataColsKeyFirst
(
source
))
{
// No overlap
if
(
(
target
->
numOfRows
==
0
)
||
(
dataColsKeyLast
(
target
)
<
dataColsKeyFirst
(
source
)
))
{
// No overlap
ASSERT
(
target
->
numOfRows
+
rowsToMerge
<=
target
->
maxPoints
);
for
(
int
i
=
0
;
i
<
rowsToMerge
;
i
++
)
{
for
(
int
j
=
0
;
j
<
source
->
numOfCols
;
j
++
)
{
if
(
source
->
cols
[
j
].
len
>
0
)
{
dataColAppendVal
(
target
->
cols
+
j
,
tdGetColDataOfRow
(
source
->
cols
+
j
,
i
),
target
->
numOfRows
,
dataColAppendVal
(
target
->
cols
+
j
,
tdGetColDataOfRow
(
source
->
cols
+
j
,
i
+
(
*
pOffset
)
),
target
->
numOfRows
,
target
->
maxPoints
);
}
}
target
->
numOfRows
++
;
}
(
*
pOffset
)
+=
rowsToMerge
;
}
else
{
pTarget
=
tdDupDataCols
(
target
,
true
);
if
(
pTarget
==
NULL
)
goto
_err
;
int
iter1
=
0
;
int
iter2
=
0
;
tdMergeTwoDataCols
(
target
,
pTarget
,
&
iter1
,
pTarget
->
numOfRows
,
source
,
&
iter2
,
source
->
numOfRows
,
tdMergeTwoDataCols
(
target
,
pTarget
,
&
iter1
,
pTarget
->
numOfRows
,
source
,
pOffset
,
source
->
numOfRows
,
pTarget
->
numOfRows
+
rowsToMerge
);
}
...
...
src/common/src/tglobal.c
浏览文件 @
cc9759f3
...
...
@@ -42,6 +42,7 @@ int32_t tsNumOfMnodes = 3;
int8_t
tsEnableVnodeBak
=
1
;
int8_t
tsEnableTelemetryReporting
=
1
;
int8_t
tsArbOnline
=
0
;
int64_t
tsArbOnlineTimestamp
=
TSDB_ARB_DUMMY_TIME
;
char
tsEmail
[
TSDB_FQDN_LEN
]
=
{
0
};
int32_t
tsDnodeId
=
0
;
...
...
@@ -93,7 +94,7 @@ int32_t tsMaxStreamComputDelay = 20000;
int32_t
tsStreamCompStartDelay
=
10000
;
// the stream computing delay time after executing failed, change accordingly
int32_t
ts
StreamCompRetryDelay
=
1
0
;
int32_t
ts
RetryStreamCompDelay
=
10
*
100
0
;
// The delayed computing ration. 10% of the whole computing time window by default.
float
tsStreamComputDelayRatio
=
0
.
1
f
;
...
...
@@ -710,7 +711,7 @@ static void doInitGlobalConfig(void) {
taosInitConfigOption
(
cfg
);
cfg
.
option
=
"retryStreamCompDelay"
;
cfg
.
ptr
=
&
ts
StreamCompRetry
Delay
;
cfg
.
ptr
=
&
ts
RetryStreamComp
Delay
;
cfg
.
valType
=
TAOS_CFG_VTYPE_INT32
;
cfg
.
cfgType
=
TSDB_CFG_CTYPE_B_CONFIG
|
TSDB_CFG_CTYPE_B_SHOW
;
cfg
.
minValue
=
10
;
...
...
src/cq/src/cqMain.c
浏览文件 @
cc9759f3
...
...
@@ -437,6 +437,10 @@ static void cqProcessCreateTimer(void *param, void *tmrId) {
taosReleaseRef
(
cqObjRef
,
(
int64_t
)
param
);
}
// inner implement in tscStream.c
TAOS_STREAM
*
taos_open_stream_withname
(
TAOS
*
taos
,
const
char
*
desName
,
const
char
*
sqlstr
,
void
(
*
fp
)(
void
*
param
,
TAOS_RES
*
,
TAOS_ROW
row
),
int64_t
stime
,
void
*
param
,
void
(
*
callback
)(
void
*
));
static
void
cqCreateStream
(
SCqContext
*
pContext
,
SCqObj
*
pObj
)
{
pObj
->
pContext
=
pContext
;
...
...
@@ -449,11 +453,10 @@ static void cqCreateStream(SCqContext *pContext, SCqObj *pObj) {
pObj
->
tmrId
=
0
;
if
(
pObj
->
pStream
==
NULL
)
{
pObj
->
pStream
=
taos_open_stream
(
pContext
->
dbConn
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
INT64_MIN
,
(
void
*
)
pObj
->
rid
,
NULL
);
pObj
->
pStream
=
taos_open_stream
_withname
(
pContext
->
dbConn
,
pObj
->
dstTable
,
pObj
->
sqlStr
,
cqProcessStreamRes
,
INT64_MIN
,
(
void
*
)
pObj
->
rid
,
NULL
);
// TODO the pObj->pStream may be released if error happens
if
(
pObj
->
pStream
)
{
tscSetStreamDestTable
(
pObj
->
pStream
,
pObj
->
dstTable
);
pContext
->
num
++
;
cDebug
(
"vgId:%d, id:%d CQ:%s is opened"
,
pContext
->
vgId
,
pObj
->
tid
,
pObj
->
sqlStr
);
}
else
{
...
...
src/inc/taosdef.h
浏览文件 @
cc9759f3
...
...
@@ -375,6 +375,8 @@ do { \
#define TSDB_MAX_WAL_SIZE (1024*1024*3)
#define TSDB_ARB_DUMMY_TIME 4765104000000 // 2121-01-01 00:00:00.000, :P
typedef
enum
{
TAOS_QTYPE_RPC
=
0
,
TAOS_QTYPE_FWD
=
1
,
...
...
src/inc/taosmsg.h
浏览文件 @
cc9759f3
...
...
@@ -161,6 +161,7 @@ enum _mgmt_table {
#define TSDB_ALTER_TABLE_ADD_COLUMN 5
#define TSDB_ALTER_TABLE_DROP_COLUMN 6
#define TSDB_ALTER_TABLE_CHANGE_COLUMN 7
#define TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN 8
#define TSDB_FILL_NONE 0
#define TSDB_FILL_NULL 1
...
...
src/inc/tfs.h
浏览文件 @
cc9759f3
...
...
@@ -31,6 +31,8 @@ typedef struct {
#define TFS_UNDECIDED_ID -1
#define TFS_PRIMARY_LEVEL 0
#define TFS_PRIMARY_ID 0
#define TFS_MIN_LEVEL 0
#define TFS_MAX_LEVEL (TSDB_MAX_TIERS - 1)
// FS APIs ====================================
typedef
struct
{
...
...
src/inc/tsdb.h
浏览文件 @
cc9759f3
...
...
@@ -409,6 +409,9 @@ void tsdbDecCommitRef(int vgId);
int
tsdbSyncSend
(
void
*
pRepo
,
SOCKET
socketFd
);
int
tsdbSyncRecv
(
void
*
pRepo
,
SOCKET
socketFd
);
// For TSDB Compact
int
tsdbCompact
(
STsdbRepo
*
pRepo
);
#ifdef __cplusplus
}
#endif
...
...
src/inc/ttokendef.h
浏览文件 @
cc9759f3
...
...
@@ -156,54 +156,62 @@
#define TK_SYNCDB 137
#define TK_ADD 138
#define TK_COLUMN 139
#define TK_TAG 140
#define TK_CHANGE 141
#define TK_SET 142
#define TK_KILL 143
#define TK_CONNECTION 144
#define TK_STREAM 145
#define TK_COLON 146
#define TK_ABORT 147
#define TK_AFTER 148
#define TK_ATTACH 149
#define TK_BEFORE 150
#define TK_BEGIN 151
#define TK_CASCADE 152
#define TK_CLUSTER 153
#define TK_CONFLICT 154
#define TK_COPY 155
#define TK_DEFERRED 156
#define TK_DELIMITERS 157
#define TK_DETACH 158
#define TK_EACH 159
#define TK_END 160
#define TK_EXPLAIN 161
#define TK_FAIL 162
#define TK_FOR 163
#define TK_IGNORE 164
#define TK_IMMEDIATE 165
#define TK_INITIALLY 166
#define TK_INSTEAD 167
#define TK_MATCH 168
#define TK_KEY 169
#define TK_OF 170
#define TK_RAISE 171
#define TK_REPLACE 172
#define TK_RESTRICT 173
#define TK_ROW 174
#define TK_STATEMENT 175
#define TK_TRIGGER 176
#define TK_VIEW 177
#define TK_SEMI 178
#define TK_NONE 179
#define TK_PREV 180
#define TK_LINEAR 181
#define TK_IMPORT 182
#define TK_TBNAME 183
#define TK_JOIN 184
#define TK_INSERT 185
#define TK_INTO 186
#define TK_VALUES 187
#define TK_MODIFY 140
#define TK_TAG 141
#define TK_CHANGE 142
#define TK_SET 143
#define TK_KILL 144
#define TK_CONNECTION 145
#define TK_STREAM 146
#define TK_COLON 147
#define TK_ABORT 148
#define TK_AFTER 149
#define TK_ATTACH 150
#define TK_BEFORE 151
#define TK_BEGIN 152
#define TK_CASCADE 153
#define TK_CLUSTER 154
#define TK_CONFLICT 155
#define TK_COPY 156
#define TK_DEFERRED 157
#define TK_DELIMITERS 158
#define TK_DETACH 159
#define TK_EACH 160
#define TK_END 161
#define TK_EXPLAIN 162
#define TK_FAIL 163
#define TK_FOR 164
#define TK_IGNORE 165
#define TK_IMMEDIATE 166
#define TK_INITIALLY 167
#define TK_INSTEAD 168
#define TK_MATCH 169
#define TK_KEY 170
#define TK_OF 171
#define TK_RAISE 172
#define TK_REPLACE 173
#define TK_RESTRICT 174
#define TK_ROW 175
#define TK_STATEMENT 176
#define TK_TRIGGER 177
#define TK_VIEW 178
#define TK_SEMI 179
#define TK_NONE 180
#define TK_PREV 181
#define TK_LINEAR 182
#define TK_IMPORT 183
#define TK_TBNAME 184
#define TK_JOIN 185
#define TK_INSERT 186
#define TK_INTO 187
#define TK_VALUES 188
#define TK_SPACE 300
#define TK_COMMENT 301
...
...
src/mnode/src/mnodeDnode.c
浏览文件 @
cc9759f3
...
...
@@ -941,7 +941,7 @@ static int32_t mnodeRetrieveDnodes(SShowObj *pShow, char *data, int32_t rows, vo
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
*
(
int64_t
*
)
pWrite
=
0
;
*
(
int64_t
*
)
pWrite
=
tsArbOnlineTimestamp
;
cols
++
;
pWrite
=
data
+
pShow
->
offset
[
cols
]
*
rows
+
pShow
->
bytes
[
cols
]
*
numOfRows
;
...
...
src/mnode/src/mnodeTable.c
浏览文件 @
cc9759f3
...
...
@@ -3214,7 +3214,15 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
}
else
if
(
pAlter
->
type
==
TSDB_ALTER_TABLE_DROP_COLUMN
)
{
code
=
mnodeDropSuperTableColumn
(
pMsg
,
pAlter
->
schema
[
0
].
name
);
}
else
if
(
pAlter
->
type
==
TSDB_ALTER_TABLE_CHANGE_COLUMN
)
{
code
=
mnodeChangeSuperTableColumn
(
pMsg
,
pAlter
->
schema
[
0
].
name
,
pAlter
->
schema
[
1
].
name
);
//code = mnodeChangeSuperTableColumn(pMsg, pAlter->schema[0].name, pAlter->schema[1].name);
(
void
)
mnodeChangeSuperTableColumn
;
mError
(
"change table[%s] column[%s] length to [%d] is not processed"
,
pAlter
->
tableFname
,
pAlter
->
schema
[
0
].
name
,
pAlter
->
schema
[
0
].
bytes
);
code
=
TSDB_CODE_SUCCESS
;
}
else
if
(
pAlter
->
type
==
TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN
)
{
//code = mnodeChangeSuperTableColumn(pMsg, pAlter->schema[0].name, pAlter->schema[1].name);
(
void
)
mnodeChangeSuperTableColumn
;
mError
(
"change table[%s] tag[%s] length to [%d] is not processed"
,
pAlter
->
tableFname
,
pAlter
->
schema
[
0
].
name
,
pAlter
->
schema
[
0
].
bytes
);
code
=
TSDB_CODE_SUCCESS
;
}
else
{
}
}
else
{
...
...
@@ -3226,7 +3234,10 @@ static int32_t mnodeProcessAlterTableMsg(SMnodeMsg *pMsg) {
}
else
if
(
pAlter
->
type
==
TSDB_ALTER_TABLE_DROP_COLUMN
)
{
code
=
mnodeDropNormalTableColumn
(
pMsg
,
pAlter
->
schema
[
0
].
name
);
}
else
if
(
pAlter
->
type
==
TSDB_ALTER_TABLE_CHANGE_COLUMN
)
{
code
=
mnodeChangeNormalTableColumn
(
pMsg
,
pAlter
->
schema
[
0
].
name
,
pAlter
->
schema
[
1
].
name
);
//code = mnodeChangeNormalTableColumn(pMsg, pAlter->schema[0].name, pAlter->schema[1].name);
(
void
)
mnodeChangeNormalTableColumn
;
mError
(
"change table[%s] column[%s] length to [%d] is not processed"
,
pAlter
->
tableFname
,
pAlter
->
schema
[
0
].
name
,
pAlter
->
schema
[
0
].
bytes
);
code
=
TSDB_CODE_SUCCESS
;
}
else
{
}
}
...
...
@@ -3417,4 +3428,4 @@ int32_t mnodeCompactTables() {
mnodeCompactChildTables
();
return
0
;
}
\ No newline at end of file
}
src/query/inc/sql.y
浏览文件 @
cc9759f3
...
...
@@ -759,6 +759,12 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
cmd ::= ALTER TABLE ids(X) cpxName(F) MODIFY COLUMN columnlist(A). {
X.n += F.n;
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_CHANGE_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
//////////////////////////////////ALTER TAGS statement/////////////////////////////////////
cmd ::= ALTER TABLE ids(X) cpxName(Y) ADD TAG columnlist(A). {
X.n += Y.n;
...
...
@@ -799,6 +805,11 @@ cmd ::= ALTER TABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
cmd ::= ALTER TABLE ids(X) cpxName(F) MODIFY TAG columnlist(A). {
X.n += F.n;
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN, -1);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
///////////////////////////////////ALTER STABLE statement//////////////////////////////////
cmd ::= ALTER STABLE ids(X) cpxName(F) ADD COLUMN columnlist(A). {
...
...
@@ -817,6 +828,12 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) DROP COLUMN ids(A). {
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
cmd ::= ALTER STABLE ids(X) cpxName(F) MODIFY COLUMN columnlist(A). {
X.n += F.n;
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_CHANGE_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
//////////////////////////////////ALTER TAGS statement/////////////////////////////////////
cmd ::= ALTER STABLE ids(X) cpxName(Y) ADD TAG columnlist(A). {
X.n += Y.n;
...
...
@@ -846,6 +863,23 @@ cmd ::= ALTER STABLE ids(X) cpxName(F) CHANGE TAG ids(Y) ids(Z). {
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
cmd ::= ALTER STABLE ids(X) cpxName(F) SET TAG ids(Y) EQ tagitem(Z). {
X.n += F.n;
toTSDBType(Y.type);
SArray* A = tVariantListAppendToken(NULL, &Y, -1);
A = tVariantListAppend(A, &Z, -1);
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, NULL, A, TSDB_ALTER_TABLE_UPDATE_TAG_VAL, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
cmd ::= ALTER STABLE ids(X) cpxName(F) MODIFY TAG columnlist(A). {
X.n += F.n;
SAlterTableInfo* pAlterTable = tSetAlterTableInfo(&X, A, NULL, TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN, TSDB_SUPER_TABLE);
setSqlInfo(pInfo, pAlterTable, NULL, TSDB_SQL_ALTER_TABLE);
}
////////////////////////////////////////kill statement///////////////////////////////////////
cmd ::= KILL CONNECTION INTEGER(Y). {setKillSql(pInfo, TSDB_SQL_KILL_CONNECTION, &Y);}
cmd ::= KILL STREAM INTEGER(X) COLON(Z) INTEGER(Y). {X.n += (Z.n + Y.n); setKillSql(pInfo, TSDB_SQL_KILL_STREAM, &X);}
...
...
src/query/src/qSqlParser.c
浏览文件 @
cc9759f3
...
...
@@ -893,7 +893,7 @@ SAlterTableInfo *tSetAlterTableInfo(SStrToken *pTableName, SArray *pCols, SArray
pAlterTable
->
type
=
type
;
pAlterTable
->
tableType
=
tableType
;
if
(
type
==
TSDB_ALTER_TABLE_ADD_COLUMN
||
type
==
TSDB_ALTER_TABLE_ADD_TAG_COLUMN
)
{
if
(
type
==
TSDB_ALTER_TABLE_ADD_COLUMN
||
type
==
TSDB_ALTER_TABLE_ADD_TAG_COLUMN
||
type
==
TSDB_ALTER_TABLE_CHANGE_COLUMN
||
type
==
TSDB_ALTER_TABLE_MODIFY_TAG_COLUMN
)
{
pAlterTable
->
pAddColumns
=
pCols
;
assert
(
pVals
==
NULL
);
}
else
{
...
...
src/query/src/sql.c
浏览文件 @
cc9759f3
此差异已折叠。
点击以展开。
src/sync/src/syncMain.c
浏览文件 @
cc9759f3
...
...
@@ -1150,7 +1150,12 @@ static void syncSetupPeerConnection(SSyncPeer *pPeer) {
pPeer
->
peerFd
=
connFd
;
pPeer
->
role
=
TAOS_SYNC_ROLE_UNSYNCED
;
pPeer
->
pConn
=
syncAllocateTcpConn
(
tsTcpPool
,
pPeer
->
rid
,
connFd
);
if
(
pPeer
->
isArb
)
tsArbOnline
=
1
;
if
(
pPeer
->
isArb
)
{
tsArbOnline
=
1
;
if
(
tsArbOnlineTimestamp
==
TSDB_ARB_DUMMY_TIME
)
{
tsArbOnlineTimestamp
=
taosGetTimestampMs
();
}
}
}
else
{
sDebug
(
"%s, failed to setup peer connection to server since %s, try later"
,
pPeer
->
id
,
strerror
(
errno
));
taosCloseSocket
(
connFd
);
...
...
src/tsdb/CMakeLists.txt
浏览文件 @
cc9759f3
...
...
@@ -6,6 +6,10 @@ AUX_SOURCE_DIRECTORY(src SRC)
ADD_LIBRARY
(
tsdb
${
SRC
}
)
TARGET_LINK_LIBRARIES
(
tsdb tfs common tutil
)
IF
(
TD_TSDB_PLUGINS
)
TARGET_LINK_LIBRARIES
(
tsdb tsdbPlugins
)
ENDIF
()
IF
(
TD_LINUX
)
# Someone has no gtest directory, so comment it
# ADD_SUBDIRECTORY(tests)
...
...
src/tsdb/inc/tsdbCommit.h
浏览文件 @
cc9759f3
...
...
@@ -29,10 +29,17 @@ typedef struct {
int64_t
size
;
}
SKVRecord
;
#define TSDB_DEFAULT_BLOCK_ROWS(maxRows) ((maxRows)*4 / 5)
void
tsdbGetRtnSnap
(
STsdbRepo
*
pRepo
,
SRtn
*
pRtn
);
int
tsdbEncodeKVRecord
(
void
**
buf
,
SKVRecord
*
pRecord
);
void
*
tsdbDecodeKVRecord
(
void
*
buf
,
SKVRecord
*
pRecord
);
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
);
int
tsdbApplyRtnOnFSet
(
STsdbRepo
*
pRepo
,
SDFileSet
*
pSet
,
SRtn
*
pRtn
);
int
tsdbWriteBlockInfoImpl
(
SDFile
*
pHeadf
,
STable
*
pTable
,
SArray
*
pSupA
,
SArray
*
pSubA
,
void
**
ppBuf
,
SBlockIdx
*
pIdx
);
int
tsdbWriteBlockIdx
(
SDFile
*
pHeadf
,
SArray
*
pIdxA
,
void
**
ppBuf
);
int
tsdbWriteBlockImpl
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
SDFile
*
pDFile
,
SDataCols
*
pDataCols
,
SBlock
*
pBlock
,
bool
isLast
,
bool
isSuper
,
void
**
ppBuf
,
void
**
ppCBuf
);
int
tsdbApplyRtn
(
STsdbRepo
*
pRepo
);
static
FORCE_INLINE
int
tsdbGetFidLevel
(
int
fid
,
SRtn
*
pRtn
)
{
...
...
src/tsdb/inc/tsdbCommitQueue.h
浏览文件 @
cc9759f3
...
...
@@ -16,6 +16,8 @@
#ifndef _TD_TSDB_COMMIT_QUEUE_H_
#define _TD_TSDB_COMMIT_QUEUE_H_
int
tsdbScheduleCommit
(
STsdbRepo
*
pRepo
);
typedef
enum
{
COMMIT_REQ
,
COMPACT_REQ
}
TSDB_REQ_T
;
int
tsdbScheduleCommit
(
STsdbRepo
*
pRepo
,
TSDB_REQ_T
req
);
#endif
/* _TD_TSDB_COMMIT_QUEUE_H_ */
\ No newline at end of file
src/tsdb/inc/tsdbCompact.h
0 → 100644
浏览文件 @
cc9759f3
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_TSDB_COMPACT_H_
#define _TD_TSDB_COMPACT_H_
#ifdef __cplusplus
extern
"C"
{
#endif
void
*
tsdbCompactImpl
(
STsdbRepo
*
pRepo
);
#ifdef __cplusplus
}
#endif
#endif
/* _TD_TSDB_COMPACT_H_ */
\ No newline at end of file
src/tsdb/inc/tsdbint.h
浏览文件 @
cc9759f3
...
...
@@ -64,6 +64,8 @@ extern "C" {
#include "tsdbReadImpl.h"
// Commit
#include "tsdbCommit.h"
// Compact
#include "tsdbCompact.h"
// Commit Queue
#include "tsdbCommitQueue.h"
// Main definitions
...
...
src/tsdb/src/tsdbCommit.c
浏览文件 @
cc9759f3
...
...
@@ -51,7 +51,7 @@ typedef struct {
#define TSDB_COMMIT_LAST_FILE(ch) TSDB_DFILE_IN_SET(TSDB_COMMIT_WRITE_FSET(ch), TSDB_FILE_LAST)
#define TSDB_COMMIT_BUF(ch) TSDB_READ_BUF(&((ch)->readh))
#define TSDB_COMMIT_COMP_BUF(ch) TSDB_READ_COMP_BUF(&((ch)->readh))
#define TSDB_COMMIT_DEFAULT_ROWS(ch)
(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock * 4 / 5
)
#define TSDB_COMMIT_DEFAULT_ROWS(ch)
TSDB_DEFAULT_BLOCK_ROWS(TSDB_COMMIT_REPO(ch)->config.maxRowsPerFileBlock
)
#define TSDB_COMMIT_TXN_VERSION(ch) FS_TXN_VERSION(REPO_FS(TSDB_COMMIT_REPO(ch)))
static
int
tsdbCommitMeta
(
STsdbRepo
*
pRepo
);
...
...
@@ -72,7 +72,6 @@ static int tsdbCommitToTable(SCommitH *pCommith, int tid);
static
int
tsdbSetCommitTable
(
SCommitH
*
pCommith
,
STable
*
pTable
);
static
int
tsdbComparKeyBlock
(
const
void
*
arg1
,
const
void
*
arg2
);
static
int
tsdbWriteBlockInfo
(
SCommitH
*
pCommih
);
static
int
tsdbWriteBlockIdx
(
SCommitH
*
pCommih
);
static
int
tsdbCommitMemData
(
SCommitH
*
pCommith
,
SCommitIter
*
pIter
,
TSKEY
keyLimit
,
bool
toData
);
static
int
tsdbMergeMemData
(
SCommitH
*
pCommith
,
SCommitIter
*
pIter
,
int
bidx
);
static
int
tsdbMoveBlock
(
SCommitH
*
pCommith
,
int
bidx
);
...
...
@@ -86,7 +85,6 @@ static void tsdbCloseCommitFile(SCommitH *pCommith, bool hasError);
static
bool
tsdbCanAddSubBlock
(
SCommitH
*
pCommith
,
SBlock
*
pBlock
,
SMergeInfo
*
pInfo
);
static
void
tsdbLoadAndMergeFromCache
(
SDataCols
*
pDataCols
,
int
*
iter
,
SCommitIter
*
pCommitIter
,
SDataCols
*
pTarget
,
TSKEY
maxKey
,
int
maxRows
,
int8_t
update
);
static
int
tsdbApplyRtnOnFSet
(
STsdbRepo
*
pRepo
,
SDFileSet
*
pSet
,
SRtn
*
pRtn
);
void
*
tsdbCommitData
(
STsdbRepo
*
pRepo
)
{
if
(
pRepo
->
imem
==
NULL
)
{
...
...
@@ -117,6 +115,151 @@ _err:
return
NULL
;
}
int
tsdbApplyRtnOnFSet
(
STsdbRepo
*
pRepo
,
SDFileSet
*
pSet
,
SRtn
*
pRtn
)
{
SDiskID
did
;
SDFileSet
nSet
;
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
int
level
;
ASSERT
(
pSet
->
fid
>=
pRtn
->
minFid
);
level
=
tsdbGetFidLevel
(
pSet
->
fid
,
pRtn
);
tfsAllocDisk
(
level
,
&
(
did
.
level
),
&
(
did
.
id
));
if
(
did
.
level
==
TFS_UNDECIDED_LEVEL
)
{
terrno
=
TSDB_CODE_TDB_NO_AVAIL_DISK
;
return
-
1
;
}
if
(
did
.
level
>
TSDB_FSET_LEVEL
(
pSet
))
{
// Need to move the FSET to higher level
tsdbInitDFileSet
(
&
nSet
,
did
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
FS_TXN_VERSION
(
pfs
));
if
(
tsdbCopyDFileSet
(
pSet
,
&
nSet
)
<
0
)
{
tsdbError
(
"vgId:%d failed to copy FSET %d from level %d to level %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
TSDB_FSET_LEVEL
(
pSet
),
did
.
level
,
tstrerror
(
terrno
));
return
-
1
;
}
if
(
tsdbUpdateDFileSet
(
pfs
,
&
nSet
)
<
0
)
{
return
-
1
;
}
tsdbInfo
(
"vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
TSDB_FSET_LEVEL
(
pSet
),
TSDB_FSET_ID
(
pSet
),
did
.
level
,
did
.
id
);
}
else
{
// On a correct level
if
(
tsdbUpdateDFileSet
(
pfs
,
pSet
)
<
0
)
{
return
-
1
;
}
}
return
0
;
}
int
tsdbWriteBlockInfoImpl
(
SDFile
*
pHeadf
,
STable
*
pTable
,
SArray
*
pSupA
,
SArray
*
pSubA
,
void
**
ppBuf
,
SBlockIdx
*
pIdx
)
{
size_t
nSupBlocks
;
size_t
nSubBlocks
;
uint32_t
tlen
;
SBlockInfo
*
pBlkInfo
;
int64_t
offset
;
SBlock
*
pBlock
;
memset
(
pIdx
,
0
,
sizeof
(
*
pIdx
));
nSupBlocks
=
taosArrayGetSize
(
pSupA
);
nSubBlocks
=
(
pSubA
==
NULL
)
?
0
:
taosArrayGetSize
(
pSubA
);
if
(
nSupBlocks
<=
0
)
{
// No data (data all deleted)
return
0
;
}
tlen
=
(
uint32_t
)(
sizeof
(
SBlockInfo
)
+
sizeof
(
SBlock
)
*
(
nSupBlocks
+
nSubBlocks
)
+
sizeof
(
TSCKSUM
));
if
(
tsdbMakeRoom
(
ppBuf
,
tlen
)
<
0
)
return
-
1
;
pBlkInfo
=
*
ppBuf
;
pBlkInfo
->
delimiter
=
TSDB_FILE_DELIMITER
;
pBlkInfo
->
tid
=
TABLE_TID
(
pTable
);
pBlkInfo
->
uid
=
TABLE_UID
(
pTable
);
memcpy
((
void
*
)(
pBlkInfo
->
blocks
),
taosArrayGet
(
pSupA
,
0
),
nSupBlocks
*
sizeof
(
SBlock
));
if
(
nSubBlocks
>
0
)
{
memcpy
((
void
*
)(
pBlkInfo
->
blocks
+
nSupBlocks
),
taosArrayGet
(
pSubA
,
0
),
nSubBlocks
*
sizeof
(
SBlock
));
for
(
int
i
=
0
;
i
<
nSupBlocks
;
i
++
)
{
pBlock
=
pBlkInfo
->
blocks
+
i
;
if
(
pBlock
->
numOfSubBlocks
>
1
)
{
pBlock
->
offset
+=
(
sizeof
(
SBlockInfo
)
+
sizeof
(
SBlock
)
*
nSupBlocks
);
}
}
}
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pBlkInfo
,
tlen
);
if
(
tsdbAppendDFile
(
pHeadf
,
(
void
*
)
pBlkInfo
,
tlen
,
&
offset
)
<
0
)
{
return
-
1
;
}
tsdbUpdateDFileMagic
(
pHeadf
,
POINTER_SHIFT
(
pBlkInfo
,
tlen
-
sizeof
(
TSCKSUM
)));
// Set pIdx
pBlock
=
taosArrayGetLast
(
pSupA
);
pIdx
->
tid
=
TABLE_TID
(
pTable
);
pIdx
->
uid
=
TABLE_UID
(
pTable
);
pIdx
->
hasLast
=
pBlock
->
last
?
1
:
0
;
pIdx
->
maxKey
=
pBlock
->
keyLast
;
pIdx
->
numOfBlocks
=
(
uint32_t
)
nSupBlocks
;
pIdx
->
len
=
tlen
;
pIdx
->
offset
=
(
uint32_t
)
offset
;
return
0
;
}
int
tsdbWriteBlockIdx
(
SDFile
*
pHeadf
,
SArray
*
pIdxA
,
void
**
ppBuf
)
{
SBlockIdx
*
pBlkIdx
;
size_t
nidx
=
taosArrayGetSize
(
pIdxA
);
int
tlen
=
0
,
size
;
int64_t
offset
;
if
(
nidx
<=
0
)
{
// All data are deleted
pHeadf
->
info
.
offset
=
0
;
pHeadf
->
info
.
len
=
0
;
return
0
;
}
for
(
size_t
i
=
0
;
i
<
nidx
;
i
++
)
{
pBlkIdx
=
(
SBlockIdx
*
)
taosArrayGet
(
pIdxA
,
i
);
size
=
tsdbEncodeSBlockIdx
(
NULL
,
pBlkIdx
);
if
(
tsdbMakeRoom
(
ppBuf
,
tlen
+
size
)
<
0
)
return
-
1
;
void
*
ptr
=
POINTER_SHIFT
(
*
ppBuf
,
tlen
);
tsdbEncodeSBlockIdx
(
&
ptr
,
pBlkIdx
);
tlen
+=
size
;
}
tlen
+=
sizeof
(
TSCKSUM
);
if
(
tsdbMakeRoom
(
ppBuf
,
tlen
)
<
0
)
return
-
1
;
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)(
*
ppBuf
),
tlen
);
if
(
tsdbAppendDFile
(
pHeadf
,
*
ppBuf
,
tlen
,
&
offset
)
<
tlen
)
{
return
-
1
;
}
tsdbUpdateDFileMagic
(
pHeadf
,
POINTER_SHIFT
(
*
ppBuf
,
tlen
-
sizeof
(
TSCKSUM
)));
pHeadf
->
info
.
offset
=
(
uint32_t
)
offset
;
pHeadf
->
info
.
len
=
tlen
;
return
0
;
}
// =================== Commit Meta Data
static
int
tsdbCommitMeta
(
STsdbRepo
*
pRepo
)
{
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
...
...
@@ -446,7 +589,8 @@ static int tsdbCommitToFile(SCommitH *pCommith, SDFileSet *pSet, int fid) {
}
}
if
(
tsdbWriteBlockIdx
(
pCommith
)
<
0
)
{
if
(
tsdbWriteBlockIdx
(
TSDB_COMMIT_HEAD_FILE
(
pCommith
),
pCommith
->
aBlkIdx
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommith
))))
<
0
)
{
tsdbError
(
"vgId:%d failed to write SBlockIdx part to FSET %d since %s"
,
REPO_ID
(
pRepo
),
fid
,
tstrerror
(
terrno
));
tsdbCloseCommitFile
(
pCommith
,
true
);
// revert the file change
...
...
@@ -754,23 +898,21 @@ static int tsdbComparKeyBlock(const void *arg1, const void *arg2) {
}
}
static
int
tsdbWriteBlock
(
SCommitH
*
pCommith
,
SDFile
*
pDFile
,
SDataCols
*
pDataCols
,
SBlock
*
pBlock
,
bool
isLast
,
bool
isSuper
)
{
STsdbRepo
*
pRepo
=
TSDB_COMMIT_REPO
(
pCommith
);
int
tsdbWriteBlockImpl
(
STsdbRepo
*
pRepo
,
STable
*
pTable
,
SDFile
*
pDFile
,
SDataCols
*
pDataCols
,
SBlock
*
pBlock
,
bool
isLast
,
bool
isSuper
,
void
**
ppBuf
,
void
**
ppCBuf
)
{
STsdbCfg
*
pCfg
=
REPO_CFG
(
pRepo
);
SBlockData
*
pBlockData
;
int64_t
offset
=
0
;
STable
*
pTable
=
TSDB_COMMIT_TABLE
(
pCommith
);
int
rowsToWrite
=
pDataCols
->
numOfRows
;
ASSERT
(
rowsToWrite
>
0
&&
rowsToWrite
<=
pCfg
->
maxRowsPerFileBlock
);
ASSERT
((
!
isLast
)
||
rowsToWrite
<
pCfg
->
minRowsPerFileBlock
);
// Make buffer space
if
(
tsdbMakeRoom
(
(
void
**
)(
&
TSDB_COMMIT_BUF
(
pCommith
))
,
TSDB_BLOCK_STATIS_SIZE
(
pDataCols
->
numOfCols
))
<
0
)
{
if
(
tsdbMakeRoom
(
ppBuf
,
TSDB_BLOCK_STATIS_SIZE
(
pDataCols
->
numOfCols
))
<
0
)
{
return
-
1
;
}
pBlockData
=
(
SBlockData
*
)
TSDB_COMMIT_BUF
(
pCommith
);
pBlockData
=
(
SBlockData
*
)
(
*
ppBuf
);
// Get # of cols not all NULL(not including key column)
int
nColsNotAllNull
=
0
;
...
...
@@ -816,23 +958,23 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
void
*
tptr
;
// Make room
if
(
tsdbMakeRoom
(
(
void
**
)(
&
TSDB_COMMIT_BUF
(
pCommith
))
,
lsize
+
tlen
+
COMP_OVERFLOW_BYTES
+
sizeof
(
TSCKSUM
))
<
0
)
{
if
(
tsdbMakeRoom
(
ppBuf
,
lsize
+
tlen
+
COMP_OVERFLOW_BYTES
+
sizeof
(
TSCKSUM
))
<
0
)
{
return
-
1
;
}
pBlockData
=
(
SBlockData
*
)
TSDB_COMMIT_BUF
(
pCommith
);
pBlockData
=
(
SBlockData
*
)
(
*
ppBuf
);
pBlockCol
=
pBlockData
->
cols
+
tcol
;
tptr
=
POINTER_SHIFT
(
pBlockData
,
lsize
);
if
(
pCfg
->
compression
==
TWO_STAGE_COMP
&&
tsdbMakeRoom
(
(
void
**
)(
&
TSDB_COMMIT_COMP_BUF
(
pCommith
))
,
tlen
+
COMP_OVERFLOW_BYTES
)
<
0
)
{
tsdbMakeRoom
(
ppCBuf
,
tlen
+
COMP_OVERFLOW_BYTES
)
<
0
)
{
return
-
1
;
}
// Compress or just copy
if
(
pCfg
->
compression
)
{
flen
=
(
*
(
tDataTypes
[
pDataCol
->
type
].
compFunc
))((
char
*
)
pDataCol
->
pData
,
tlen
,
rowsToWrite
,
tptr
,
tlen
+
COMP_OVERFLOW_BYTES
,
pCfg
->
compression
,
TSDB_COMMIT_COMP_BUF
(
pCommith
),
tlen
+
COMP_OVERFLOW_BYTES
);
tlen
+
COMP_OVERFLOW_BYTES
,
pCfg
->
compression
,
*
ppCBuf
,
tlen
+
COMP_OVERFLOW_BYTES
);
}
else
{
flen
=
tlen
;
memcpy
(
tptr
,
pDataCol
->
pData
,
flen
);
...
...
@@ -888,117 +1030,33 @@ static int tsdbWriteBlock(SCommitH *pCommith, SDFile *pDFile, SDataCols *pDataCo
return
0
;
}
static
int
tsdbWriteBlockInfo
(
SCommitH
*
pCommih
)
{
SDFile
*
pHeadf
=
TSDB_COMMIT_HEAD_FILE
(
pCommih
);
SBlockIdx
blkIdx
;
STable
*
pTable
=
TSDB_COMMIT_TABLE
(
pCommih
);
SBlock
*
pBlock
;
size_t
nSupBlocks
;
size_t
nSubBlocks
;
uint32_t
tlen
;
SBlockInfo
*
pBlkInfo
;
int64_t
offset
;
nSupBlocks
=
taosArrayGetSize
(
pCommih
->
aSupBlk
);
nSubBlocks
=
taosArrayGetSize
(
pCommih
->
aSubBlk
);
if
(
nSupBlocks
<=
0
)
{
// No data (data all deleted)
return
0
;
}
tlen
=
(
uint32_t
)(
sizeof
(
SBlockInfo
)
+
sizeof
(
SBlock
)
*
(
nSupBlocks
+
nSubBlocks
)
+
sizeof
(
TSCKSUM
));
// Write SBlockInfo part
if
(
tsdbMakeRoom
((
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommih
))),
tlen
)
<
0
)
return
-
1
;
pBlkInfo
=
TSDB_COMMIT_BUF
(
pCommih
);
pBlkInfo
->
delimiter
=
TSDB_FILE_DELIMITER
;
pBlkInfo
->
tid
=
TABLE_TID
(
pTable
);
pBlkInfo
->
uid
=
TABLE_UID
(
pTable
);
memcpy
((
void
*
)(
pBlkInfo
->
blocks
),
taosArrayGet
(
pCommih
->
aSupBlk
,
0
),
nSupBlocks
*
sizeof
(
SBlock
));
if
(
nSubBlocks
>
0
)
{
memcpy
((
void
*
)(
pBlkInfo
->
blocks
+
nSupBlocks
),
taosArrayGet
(
pCommih
->
aSubBlk
,
0
),
nSubBlocks
*
sizeof
(
SBlock
));
for
(
int
i
=
0
;
i
<
nSupBlocks
;
i
++
)
{
pBlock
=
pBlkInfo
->
blocks
+
i
;
if
(
pBlock
->
numOfSubBlocks
>
1
)
{
pBlock
->
offset
+=
(
sizeof
(
SBlockInfo
)
+
sizeof
(
SBlock
)
*
nSupBlocks
);
}
}
}
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
pBlkInfo
,
tlen
);
if
(
tsdbAppendDFile
(
pHeadf
,
TSDB_COMMIT_BUF
(
pCommih
),
tlen
,
&
offset
)
<
0
)
{
return
-
1
;
}
tsdbUpdateDFileMagic
(
pHeadf
,
POINTER_SHIFT
(
pBlkInfo
,
tlen
-
sizeof
(
TSCKSUM
)));
// Set blkIdx
pBlock
=
taosArrayGet
(
pCommih
->
aSupBlk
,
nSupBlocks
-
1
);
static
int
tsdbWriteBlock
(
SCommitH
*
pCommith
,
SDFile
*
pDFile
,
SDataCols
*
pDataCols
,
SBlock
*
pBlock
,
bool
isLast
,
bool
isSuper
)
{
return
tsdbWriteBlockImpl
(
TSDB_COMMIT_REPO
(
pCommith
),
TSDB_COMMIT_TABLE
(
pCommith
),
pDFile
,
pDataCols
,
pBlock
,
isLast
,
isSuper
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommith
))),
(
void
**
)(
&
(
TSDB_COMMIT_COMP_BUF
(
pCommith
))));
}
blkIdx
.
tid
=
TABLE_TID
(
pTable
);
blkIdx
.
uid
=
TABLE_UID
(
pTable
);
blkIdx
.
hasLast
=
pBlock
->
last
?
1
:
0
;
blkIdx
.
maxKey
=
pBlock
->
keyLast
;
blkIdx
.
numOfBlocks
=
(
uint32_t
)
nSupBlocks
;
blkIdx
.
len
=
tlen
;
blkIdx
.
offset
=
(
uint32_t
)
offset
;
ASSERT
(
blkIdx
.
numOfBlocks
>
0
);
static
int
tsdbWriteBlockInfo
(
SCommitH
*
pCommih
)
{
SDFile
*
pHeadf
=
TSDB_COMMIT_HEAD_FILE
(
pCommih
);
SBlockIdx
blkIdx
;
STable
*
pTable
=
TSDB_COMMIT_TABLE
(
pCommih
);
if
(
t
aosArrayPush
(
pCommih
->
aBlkIdx
,
(
void
*
)(
&
blkIdx
))
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
if
(
t
sdbWriteBlockInfoImpl
(
pHeadf
,
pTable
,
pCommih
->
aSupBlk
,
pCommih
->
aSubBlk
,
(
void
**
)(
&
(
TSDB_COMMIT_BUF
(
pCommih
))),
&
blkIdx
)
<
0
)
{
return
-
1
;
}
return
0
;
}
static
int
tsdbWriteBlockIdx
(
SCommitH
*
pCommih
)
{
SBlockIdx
*
pBlkIdx
=
NULL
;
SDFile
*
pHeadf
=
TSDB_COMMIT_HEAD_FILE
(
pCommih
);
size_t
nidx
=
taosArrayGetSize
(
pCommih
->
aBlkIdx
);
int
tlen
=
0
,
size
=
0
;
int64_t
offset
=
0
;
if
(
nidx
<=
0
)
{
// All data are deleted
pHeadf
->
info
.
offset
=
0
;
pHeadf
->
info
.
len
=
0
;
if
(
blkIdx
.
numOfBlocks
==
0
)
{
return
0
;
}
for
(
size_t
i
=
0
;
i
<
nidx
;
i
++
)
{
pBlkIdx
=
(
SBlockIdx
*
)
taosArrayGet
(
pCommih
->
aBlkIdx
,
i
);
size
=
tsdbEncodeSBlockIdx
(
NULL
,
pBlkIdx
);
if
(
tsdbMakeRoom
((
void
**
)(
&
TSDB_COMMIT_BUF
(
pCommih
)),
tlen
+
size
)
<
0
)
return
-
1
;
void
*
ptr
=
POINTER_SHIFT
(
TSDB_COMMIT_BUF
(
pCommih
),
tlen
);
tsdbEncodeSBlockIdx
(
&
ptr
,
pBlkIdx
);
tlen
+=
size
;
}
tlen
+=
sizeof
(
TSCKSUM
);
if
(
tsdbMakeRoom
((
void
**
)(
&
TSDB_COMMIT_BUF
(
pCommih
)),
tlen
)
<
0
)
return
-
1
;
taosCalcChecksumAppend
(
0
,
(
uint8_t
*
)
TSDB_COMMIT_BUF
(
pCommih
),
tlen
);
if
(
tsdbAppendDFile
(
pHeadf
,
TSDB_COMMIT_BUF
(
pCommih
),
tlen
,
&
offset
)
<
tlen
)
{
tsdbError
(
"vgId:%d failed to write block index part to file %s since %s"
,
TSDB_COMMIT_REPO_ID
(
pCommih
),
TSDB_FILE_FULL_NAME
(
pHeadf
),
tstrerror
(
terrno
));
if
(
taosArrayPush
(
pCommih
->
aBlkIdx
,
(
void
*
)(
&
blkIdx
))
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
tsdbUpdateDFileMagic
(
pHeadf
,
POINTER_SHIFT
(
TSDB_COMMIT_BUF
(
pCommih
),
tlen
-
sizeof
(
TSCKSUM
)));
pHeadf
->
info
.
offset
=
(
uint32_t
)
offset
;
pHeadf
->
info
.
len
=
tlen
;
return
0
;
}
...
...
@@ -1454,45 +1512,3 @@ int tsdbApplyRtn(STsdbRepo *pRepo) {
return
0
;
}
static
int
tsdbApplyRtnOnFSet
(
STsdbRepo
*
pRepo
,
SDFileSet
*
pSet
,
SRtn
*
pRtn
)
{
SDiskID
did
;
SDFileSet
nSet
;
STsdbFS
*
pfs
=
REPO_FS
(
pRepo
);
int
level
;
ASSERT
(
pSet
->
fid
>=
pRtn
->
minFid
);
level
=
tsdbGetFidLevel
(
pSet
->
fid
,
pRtn
);
tfsAllocDisk
(
level
,
&
(
did
.
level
),
&
(
did
.
id
));
if
(
did
.
level
==
TFS_UNDECIDED_LEVEL
)
{
terrno
=
TSDB_CODE_TDB_NO_AVAIL_DISK
;
return
-
1
;
}
if
(
did
.
level
>
TSDB_FSET_LEVEL
(
pSet
))
{
// Need to move the FSET to higher level
tsdbInitDFileSet
(
&
nSet
,
did
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
FS_TXN_VERSION
(
pfs
));
if
(
tsdbCopyDFileSet
(
pSet
,
&
nSet
)
<
0
)
{
tsdbError
(
"vgId:%d failed to copy FSET %d from level %d to level %d since %s"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
TSDB_FSET_LEVEL
(
pSet
),
did
.
level
,
tstrerror
(
terrno
));
return
-
1
;
}
if
(
tsdbUpdateDFileSet
(
pfs
,
&
nSet
)
<
0
)
{
return
-
1
;
}
tsdbInfo
(
"vgId:%d FSET %d is copied from level %d disk id %d to level %d disk id %d"
,
REPO_ID
(
pRepo
),
pSet
->
fid
,
TSDB_FSET_LEVEL
(
pSet
),
TSDB_FSET_ID
(
pSet
),
did
.
level
,
did
.
id
);
}
else
{
// On a correct level
if
(
tsdbUpdateDFileSet
(
pfs
,
pSet
)
<
0
)
{
return
-
1
;
}
}
return
0
;
}
\ No newline at end of file
src/tsdb/src/tsdbCommitQueue.c
浏览文件 @
cc9759f3
...
...
@@ -26,8 +26,9 @@ typedef struct {
}
SCommitQueue
;
typedef
struct
{
TSDB_REQ_T
req
;
STsdbRepo
*
pRepo
;
}
S
Commit
Req
;
}
SReq
;
static
void
*
tsdbLoopCommit
(
void
*
arg
);
...
...
@@ -90,16 +91,17 @@ void tsdbDestroyCommitQueue() {
pthread_mutex_destroy
(
&
(
pQueue
->
lock
));
}
int
tsdbScheduleCommit
(
STsdbRepo
*
pRepo
)
{
int
tsdbScheduleCommit
(
STsdbRepo
*
pRepo
,
TSDB_REQ_T
req
)
{
SCommitQueue
*
pQueue
=
&
tsCommitQueue
;
SListNode
*
pNode
=
(
SListNode
*
)
calloc
(
1
,
sizeof
(
SListNode
)
+
sizeof
(
S
Commit
Req
));
SListNode
*
pNode
=
(
SListNode
*
)
calloc
(
1
,
sizeof
(
SListNode
)
+
sizeof
(
SReq
));
if
(
pNode
==
NULL
)
{
terrno
=
TSDB_CODE_TDB_OUT_OF_MEMORY
;
return
-
1
;
}
((
SCommitReq
*
)
pNode
->
data
)
->
pRepo
=
pRepo
;
((
SReq
*
)
pNode
->
data
)
->
req
=
req
;
((
SReq
*
)
pNode
->
data
)
->
pRepo
=
pRepo
;
pthread_mutex_lock
(
&
(
pQueue
->
lock
));
...
...
@@ -154,6 +156,7 @@ static void *tsdbLoopCommit(void *arg) {
SCommitQueue
*
pQueue
=
&
tsCommitQueue
;
SListNode
*
pNode
=
NULL
;
STsdbRepo
*
pRepo
=
NULL
;
TSDB_REQ_T
req
;
while
(
true
)
{
pthread_mutex_lock
(
&
(
pQueue
->
lock
));
...
...
@@ -174,14 +177,22 @@ static void *tsdbLoopCommit(void *arg) {
pthread_mutex_unlock
(
&
(
pQueue
->
lock
));
pRepo
=
((
SCommitReq
*
)
pNode
->
data
)
->
pRepo
;
req
=
((
SReq
*
)
pNode
->
data
)
->
req
;
pRepo
=
((
SReq
*
)
pNode
->
data
)
->
pRepo
;
// check if need to apply new config
if
(
pRepo
->
config_changed
)
{
if
(
pRepo
->
config_changed
)
{
tsdbApplyRepoConfig
(
pRepo
);
}
tsdbCommitData
(
pRepo
);
if
(
req
==
COMMIT_REQ
)
{
tsdbCommitData
(
pRepo
);
}
else
if
(
req
==
COMPACT_REQ
)
{
tsdbCompactImpl
(
pRepo
);
}
else
{
ASSERT
(
0
);
}
listNodeFree
(
pNode
);
}
...
...
src/tsdb/src/tsdbCompact.c
浏览文件 @
cc9759f3
...
...
@@ -11,4 +11,12 @@
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
*/
#include "tsdb.h"
#ifndef _TSDB_PLUGINS
int
tsdbCompact
(
STsdbRepo
*
pRepo
)
{
return
0
;
}
void
*
tsdbCompactImpl
(
STsdbRepo
*
pRepo
)
{
return
NULL
;
}
#endif
\ No newline at end of file
src/tsdb/src/tsdbMemTable.c
浏览文件 @
cc9759f3
...
...
@@ -288,7 +288,7 @@ int tsdbAsyncCommit(STsdbRepo *pRepo) {
if
(
tsdbLockRepo
(
pRepo
)
<
0
)
return
-
1
;
pRepo
->
imem
=
pRepo
->
mem
;
pRepo
->
mem
=
NULL
;
tsdbScheduleCommit
(
pRepo
);
tsdbScheduleCommit
(
pRepo
,
COMMIT_REQ
);
if
(
tsdbUnlockRepo
(
pRepo
)
<
0
)
return
-
1
;
return
0
;
...
...
src/tsdb/src/tsdbReadImpl.c
浏览文件 @
cc9759f3
...
...
@@ -258,7 +258,7 @@ int tsdbLoadBlockData(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo) {
for
(
int
i
=
1
;
i
<
pBlock
->
numOfSubBlocks
;
i
++
)
{
iBlock
++
;
if
(
tsdbLoadBlockDataImpl
(
pReadh
,
iBlock
,
pReadh
->
pDCols
[
1
])
<
0
)
return
-
1
;
if
(
tdMergeDataCols
(
pReadh
->
pDCols
[
0
],
pReadh
->
pDCols
[
1
],
pReadh
->
pDCols
[
1
]
->
numOfRows
)
<
0
)
return
-
1
;
if
(
tdMergeDataCols
(
pReadh
->
pDCols
[
0
],
pReadh
->
pDCols
[
1
],
pReadh
->
pDCols
[
1
]
->
numOfRows
,
NULL
)
<
0
)
return
-
1
;
}
ASSERT
(
pReadh
->
pDCols
[
0
]
->
numOfRows
==
pBlock
->
numOfRows
);
...
...
@@ -284,7 +284,7 @@ int tsdbLoadBlockDataCols(SReadH *pReadh, SBlock *pBlock, SBlockInfo *pBlkInfo,
for
(
int
i
=
1
;
i
<
pBlock
->
numOfSubBlocks
;
i
++
)
{
iBlock
++
;
if
(
tsdbLoadBlockDataColsImpl
(
pReadh
,
iBlock
,
pReadh
->
pDCols
[
1
],
colIds
,
numOfColsIds
)
<
0
)
return
-
1
;
if
(
tdMergeDataCols
(
pReadh
->
pDCols
[
0
],
pReadh
->
pDCols
[
1
],
pReadh
->
pDCols
[
1
]
->
numOfRows
)
<
0
)
return
-
1
;
if
(
tdMergeDataCols
(
pReadh
->
pDCols
[
0
],
pReadh
->
pDCols
[
1
],
pReadh
->
pDCols
[
1
]
->
numOfRows
,
NULL
)
<
0
)
return
-
1
;
}
ASSERT
(
pReadh
->
pDCols
[
0
]
->
numOfRows
==
pBlock
->
numOfRows
);
...
...
src/util/src/ttokenizer.c
浏览文件 @
cc9759f3
...
...
@@ -218,7 +218,8 @@ static SKeyword keywordTable[] = {
{
"DISTINCT"
,
TK_DISTINCT
},
{
"PARTITIONS"
,
TK_PARTITIONS
},
{
"TOPIC"
,
TK_TOPIC
},
{
"TOPICS"
,
TK_TOPICS
}
{
"TOPICS"
,
TK_TOPICS
},
{
"MODIFY"
,
TK_MODIFY
}
};
static
const
char
isIdChar
[]
=
{
...
...
tests/pytest/fulltest.sh
浏览文件 @
cc9759f3
...
...
@@ -314,6 +314,8 @@ python3 ./test.py -f query/last_row_cache.py
python3 ./test.py
-f
account/account_create.py
python3 ./test.py
-f
alter/alter_table.py
python3 ./test.py
-f
query/queryGroupbySort.py
python3 ./test.py
-f
functions/function_session.py
python3 ./test.py
-f
functions/function_stateWindow.py
python3 ./test.py
-f
insert/unsignedInt.py
python3 ./test.py
-f
insert/unsignedBigint.py
...
...
tests/pytest/functions/function_session.py
0 → 100644
浏览文件 @
cc9759f3
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import
sys
import
taos
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
#import numpy as np
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
())
self
.
rowNum
=
10
self
.
ts
=
1537146000000
def
run
(
self
):
tdSql
.
prepare
()
tdSql
.
execute
(
'''create table test(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))'''
)
tdSql
.
execute
(
"create table test1 using test tags('beijing')"
)
for
i
in
range
(
self
.
rowNum
):
tdSql
.
execute
(
"insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
%
(
self
.
ts
+
i
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
0.1
,
i
+
0.1
,
i
%
2
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
))
# operation not allowed on super table
tdSql
.
error
(
"select count(*) from test session(ts, 1s)"
)
# operation not allowde on col pro
tdSql
.
error
(
"select * from test1 session(ts, 1s)"
)
# operation not allowed on col except primary ts
tdSql
.
error
(
"select * from test1 session(col1, 1s)"
)
tdSql
.
query
(
"select count(*) from test1 session(ts, 1s)"
)
tdSql
.
checkRows
(
1
)
tdSql
.
checkData
(
0
,
1
,
10
)
# append more data
for
i
in
range
(
self
.
rowNum
):
tdSql
.
execute
(
"insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
%
(
self
.
ts
+
2000
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
0.1
,
i
+
0.1
,
i
%
2
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
))
tdSql
.
query
(
"select count(*) from test1 session(ts, 1s)"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
10
)
tdSql
.
checkData
(
1
,
1
,
1
)
tdSql
.
query
(
"select count(*) from test1 session(ts, 1m)"
)
tdSql
.
checkRows
(
1
)
tdSql
.
checkData
(
0
,
1
,
11
)
tdSql
.
query
(
"select first(col1) from test1 session(ts, 1s)"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
checkData
(
1
,
1
,
1
)
tdSql
.
query
(
"select first(col1), last(col2) from test1 session(ts, 1s)"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
checkData
(
0
,
2
,
10
)
tdSql
.
checkData
(
1
,
1
,
1
)
tdSql
.
checkData
(
1
,
1
,
1
)
# add more function
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tests/pytest/functions/function_stateWindow.py
0 → 100644
浏览文件 @
cc9759f3
###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import
sys
import
taos
from
util.log
import
*
from
util.cases
import
*
from
util.sql
import
*
#import numpy as np
class
TDTestCase
:
def
init
(
self
,
conn
,
logSql
):
tdLog
.
debug
(
"start to execute %s"
%
__file__
)
tdSql
.
init
(
conn
.
cursor
())
self
.
rowNum
=
10
self
.
ts
=
1537146000000
def
run
(
self
):
tdSql
.
prepare
()
tdSql
.
execute
(
'''create table test(ts timestamp, col1 tinyint, col2 smallint, col3 int, col4 bigint, col5 float, col6 double,
col7 bool, col8 binary(20), col9 nchar(20), col11 tinyint unsigned, col12 smallint unsigned, col13 int unsigned, col14 bigint unsigned) tags(loc nchar(20))'''
)
tdSql
.
execute
(
"create table test1 using test tags('beijing')"
)
col0
=
0
for
i
in
range
(
self
.
rowNum
):
tdSql
.
execute
(
"insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
%
(
self
.
ts
+
i
,
col0
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
0.1
,
i
+
0.1
,
i
%
2
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
))
# operation not allowed on super table
tdSql
.
error
(
"select count(*) from test session(ts, 1s)"
)
# operation not allowde on col pro
tdSql
.
error
(
"select * from test1 session(ts, 1s)"
)
# operation not allowed on col except primary ts
tdSql
.
error
(
"select * from test1 session(col1, 1s)"
)
tdSql
.
query
(
"select count(*) from test1 state_window(col1)"
)
tdSql
.
checkRows
(
1
)
tdSql
.
checkData
(
0
,
0
,
self
.
rowNum
)
# append more data
col0
=
col0
+
1
for
i
in
range
(
self
.
rowNum
):
tdSql
.
execute
(
"insert into test1 values(%d, %d, %d, %d, %d, %f, %f, %d, 'taosdata%d', '涛思数据%d', %d, %d, %d, %d)"
%
(
self
.
ts
+
i
+
10000
,
col0
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
0.1
,
i
+
0.1
,
i
%
2
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
,
i
+
1
))
tdSql
.
query
(
"select count(*) from test1 state_window(col1)"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
self
.
rowNum
)
tdSql
.
checkData
(
1
,
0
,
self
.
rowNum
)
tdSql
.
query
(
"select first(col1) from test1 state_window(col1)"
)
tdSql
.
checkRows
(
2
)
col0
=
col0
-
1
tdSql
.
checkData
(
0
,
0
,
col0
)
col0
=
col0
+
1
tdSql
.
checkData
(
1
,
0
,
col0
)
tdSql
.
query
(
"select first(col2) from test1 state_window(col1)"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
1
)
tdSql
.
checkData
(
1
,
0
,
1
)
tdSql
.
query
(
"select count(col1), first(col2) from test1 state_window(col1)"
)
tdSql
.
checkRows
(
2
)
tdSql
.
checkData
(
0
,
0
,
10
)
tdSql
.
checkData
(
0
,
1
,
1
)
tdSql
.
checkData
(
1
,
0
,
10
)
tdSql
.
checkData
(
1
,
1
,
1
)
#tdSql.query("select count(*) from test1 session(ts, 1m)")
#tdSql.checkRows(1)
#tdSql.checkData(0, 1, 11)
#tdSql.query("select first(col1) from test1 session(ts, 1s)")
#tdSql.checkRows(2)
#tdSql.checkData(0, 1, 1)
#tdSql.checkData(1, 1, 1)
#tdSql.query("select first(col1), last(col2) from test1 session(ts, 1s)")
#tdSql.checkRows(2)
#tdSql.checkData(0, 1, 1)
#tdSql.checkData(0, 2, 10)
#tdSql.checkData(1, 1, 1)
#tdSql.checkData(1, 1, 1)
def
stop
(
self
):
tdSql
.
close
()
tdLog
.
success
(
"%s successfully executed"
%
__file__
)
tdCases
.
addWindows
(
__file__
,
TDTestCase
())
tdCases
.
addLinux
(
__file__
,
TDTestCase
())
tests/pytest/query/queryInsertValue.py
浏览文件 @
cc9759f3
...
...
@@ -45,7 +45,7 @@ class TDTestCase:
tdSql
.
query
(
"select * from st"
)
tdSql
.
checkRows
(
1
)
tdSql
.
execute
(
"alter table st add column len
gth
int"
)
tdSql
.
execute
(
"alter table st add column len int"
)
tdSql
.
execute
(
"insert into t1 values(now, 1, 2)"
)
tdSql
.
query
(
"select last(*) from st"
)
tdSql
.
checkData
(
0
,
2
,
2
);
...
...
tests/pytest/util/dnodes.py
浏览文件 @
cc9759f3
...
...
@@ -432,7 +432,7 @@ class TDDnodes:
self
.
simDeployed
=
False
def
init
(
self
,
path
):
psCmd
=
"ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'"
psCmd
=
"ps -ef|grep -w taosd| grep -v grep
| grep -v defunct
| awk '{print $2}'"
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
).
decode
(
"utf-8"
)
while
(
processID
):
killCmd
=
"kill -TERM %s > /dev/null 2>&1"
%
processID
...
...
@@ -545,14 +545,14 @@ class TDDnodes:
for
i
in
range
(
len
(
self
.
dnodes
)):
self
.
dnodes
[
i
].
stop
()
psCmd
=
"ps -ef | grep -w taosd | grep 'root' | grep -v grep | awk '{print $2}'"
psCmd
=
"ps -ef | grep -w taosd | grep 'root' | grep -v grep
| grep -v defunct
| awk '{print $2}'"
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
).
decode
(
"utf-8"
)
if
processID
:
cmd
=
"sudo systemctl stop taosd"
os
.
system
(
cmd
)
# if os.system(cmd) != 0 :
# tdLog.exit(cmd)
psCmd
=
"ps -ef|grep -w taosd| grep -v grep | awk '{print $2}'"
psCmd
=
"ps -ef|grep -w taosd| grep -v grep
| grep -v defunct
| awk '{print $2}'"
processID
=
subprocess
.
check_output
(
psCmd
,
shell
=
True
).
decode
(
"utf-8"
)
while
(
processID
):
killCmd
=
"kill -TERM %s > /dev/null 2>&1"
%
processID
...
...
tests/script/general/parser/alter_column.sim
0 → 100644
浏览文件 @
cc9759f3
system sh/stop_dnodes.sh
system sh/deploy.sh -n dnode1 -i 1
system sh/cfg.sh -n dnode1 -c walLevel -v 1
system sh/exec.sh -n dnode1 -s start
sleep 100
sql connect
$dbPrefix = m_alt_db
$tbPrefix = m_alt_tb
$mtPrefix = m_alt_mt
$tbNum = 10
$rowNum = 5
$totalNum = $tbNum * $rowNum
$ts0 = 1537146000000
$delta = 600000
print ========== alter.sim
$i = 0
$db = $dbPrefix . $i
$mt = $mtPrefix . $i
sql drop database if exists $db
sql create database $db
sql use $db
##### alter table test, simeplest case
sql create table tb (ts timestamp, c1 int, c2 binary(10), c3 nchar(10))
sql insert into tb values (now, 1, "1", "1")
sql alter table tb modify column c2 binary(20);
if $rows != 0 then
return -1
endi
sql alter table tb modify column c3 nchar(20);
if $rows != 0 then
return -1
endi
sql create stable stb (ts timestamp, c1 int, c2 binary(10), c3 nchar(10)) tags(id1 int, id2 binary(10), id3 nchar(10))
sql create table tb1 using stb tags(1, "a", "b")
sql insert into tb1 values (now, 1, "1", "1")
sql alter stable stb modify column c2 binary(20);
if $rows != 0 then
return -1
endi
sql alter table stb modify column c2 binary(30);
if $rows != 0 then
return -1
endi
sql alter stable stb modify column c3 nchar(20);
if $rows != 0 then
return -1
endi
sql alter table stb modify column c3 nchar(30);
if $rows != 0 then
return -1
endi
sql alter table stb modify tag id2 binary(11);
if $rows != 0 then
return -1
endi
sql alter stable stb modify tag id2 binary(11);
if $rows != 0 then
return -1
endi
sql alter table stb modify tag id3 nchar(11);
if $rows != 0 then
return -1
endi
sql alter stable stb modify tag id3 nchar(11);
if $rows != 0 then
return -1
endi
##### ILLEGAL OPERATIONS
# try dropping columns that are defined in metric
sql_error alter table tb modify column c1 binary(10);
sql_error alter table tb modify column c1 double;
sql_error alter table tb modify column c2 int;
sql_error alter table tb modify column c2 binary(10);
sql_error alter table tb modify column c2 binary(9);
sql_error alter table tb modify column c2 binary(-9);
sql_error alter table tb modify column c2 binary(0);
sql_error alter table tb modify column c2 binary(17000);
sql_error alter table tb modify column c2 nchar(30);
sql_error alter table tb modify column c3 double;
sql_error alter table tb modify column c3 nchar(10);
sql_error alter table tb modify column c3 nchar(0);
sql_error alter table tb modify column c3 nchar(-1);
sql_error alter table tb modify column c3 binary(80);
sql_error alter table tb modify column c3 nchar(17000);
sql_error alter table tb modify column c3 nchar(100), c2 binary(30);
sql_error alter table tb modify column c1 nchar(100), c2 binary(30);
sql_error alter stable tb modify column c2 binary(30);
sql_error alter table tb modify tag c2 binary(30);
sql_error alter table stb modify tag id2 binary(10);
sql_error alter table stb modify tag id2 nchar(30);
sql_error alter stable stb modify tag id2 binary(10);
sql_error alter stable stb modify tag id2 nchar(30);
sql_error alter table stb modify tag id3 nchar(10);
sql_error alter table stb modify tag id3 binary(30);
sql_error alter stable stb modify tag id3 nchar(10);
sql_error alter stable stb modify tag id3 binary(30);
sql_error alter stable stb modify tag id1 binary(30);
sql_error alter stable stb modify tag c1 binary(30);
sql_error alter table tb1 modify column c2 binary(30);
sql_error alter table tb1 modify column c3 nchar(30);
sql_error alter table tb1 modify tag id2 binary(30);
sql_error alter table tb1 modify tag id3 nchar(30);
sql_error alter stable tb1 modify tag id2 binary(30);
sql_error alter stable tb1 modify tag id3 nchar(30);
sql_error alter stable tb1 modify column c2 binary(30);
system sh/exec.sh -n dnode1 -s stop -x SIGINT
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录