Skip to content
体验新版
项目
组织
正在加载...
登录
切换导航
打开侧边栏
taosdata
TDengine
提交
ceb78fda
T
TDengine
项目概览
taosdata
/
TDengine
大约 2 年 前同步成功
通知
1193
Star
22018
Fork
4786
代码
文件
提交
分支
Tags
贡献者
分支图
Diff
Issue
1
列表
看板
标记
里程碑
合并请求
0
Wiki
0
Wiki
分析
仓库
DevOps
项目成员
Pages
T
TDengine
项目概览
项目概览
详情
发布
仓库
仓库
文件
提交
分支
标签
贡献者
分支图
比较
Issue
1
Issue
1
列表
看板
标记
里程碑
合并请求
0
合并请求
0
Pages
分析
分析
仓库分析
DevOps
Wiki
0
Wiki
成员
成员
收起侧边栏
关闭侧边栏
动态
分支图
创建新Issue
提交
Issue看板
未验证
提交
ceb78fda
编写于
6月 24, 2023
作者:
H
Haojun Liao
提交者:
GitHub
6月 24, 2023
浏览文件
操作
浏览文件
下载
差异文件
Merge branch '3.0' into refact/fillhistory
上级
13d9136a
303fdc4f
变更
71
展开全部
显示空白变更内容
内联
并排
Showing
71 changed file
with
9901 addition
and
700 deletion
+9901
-700
.gitignore
.gitignore
+0
-1
cmake/cmake.define
cmake/cmake.define
+1
-1
cmake/cmake.platform
cmake/cmake.platform
+10
-0
cmake/cmake.version
cmake/cmake.version
+1
-1
cmake/rocksdb_CMakeLists.txt.in
cmake/rocksdb_CMakeLists.txt.in
+2
-1
contrib/CMakeLists.txt
contrib/CMakeLists.txt
+41
-29
deps/arm/rocksdb_static/librocksdb.a
deps/arm/rocksdb_static/librocksdb.a
+0
-0
deps/arm/rocksdb_static/rocksdb/c.h
deps/arm/rocksdb_static/rocksdb/c.h
+2844
-0
deps/x86/rocksdb_static/librocksdb.a
deps/x86/rocksdb_static/librocksdb.a
+0
-0
deps/x86/rocksdb_static/rocksdb/c.h
deps/x86/rocksdb_static/rocksdb/c.h
+2844
-0
docs/en/14-reference/03-connector/07-python.mdx
docs/en/14-reference/03-connector/07-python.mdx
+131
-0
docs/en/28-releases/01-tdengine.md
docs/en/28-releases/01-tdengine.md
+4
-0
docs/en/28-releases/02-tools.md
docs/en/28-releases/02-tools.md
+4
-0
docs/examples/python/stmt_example.py
docs/examples/python/stmt_example.py
+82
-0
docs/examples/python/stmt_websocket_example.py
docs/examples/python/stmt_websocket_example.py
+78
-0
docs/zh/08-connector/30-python.mdx
docs/zh/08-connector/30-python.mdx
+135
-0
docs/zh/28-releases/01-tdengine.md
docs/zh/28-releases/01-tdengine.md
+4
-0
docs/zh/28-releases/02-tools.md
docs/zh/28-releases/02-tools.md
+4
-0
include/common/tcommon.h
include/common/tcommon.h
+1
-0
include/libs/nodes/plannodes.h
include/libs/nodes/plannodes.h
+4
-8
include/libs/nodes/querynodes.h
include/libs/nodes/querynodes.h
+1
-0
include/libs/qcom/query.h
include/libs/qcom/query.h
+2
-1
source/client/src/clientMain.c
source/client/src/clientMain.c
+1
-0
source/common/src/tdatablock.c
source/common/src/tdatablock.c
+2
-2
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
+3
-0
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
+5
-2
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
+18
-2
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
+54
-8
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
+1
-0
source/dnode/mnode/impl/src/mndVgroup.c
source/dnode/mnode/impl/src/mndVgroup.c
+1
-0
source/dnode/vnode/CMakeLists.txt
source/dnode/vnode/CMakeLists.txt
+19
-0
source/dnode/vnode/inc/vnode.h
source/dnode/vnode/inc/vnode.h
+1
-0
source/dnode/vnode/src/sma/smaTimeRange.c
source/dnode/vnode/src/sma/smaTimeRange.c
+42
-4
source/dnode/vnode/src/vnd/vnodeOpen.c
source/dnode/vnode/src/vnd/vnodeOpen.c
+55
-11
source/dnode/vnode/src/vnd/vnodeSvr.c
source/dnode/vnode/src/vnd/vnodeSvr.c
+0
-2
source/libs/command/inc/commandInt.h
source/libs/command/inc/commandInt.h
+1
-1
source/libs/command/src/explain.c
source/libs/command/src/explain.c
+28
-2
source/libs/executor/inc/executorInt.h
source/libs/executor/inc/executorInt.h
+2
-2
source/libs/executor/inc/tsort.h
source/libs/executor/inc/tsort.h
+1
-1
source/libs/executor/src/aggregateoperator.c
source/libs/executor/src/aggregateoperator.c
+6
-11
source/libs/executor/src/cachescanoperator.c
source/libs/executor/src/cachescanoperator.c
+1
-0
source/libs/executor/src/eventwindowoperator.c
source/libs/executor/src/eventwindowoperator.c
+4
-1
source/libs/executor/src/exchangeoperator.c
source/libs/executor/src/exchangeoperator.c
+1
-0
source/libs/executor/src/filloperator.c
source/libs/executor/src/filloperator.c
+6
-6
source/libs/executor/src/groupoperator.c
source/libs/executor/src/groupoperator.c
+6
-8
source/libs/executor/src/joinoperator.c
source/libs/executor/src/joinoperator.c
+3
-2
source/libs/executor/src/projectoperator.c
source/libs/executor/src/projectoperator.c
+14
-13
source/libs/executor/src/scanoperator.c
source/libs/executor/src/scanoperator.c
+1
-0
source/libs/executor/src/sortoperator.c
source/libs/executor/src/sortoperator.c
+10
-0
source/libs/executor/src/timesliceoperator.c
source/libs/executor/src/timesliceoperator.c
+110
-33
source/libs/executor/src/timewindowoperator.c
source/libs/executor/src/timewindowoperator.c
+36
-30
source/libs/executor/src/tsort.c
source/libs/executor/src/tsort.c
+1
-0
source/libs/nodes/src/nodesCloneFuncs.c
source/libs/nodes/src/nodesCloneFuncs.c
+5
-4
source/libs/nodes/src/nodesCodeFuncs.c
source/libs/nodes/src/nodesCodeFuncs.c
+7
-26
source/libs/nodes/src/nodesMsgFuncs.c
source/libs/nodes/src/nodesMsgFuncs.c
+15
-25
source/libs/parser/src/parTranslater.c
source/libs/parser/src/parTranslater.c
+4
-4
source/libs/planner/src/planLogicCreater.c
source/libs/planner/src/planLogicCreater.c
+17
-4
source/libs/planner/src/planOptimizer.c
source/libs/planner/src/planOptimizer.c
+86
-25
source/libs/planner/src/planPhysiCreater.c
source/libs/planner/src/planPhysiCreater.c
+14
-4
source/libs/planner/src/planSpliter.c
source/libs/planner/src/planSpliter.c
+5
-3
source/libs/scheduler/src/schTask.c
source/libs/scheduler/src/schTask.c
+1
-1
source/libs/stream/CMakeLists.txt
source/libs/stream/CMakeLists.txt
+8
-0
source/util/src/terror.c
source/util/src/terror.c
+2
-2
tests/parallel_test/cases.task
tests/parallel_test/cases.task
+2
-417
tests/script/tsim/query/explain_tsorder.sim
tests/script/tsim/query/explain_tsorder.sim
+37
-0
tests/script/tsim/query/r/explain_tsorder.result
tests/script/tsim/query/r/explain_tsorder.result
+2560
-0
tests/script/tsim/query/t/explain_tsorder.sql
tests/script/tsim/query/t/explain_tsorder.sql
+73
-0
tests/script/tsim/sma/tsmaCreateInsertQuery.sim
tests/script/tsim/sma/tsmaCreateInsertQuery.sim
+16
-1
tests/system-test/0-others/splitVGroup.py
tests/system-test/0-others/splitVGroup.py
+377
-0
tests/system-test/2-query/interp.py
tests/system-test/2-query/interp.py
+45
-0
utils/tsim/src/simExe.c
utils/tsim/src/simExe.c
+1
-1
未找到文件。
.gitignore
浏览文件 @
ceb78fda
...
...
@@ -16,7 +16,6 @@ debug/
release/
target/
debs/
deps/
rpms/
mac/
*.pyc
...
...
cmake/cmake.define
浏览文件 @
ceb78fda
cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE O
FF
)
set(CMAKE_VERBOSE_MAKEFILE O
N
)
set(TD_BUILD_TAOSA_INTERNAL FALSE)
#set output directory
...
...
cmake/cmake.platform
浏览文件 @
ceb78fda
...
...
@@ -172,5 +172,15 @@ ENDIF()
MESSAGE(STATUS "Platform arch:" ${PLATFORM_ARCH_STR})
set(TD_DEPS_DIR "x86")
if (TD_LINUX)
IF (TD_ARM_64 OR TD_ARM_32)
set(TD_DEPS_DIR "arm")
ELSE()
set(TD_DEPS_DIR "x86")
ENDIF()
endif()
MESSAGE(STATUS "DEPS_DIR: " ${TD_DEPS_DIR})
MESSAGE("C Compiler: ${CMAKE_C_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_C_COMPILER_VERSION})")
MESSAGE("CXX Compiler: ${CMAKE_CXX_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_CXX_COMPILER_VERSION})")
cmake/cmake.version
浏览文件 @
ceb78fda
...
...
@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER})
ELSE ()
SET(TD_VER_NUMBER "3.0.
5.0
")
SET(TD_VER_NUMBER "3.0.
6.0.alpha
")
ENDIF ()
IF (DEFINED VERCOMPATIBLE)
...
...
cmake/rocksdb_CMakeLists.txt.in
浏览文件 @
ceb78fda
# rocksdb
IF (NOT ${TD_LINUX})
ExternalProject_Add(rocksdb
URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz
URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b
...
...
@@ -11,4 +12,4 @@ ExternalProject_Add(rocksdb
INSTALL_COMMAND ""
TEST_COMMAND ""
)
ENDIF(NOT ${TD_LINUX})
contrib/CMakeLists.txt
浏览文件 @
ceb78fda
...
...
@@ -78,10 +78,18 @@ if(${BUILD_WITH_LEVELDB})
endif
(
${
BUILD_WITH_LEVELDB
}
)
# rocksdb
IF
(
NOT
${
TD_LINUX
}
)
if
(
${
BUILD_WITH_ROCKSDB
}
)
cat
(
"
${
TD_SUPPORT_DIR
}
/rocksdb_CMakeLists.txt.in"
${
CONTRIB_TMP_FILE
}
)
add_definitions
(
-DUSE_ROCKSDB
)
endif
(
${
BUILD_WITH_ROCKSDB
}
)
ELSE
()
if
(
${
BUILD_WITH_ROCKSDB
}
)
#cat("${TD_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions
(
-DUSE_ROCKSDB
)
endif
(
${
BUILD_WITH_ROCKSDB
}
)
ENDIF
(
NOT
${
TD_LINUX
}
)
# canonical-raft
if
(
${
BUILD_WITH_CRAFT
}
)
...
...
@@ -227,6 +235,8 @@ endif(${BUILD_WITH_LEVELDB})
# rocksdb
# To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev
IF
(
NOT
${
TD_LINUX
}
)
if
(
${
BUILD_WITH_ROCKSDB
}
)
if
(
${
TD_LINUX
}
)
SET
(
CMAKE_CXX_FLAGS
"
${
CMAKE_CXX_FLAGS_REL
}
-Wno-error=maybe-uninitialized -Wno-error=unused-but-set-variable -Wno-error=unused-variable -Wno-error=unused-function -Wno-errno=unused-private-field -Wno-error=unused-result"
)
...
...
@@ -288,6 +298,7 @@ if(${BUILD_WITH_ROCKSDB})
)
endif
(
${
BUILD_WITH_ROCKSDB
}
)
ENDIF
(
NOT
${
TD_LINUX
}
)
# lucene
# To support build on ubuntu: sudo apt-get install libboost-all-dev
if
(
${
BUILD_WITH_LUCENE
}
)
...
...
@@ -497,6 +508,7 @@ if(${BUILD_GEOS})
endif
(
${
TD_LINUX
}
)
option
(
BUILD_SHARED_LIBS
"Build GEOS with shared libraries"
OFF
)
add_subdirectory
(
geos EXCLUDE_FROM_ALL
)
unset
(
CMAKE_CXX_STANDARD CACHE
)
# undo libgeos's setting of global CMAKE_CXX_STANDARD
target_include_directories
(
geos_c
PUBLIC $<BUILD_INTERFACE:
${
CMAKE_CURRENT_SOURCE_DIR
}
/geos/include>
...
...
deps/arm/rocksdb_static/librocksdb.a
0 → 100644
浏览文件 @
ceb78fda
文件已添加
deps/arm/rocksdb_static/rocksdb/c.h
0 → 100644
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
deps/x86/rocksdb_static/librocksdb.a
0 → 100644
浏览文件 @
ceb78fda
文件已添加
deps/x86/rocksdb_static/rocksdb/c.h
0 → 100644
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
docs/en/14-reference/03-connector/07-python.mdx
浏览文件 @
ceb78fda
...
...
@@ -667,6 +667,137 @@ Insert with req_id argument
</TabItem>
</Tabs>
### Parameter Binding
The Python connector provides a parameter binding api for inserting data. Similar to most databases, TDengine currently only supports the question mark `?` to indicate the parameters to be bound.
<Tabs>
<TabItem value="native" label="native connection">
#### Create Stmt
Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
```
import taos
conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
```
#### parameter binding
Call the `new_multi_binds` function to create the parameter list for parameter bindings.
```
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
```
Call the `bind_param` (for a single row) method or the `bind_param_batch` (for multiple rows) method to set the values.
```
stmt.bind_param_batch(params)
```
#### execute sql
Call `execute` method to execute sql.
```
stmt.execute()
```
#### Close Stmt
```
stmt.close()
```
#### Example
```python
{{#include docs/examples/python/stmt_example.py}}
```
</TabItem>
<TabItem value="websocket" label="WebSocket connection">
#### Create Stmt
Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
```
import taosws
conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
```
#### Prepare sql
Call `prepare` method in stmt to prepare sql.
```
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
```
#### parameter binding
Call the `bind_param` method to bind parameters.
```
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
```
Call the `add_batch` method to add parameters to the batch.
```
stmt.add_batch()
```
#### execute sql
Call `execute` method to execute sql.
```
stmt.execute()
```
#### Close Stmt
```
stmt.close()
```
#### Example
```python
{{#include docs/examples/python/stmt_websocket_example.py}}
```
</TabItem>
</Tabs>
### Other sample programs
| Example program links | Example program content |
...
...
docs/en/28-releases/01-tdengine.md
浏览文件 @
ceb78fda
...
...
@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://w
import Release from "/components/ReleaseV3";
## 3.0.5.1
<Release
type=
"tdengine"
version=
"3.0.5.1"
/>
## 3.0.5.0
<Release
type=
"tdengine"
version=
"3.0.5.0"
/>
...
...
docs/en/28-releases/02-tools.md
浏览文件 @
ceb78fda
...
...
@@ -10,6 +10,10 @@ For other historical version installers, please visit [here](https://www.taosdat
import Release from "/components/ReleaseV3";
## 2.5.2
<Release
type=
"tools"
version=
"2.5.2"
/>
## 2.5.1
<Release
type=
"tools"
version=
"2.5.1"
/>
...
...
docs/examples/python/stmt_example.py
0 → 100644
浏览文件 @
ceb78fda
#!
import
taosws
import
taos
db_name
=
'test_ws_stmt'
def
before
():
taos_conn
=
taos
.
connect
()
taos_conn
.
execute
(
"drop database if exists %s"
%
db_name
)
taos_conn
.
execute
(
"create database %s"
%
db_name
)
taos_conn
.
select_db
(
db_name
)
taos_conn
.
execute
(
"create table t1 (ts timestamp, a int, b float, c varchar(10))"
)
taos_conn
.
execute
(
"create table stb1 (ts timestamp, a int, b float, c varchar(10)) tags (t1 int, t2 binary(10))"
)
taos_conn
.
close
()
def
stmt_insert
():
before
()
conn
=
taosws
.
connect
(
'taosws://root:taosdata@localhost:6041/%s'
%
db_name
)
while
True
:
try
:
stmt
=
conn
.
statement
()
stmt
.
prepare
(
"insert into t1 values (?, ?, ?, ?)"
)
stmt
.
bind_param
([
taosws
.
millis_timestamps_to_column
([
1686844800000
,
1686844801000
,
1686844802000
,
1686844803000
]),
taosws
.
ints_to_column
([
1
,
2
,
3
,
4
]),
taosws
.
floats_to_column
([
1.1
,
2.2
,
3.3
,
4.4
]),
taosws
.
varchar_to_column
([
'a'
,
'b'
,
'c'
,
'd'
]),
])
stmt
.
add_batch
()
rows
=
stmt
.
execute
()
print
(
rows
)
stmt
.
close
()
except
Exception
as
e
:
if
'Retry needed'
in
e
.
args
[
0
]:
# deal with [0x0125] Retry needed
continue
else
:
raise
e
break
def
stmt_insert_into_stable
():
before
()
conn
=
taosws
.
connect
(
"taosws://root:taosdata@localhost:6041/%s"
%
db_name
)
while
True
:
try
:
stmt
=
conn
.
statement
()
stmt
.
prepare
(
"insert into ? using stb1 tags (?, ?) values (?, ?, ?, ?)"
)
stmt
.
set_tbname
(
'stb1_1'
)
stmt
.
set_tags
([
taosws
.
int_to_tag
(
1
),
taosws
.
varchar_to_tag
(
'aaa'
),
])
stmt
.
bind_param
([
taosws
.
millis_timestamps_to_column
([
1686844800000
,
1686844801000
,
1686844802000
,
1686844803000
]),
taosws
.
ints_to_column
([
1
,
2
,
3
,
4
]),
taosws
.
floats_to_column
([
1.1
,
2.2
,
3.3
,
4.4
]),
taosws
.
varchar_to_column
([
'a'
,
'b'
,
'c'
,
'd'
]),
])
stmt
.
add_batch
()
rows
=
stmt
.
execute
()
print
(
rows
)
stmt
.
close
()
except
Exception
as
e
:
if
'Retry needed'
in
e
.
args
[
0
]:
# deal with [0x0125] Retry needed
continue
else
:
raise
e
break
docs/examples/python/stmt_websocket_example.py
0 → 100644
浏览文件 @
ceb78fda
#!
import
time
import
taosws
import
taos
def
before_test
(
db_name
):
taos_conn
=
taos
.
connect
()
taos_conn
.
execute
(
"drop database if exists %s"
%
db_name
)
taos_conn
.
execute
(
"create database %s"
%
db_name
)
taos_conn
.
select_db
(
db_name
)
taos_conn
.
execute
(
"create table t1 (ts timestamp, a int, b float, c varchar(10))"
)
taos_conn
.
execute
(
"create table stb1 (ts timestamp, a int, b float, c varchar(10)) tags (t1 int, t2 binary(10))"
)
taos_conn
.
close
()
def
after_test
(
db_name
):
taos_conn
=
taos
.
connect
()
taos_conn
.
execute
(
"drop database if exists %s"
%
db_name
)
taos_conn
.
close
()
def
stmt_insert
():
db_name
=
'test_ws_stmt_{}'
.
format
(
int
(
time
.
time
()))
before_test
(
db_name
)
conn
=
taosws
.
connect
(
'taosws://root:taosdata@localhost:6041/%s'
%
db_name
)
stmt
=
conn
.
statement
()
stmt
.
prepare
(
"insert into t1 values (?, ?, ?, ?)"
)
stmt
.
bind_param
([
taosws
.
millis_timestamps_to_column
([
1686844800000
,
1686844801000
,
1686844802000
,
1686844803000
]),
taosws
.
ints_to_column
([
1
,
2
,
3
,
4
]),
taosws
.
floats_to_column
([
1.1
,
2.2
,
3.3
,
4.4
]),
taosws
.
varchar_to_column
([
'a'
,
'b'
,
'c'
,
'd'
]),
])
stmt
.
add_batch
()
rows
=
stmt
.
execute
()
assert
rows
==
4
stmt
.
close
()
after_test
(
db_name
)
def
stmt_insert_into_stable
():
db_name
=
'test_ws_stmt_{}'
.
format
(
int
(
time
.
time
()))
before_test
(
db_name
)
conn
=
taosws
.
connect
(
"taosws://root:taosdata@localhost:6041/%s"
%
db_name
)
stmt
=
conn
.
statement
()
stmt
.
prepare
(
"insert into ? using stb1 tags (?, ?) values (?, ?, ?, ?)"
)
stmt
.
set_tbname
(
'stb1_1'
)
stmt
.
set_tags
([
taosws
.
int_to_tag
(
1
),
taosws
.
varchar_to_tag
(
'aaa'
),
])
stmt
.
bind_param
([
taosws
.
millis_timestamps_to_column
([
1686844800000
,
1686844801000
,
1686844802000
,
1686844803000
]),
taosws
.
ints_to_column
([
1
,
2
,
3
,
4
]),
taosws
.
floats_to_column
([
1.1
,
2.2
,
3.3
,
4.4
]),
taosws
.
varchar_to_column
([
'a'
,
'b'
,
'c'
,
'd'
]),
])
stmt
.
add_batch
()
rows
=
stmt
.
execute
()
assert
rows
==
4
stmt
.
close
()
after_test
(
db_name
)
if
__name__
==
'__main__'
:
stmt_insert
()
stmt_insert_into_stable
()
docs/zh/08-connector/30-python.mdx
浏览文件 @
ceb78fda
...
...
@@ -672,6 +672,141 @@ consumer.close()
</TabItem>
</Tabs>
### 通过参数绑定写入数据
TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 `?` 来代表待绑定的参数。
<Tabs>
<TabItem value="native" label="原生连接">
#### 创建 stmt
Python 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
```
import taos
conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
```
#### 参数绑定
调用 `new_multi_binds` 函数创建 params 列表,用于参数绑定。
```
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
```
调用 stmt 的 `bind_param` 以单行的方式设置 values 或 `bind_param_batch` 以多行的方式设置 values 方法绑定参数。
```
stmt.bind_param_batch(params)
```
#### 执行 sql
调用 stmt 的 `execute` 方法执行 sql
```
stmt.execute()
```
#### 关闭 stmt
最后需要关闭 stmt。
```
stmt.close()
```
#### 示例代码
```python
{{#include docs/examples/python/stmt_example.py}}
```
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
#### 创建 stmt
Python WebSocket 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
```
import taosws
conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
```
#### 解析 sql
调用 stmt 的 `prepare` 方法来解析 insert 语句。
```
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
```
#### 参数绑定
调用 stmt 的 `bind_param` 方法绑定参数。
```
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
```
调用 stmt 的 `add_batch` 方法,将参数加入批处理。
```
stmt.add_batch()
```
#### 执行 sql
调用 stmt 的 `execute` 方法执行 sql
```
stmt.execute()
```
#### 关闭 stmt
最后需要关闭 stmt。
```
stmt.close()
```
#### 示例代码
```python
{{#include docs/examples/python/stmt_websocket_example.py}}
```
</TabItem>
</Tabs>
### 其它示例程序
| 示例程序链接 | 示例程序内容 |
...
...
docs/zh/28-releases/01-tdengine.md
浏览文件 @
ceb78fda
...
...
@@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3";
## 3.0.5.1
<Release
type=
"tdengine"
version=
"3.0.5.1"
/>
## 3.0.5.0
<Release
type=
"tdengine"
version=
"3.0.5.0"
/>
...
...
docs/zh/28-releases/02-tools.md
浏览文件 @
ceb78fda
...
...
@@ -10,6 +10,10 @@ taosTools 各版本安装包下载链接如下:
import Release from "/components/ReleaseV3";
## 2.5.2
<Release
type=
"tools"
version=
"2.5.2"
/>
## 2.5.1
<Release
type=
"tools"
version=
"2.5.1"
/>
...
...
include/common/tcommon.h
浏览文件 @
ceb78fda
...
...
@@ -203,6 +203,7 @@ typedef struct SDataBlockInfo {
SBlockID
id
;
int16_t
hasVarCol
;
int16_t
dataLoad
;
// denote if the data is loaded or not
uint8_t
scanFlag
;
// TODO: optimize and remove following
int64_t
version
;
// used for stream, and need serialization
...
...
include/libs/nodes/plannodes.h
浏览文件 @
ceb78fda
...
...
@@ -53,6 +53,8 @@ typedef struct SLogicNode {
EDataOrderLevel
requireDataOrder
;
// requirements for input data
EDataOrderLevel
resultDataOrder
;
// properties of the output data
EGroupAction
groupAction
;
EOrder
inputTsOrder
;
EOrder
outputTsOrder
;
}
SLogicNode
;
typedef
enum
EScanType
{
...
...
@@ -111,7 +113,6 @@ typedef struct SJoinLogicNode {
SNode
*
pMergeCondition
;
SNode
*
pOnConditions
;
bool
isSingleTableJoin
;
EOrder
inputTsOrder
;
SNode
*
pColEqualOnConditions
;
}
SJoinLogicNode
;
...
...
@@ -229,8 +230,6 @@ typedef struct SWindowLogicNode {
int8_t
igExpired
;
int8_t
igCheckUpdate
;
EWindowAlgorithm
windowAlgo
;
EOrder
inputTsOrder
;
EOrder
outputTsOrder
;
}
SWindowLogicNode
;
typedef
struct
SFillLogicNode
{
...
...
@@ -241,7 +240,6 @@ typedef struct SFillLogicNode {
SNode
*
pWStartTs
;
SNode
*
pValues
;
// SNodeListNode
STimeWindow
timeRange
;
EOrder
inputTsOrder
;
}
SFillLogicNode
;
typedef
struct
SSortLogicNode
{
...
...
@@ -310,6 +308,8 @@ typedef struct SDataBlockDescNode {
typedef
struct
SPhysiNode
{
ENodeType
type
;
EOrder
inputTsOrder
;
EOrder
outputTsOrder
;
SDataBlockDescNode
*
pOutputDataBlockDesc
;
SNode
*
pConditions
;
SNodeList
*
pChildren
;
...
...
@@ -406,7 +406,6 @@ typedef struct SSortMergeJoinPhysiNode {
SNode
*
pMergeCondition
;
SNode
*
pOnConditions
;
SNodeList
*
pTargets
;
EOrder
inputTsOrder
;
SNode
*
pColEqualOnConditions
;
}
SSortMergeJoinPhysiNode
;
...
...
@@ -460,8 +459,6 @@ typedef struct SWindowPhysiNode {
int64_t
watermark
;
int64_t
deleteMark
;
int8_t
igExpired
;
EOrder
inputTsOrder
;
EOrder
outputTsOrder
;
bool
mergeDataBlock
;
}
SWindowPhysiNode
;
...
...
@@ -488,7 +485,6 @@ typedef struct SFillPhysiNode {
SNode
*
pWStartTs
;
// SColumnNode
SNode
*
pValues
;
// SNodeListNode
STimeWindow
timeRange
;
EOrder
inputTsOrder
;
}
SFillPhysiNode
;
typedef
SFillPhysiNode
SStreamFillPhysiNode
;
...
...
include/libs/nodes/querynodes.h
浏览文件 @
ceb78fda
...
...
@@ -69,6 +69,7 @@ typedef struct SColumnNode {
uint64_t
tableId
;
int8_t
tableType
;
col_id_t
colId
;
uint16_t
projIdx
;
// the idx in project list, start from 1
EColumnType
colType
;
// column or tag
bool
hasIndex
;
char
dbName
[
TSDB_DB_NAME_LEN
];
...
...
include/libs/qcom/query.h
浏览文件 @
ceb78fda
...
...
@@ -281,7 +281,8 @@ extern int32_t (*queryProcessMsgRsp[TDMT_MAX])(void* output, char* msg, int32_t
(_code) == TSDB_CODE_PAR_INVALID_DROP_COL || ((_code) == TSDB_CODE_TDB_INVALID_TABLE_ID))
#define NEED_CLIENT_REFRESH_VG_ERROR(_code) \
((_code) == TSDB_CODE_VND_HASH_MISMATCH || (_code) == TSDB_CODE_VND_INVALID_VGROUP_ID)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) ((_code) == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER)
#define NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code) \
((_code) == TSDB_CODE_TDB_INVALID_TABLE_SCHEMA_VER || (_code) == TSDB_CODE_MND_INVALID_SCHEMA_VER)
#define NEED_CLIENT_HANDLE_ERROR(_code) \
(NEED_CLIENT_RM_TBLMETA_ERROR(_code) || NEED_CLIENT_REFRESH_VG_ERROR(_code) || \
NEED_CLIENT_REFRESH_TBLMETA_ERROR(_code))
...
...
source/client/src/clientMain.c
浏览文件 @
ceb78fda
...
...
@@ -1120,6 +1120,7 @@ void doAsyncQuery(SRequestObj *pRequest, bool updateMetaForce) {
if
(
NEED_CLIENT_HANDLE_ERROR
(
code
))
{
tscDebug
(
"0x%"
PRIx64
" client retry to handle the error, code:%d - %s, tryCount:%d, reqId:0x%"
PRIx64
,
pRequest
->
self
,
code
,
tstrerror
(
code
),
pRequest
->
retry
,
pRequest
->
requestId
);
refreshMeta
(
pRequest
->
pTscObj
,
pRequest
);
pRequest
->
prevCode
=
code
;
doAsyncQuery
(
pRequest
,
true
);
return
;
...
...
source/common/src/tdatablock.c
浏览文件 @
ceb78fda
...
...
@@ -783,8 +783,8 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
* @return
*/
size_t
blockDataGetSerialMetaSize
(
uint32_t
numOfCols
)
{
// | version | total length | total rows | total columns | flag seg| block group id | column schema
| each column
// length |
// | version | total length | total rows | total columns | flag seg| block group id | column schema
//
| each column
length |
return
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
sizeof
(
int32_t
)
+
sizeof
(
uint64_t
)
+
numOfCols
*
(
sizeof
(
int8_t
)
+
sizeof
(
int32_t
))
+
numOfCols
*
sizeof
(
int32_t
);
}
...
...
source/dnode/mgmt/mgmt_vnode/inc/vmInt.h
浏览文件 @
ceb78fda
...
...
@@ -46,6 +46,7 @@ typedef struct {
int32_t
vgId
;
int32_t
vgVersion
;
int8_t
dropped
;
int32_t
toVgId
;
char
path
[
PATH_MAX
+
20
];
}
SWrapperCfg
;
...
...
@@ -55,6 +56,7 @@ typedef struct {
int32_t
refCount
;
int8_t
dropped
;
int8_t
disable
;
int32_t
toVgId
;
char
*
path
;
SVnode
*
pImpl
;
SMultiWorker
pWriteW
;
...
...
@@ -70,6 +72,7 @@ typedef struct {
int32_t
vnodeNum
;
int32_t
opened
;
int32_t
failed
;
bool
updateVnodesList
;
int32_t
threadIndex
;
TdThread
thread
;
SVnodeMgmt
*
pMgmt
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmFile.c
浏览文件 @
ceb78fda
...
...
@@ -71,6 +71,8 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg **
if
(
code
<
0
)
goto
_OVER
;
tjsonGetInt32ValueFromDouble
(
vnode
,
"vgVersion"
,
pCfg
->
vgVersion
,
code
);
if
(
code
<
0
)
goto
_OVER
;
tjsonGetInt32ValueFromDouble
(
vnode
,
"toVgId"
,
pCfg
->
toVgId
,
code
);
if
(
code
<
0
)
goto
_OVER
;
snprintf
(
pCfg
->
path
,
sizeof
(
pCfg
->
path
),
"%s%svnode%d"
,
pMgmt
->
path
,
TD_DIRSEP
,
pCfg
->
vgId
);
}
...
...
@@ -165,6 +167,7 @@ static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t num
if
(
tjsonAddDoubleToObject
(
vnode
,
"vgId"
,
pVnode
->
vgId
)
<
0
)
return
-
1
;
if
(
tjsonAddDoubleToObject
(
vnode
,
"dropped"
,
pVnode
->
dropped
)
<
0
)
return
-
1
;
if
(
tjsonAddDoubleToObject
(
vnode
,
"vgVersion"
,
pVnode
->
vgVersion
)
<
0
)
return
-
1
;
if
(
pVnode
->
toVgId
&&
tjsonAddDoubleToObject
(
vnode
,
"toVgId"
,
pVnode
->
toVgId
)
<
0
)
return
-
1
;
if
(
tjsonAddItemToArray
(
vnodes
,
vnode
)
<
0
)
return
-
1
;
}
...
...
@@ -179,7 +182,7 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
SVnodeObj
**
ppVnodes
=
NULL
;
char
file
[
PATH_MAX
]
=
{
0
};
char
realfile
[
PATH_MAX
]
=
{
0
};
snprintf
(
file
,
sizeof
(
file
),
"%s%svnodes
.json.bak
"
,
pMgmt
->
path
,
TD_DIRSEP
);
snprintf
(
file
,
sizeof
(
file
),
"%s%svnodes
_tmp.json
"
,
pMgmt
->
path
,
TD_DIRSEP
);
snprintf
(
realfile
,
sizeof
(
realfile
),
"%s%svnodes.json"
,
pMgmt
->
path
,
TD_DIRSEP
);
int32_t
numOfVnodes
=
0
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmHandle.c
浏览文件 @
ceb78fda
...
...
@@ -484,10 +484,18 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t
srcVgId
=
req
.
srcVgId
;
int32_t
dstVgId
=
req
.
dstVgId
;
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
dstVgId
);
if
(
pVnode
!=
NULL
)
{
dError
(
"vgId:%d, vnode already exist"
,
dstVgId
);
vmReleaseVnode
(
pMgmt
,
pVnode
);
terrno
=
TSDB_CODE_VND_ALREADY_EXIST
;
return
-
1
;
}
dInfo
(
"vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d"
,
req
.
srcVgId
,
req
.
hashBegin
,
req
.
hashEnd
,
req
.
dstVgId
);
SVnodeObj
*
pVnode
=
vmAcquireVnode
(
pMgmt
,
srcVgId
);
pVnode
=
vmAcquireVnode
(
pMgmt
,
srcVgId
);
if
(
pVnode
==
NULL
)
{
dError
(
"vgId:%d, failed to alter hashrange since %s"
,
srcVgId
,
terrstr
());
terrno
=
TSDB_CODE_VND_NOT_EXIST
;
...
...
@@ -501,6 +509,13 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
};
tstrncpy
(
wrapperCfg
.
path
,
pVnode
->
path
,
sizeof
(
wrapperCfg
.
path
));
// prepare alter
pVnode
->
toVgId
=
dstVgId
;
if
(
vmWriteVnodeListToFile
(
pMgmt
)
!=
0
)
{
dError
(
"vgId:%d, failed to write vnode list since %s"
,
dstVgId
,
terrstr
());
return
-
1
;
}
dInfo
(
"vgId:%d, close vnode"
,
srcVgId
);
vmCloseVnode
(
pMgmt
,
pVnode
,
true
);
...
...
@@ -532,6 +547,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return
-
1
;
}
// complete alter
if
(
vmWriteVnodeListToFile
(
pMgmt
)
!=
0
)
{
dError
(
"vgId:%d, failed to write vnode list since %s"
,
dstVgId
,
terrstr
());
return
-
1
;
...
...
source/dnode/mgmt/mgmt_vnode/src/vmInt.c
浏览文件 @
ceb78fda
...
...
@@ -158,6 +158,28 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
taosMemoryFree
(
pVnode
);
}
static
int32_t
vmRestoreVgroupId
(
SWrapperCfg
*
pCfg
,
STfs
*
pTfs
)
{
int32_t
srcVgId
=
pCfg
->
vgId
;
int32_t
dstVgId
=
pCfg
->
toVgId
;
if
(
dstVgId
==
0
)
return
0
;
char
srcPath
[
TSDB_FILENAME_LEN
];
char
dstPath
[
TSDB_FILENAME_LEN
];
snprintf
(
srcPath
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
srcVgId
);
snprintf
(
dstPath
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
dstVgId
);
int32_t
vgId
=
vnodeRestoreVgroupId
(
srcPath
,
dstPath
,
srcVgId
,
dstVgId
,
pTfs
);
if
(
vgId
<=
0
)
{
dError
(
"vgId:%d, failed to restore vgroup id. srcPath: %s"
,
pCfg
->
vgId
,
srcPath
);
return
-
1
;
}
pCfg
->
vgId
=
vgId
;
pCfg
->
toVgId
=
0
;
return
0
;
}
static
void
*
vmOpenVnodeInThread
(
void
*
param
)
{
SVnodeThread
*
pThread
=
param
;
SVnodeMgmt
*
pMgmt
=
pThread
->
pMgmt
;
...
...
@@ -174,18 +196,34 @@ static void *vmOpenVnodeInThread(void *param) {
pMgmt
->
state
.
openVnodes
,
pMgmt
->
state
.
totalVnodes
);
tmsgReportStartup
(
"vnode-open"
,
stepDesc
);
if
(
pCfg
->
toVgId
)
{
if
(
vmRestoreVgroupId
(
pCfg
,
pMgmt
->
pTfs
)
!=
0
)
{
dError
(
"vgId:%d, failed to restore vgroup id by thread:%d"
,
pCfg
->
vgId
,
pThread
->
threadIndex
);
pThread
->
failed
++
;
continue
;
}
pThread
->
updateVnodesList
=
true
;
}
snprintf
(
path
,
TSDB_FILENAME_LEN
,
"vnode%svnode%d"
,
TD_DIRSEP
,
pCfg
->
vgId
);
SVnode
*
pImpl
=
vnodeOpen
(
path
,
pMgmt
->
pTfs
,
pMgmt
->
msgCb
);
if
(
pImpl
==
NULL
)
{
dError
(
"vgId:%d, failed to open vnode by thread:%d"
,
pCfg
->
vgId
,
pThread
->
threadIndex
);
pThread
->
failed
++
;
}
else
{
vmOpenVnode
(
pMgmt
,
pCfg
,
pImpl
);
continue
;
}
if
(
vmOpenVnode
(
pMgmt
,
pCfg
,
pImpl
)
!=
0
)
{
dError
(
"vgId:%d, failed to open vnode by thread:%d"
,
pCfg
->
vgId
,
pThread
->
threadIndex
);
pThread
->
failed
++
;
continue
;
}
dInfo
(
"vgId:%d, is opened by thread:%d"
,
pCfg
->
vgId
,
pThread
->
threadIndex
);
pThread
->
opened
++
;
atomic_add_fetch_32
(
&
pMgmt
->
state
.
openVnodes
,
1
);
}
}
dInfo
(
"thread:%d, numOfVnodes:%d, opened:%d failed:%d"
,
pThread
->
threadIndex
,
pThread
->
vnodeNum
,
pThread
->
opened
,
pThread
->
failed
);
...
...
@@ -242,6 +280,8 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
taosThreadAttrDestroy
(
&
thAttr
);
}
bool
updateVnodesList
=
false
;
for
(
int32_t
t
=
0
;
t
<
threadNum
;
++
t
)
{
SVnodeThread
*
pThread
=
&
threads
[
t
];
if
(
pThread
->
vnodeNum
>
0
&&
taosCheckPthreadValid
(
pThread
->
thread
))
{
...
...
@@ -249,6 +289,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
taosThreadClear
(
&
pThread
->
thread
);
}
taosMemoryFree
(
pThread
->
pCfgs
);
if
(
pThread
->
updateVnodesList
)
updateVnodesList
=
true
;
}
taosMemoryFree
(
threads
);
taosMemoryFree
(
pCfgs
);
...
...
@@ -256,10 +297,15 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
if
(
pMgmt
->
state
.
openVnodes
!=
pMgmt
->
state
.
totalVnodes
)
{
dError
(
"there are total vnodes:%d, opened:%d"
,
pMgmt
->
state
.
totalVnodes
,
pMgmt
->
state
.
openVnodes
);
return
-
1
;
}
else
{
}
if
(
updateVnodesList
&&
vmWriteVnodeListToFile
(
pMgmt
)
!=
0
)
{
dError
(
"failed to write vnode list since %s"
,
terrstr
());
return
-
1
;
}
dInfo
(
"successfully opened %d vnodes"
,
pMgmt
->
state
.
totalVnodes
);
return
0
;
}
}
static
void
*
vmCloseVnodeInThread
(
void
*
param
)
{
...
...
source/dnode/mgmt/mgmt_vnode/src/vmWorker.c
浏览文件 @
ceb78fda
...
...
@@ -218,6 +218,7 @@ static int32_t vmPutMsgToQueue(SVnodeMgmt *pMgmt, SRpcMsg *pMsg, EQueueType qtyp
if
(
pMsg
->
msgType
!=
TDMT_VND_ALTER_CONFIRM
&&
pVnode
->
disable
)
{
dDebug
(
"vgId:%d, msg:%p put into vnode-write queue failed since its disable"
,
pVnode
->
vgId
,
pMsg
);
terrno
=
TSDB_CODE_VND_STOPPED
;
code
=
terrno
;
break
;
}
dGTrace
(
"vgId:%d, msg:%p put into vnode-write queue"
,
pVnode
->
vgId
,
pMsg
);
...
...
source/dnode/mnode/impl/src/mndVgroup.c
浏览文件 @
ceb78fda
...
...
@@ -1217,6 +1217,7 @@ static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, i
action
.
pCont
=
pReq
;
action
.
contLen
=
contLen
;
action
.
msgType
=
TDMT_VND_ALTER_HASHRANGE
;
action
.
acceptableCode
=
TSDB_CODE_VND_ALREADY_EXIST
;
if
(
mndTransAppendRedoAction
(
pTrans
,
&
action
)
!=
0
)
{
taosMemoryFree
(
pReq
);
...
...
source/dnode/vnode/CMakeLists.txt
浏览文件 @
ceb78fda
...
...
@@ -80,6 +80,7 @@ IF (TD_VNODE_PLUGINS)
)
ENDIF
()
IF
(
NOT
${
TD_LINUX
}
)
target_include_directories
(
vnode
PUBLIC
"inc"
...
...
@@ -87,7 +88,25 @@ target_include_directories(
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/scalar"
PUBLIC
"
${
TD_SOURCE_DIR
}
/contrib/rocksdb/include"
)
ELSE
()
target_include_directories
(
vnode
PUBLIC
"inc"
PUBLIC
"src/inc"
PUBLIC
"
${
TD_SOURCE_DIR
}
/include/libs/scalar"
)
ENDIF
(
NOT
${
TD_LINUX
}
)
IF
(
TD_LINUX
)
target_include_directories
(
vnode
PUBLIC
"
${
TD_SOURCE_DIR
}
/deps/
${
TD_DEPS_DIR
}
/rocksdb_static"
)
target_link_directories
(
vnode
PUBLIC
"
${
TD_SOURCE_DIR
}
/deps/
${
TD_DEPS_DIR
}
/rocksdb_static"
)
target_link_libraries
(
vnode
PUBLIC os
...
...
source/dnode/vnode/inc/vnode.h
浏览文件 @
ceb78fda
...
...
@@ -54,6 +54,7 @@ void vnodeCleanup();
int32_t
vnodeCreate
(
const
char
*
path
,
SVnodeCfg
*
pCfg
,
STfs
*
pTfs
);
int32_t
vnodeAlterReplica
(
const
char
*
path
,
SAlterVnodeReplicaReq
*
pReq
,
STfs
*
pTfs
);
int32_t
vnodeAlterHashRange
(
const
char
*
srcPath
,
const
char
*
dstPath
,
SAlterVnodeHashRangeReq
*
pReq
,
STfs
*
pTfs
);
int32_t
vnodeRestoreVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
STfs
*
pTfs
);
void
vnodeDestroy
(
const
char
*
path
,
STfs
*
pTfs
);
SVnode
*
vnodeOpen
(
const
char
*
path
,
STfs
*
pTfs
,
SMsgCb
msgCb
);
void
vnodePreClose
(
SVnode
*
pVnode
);
...
...
source/dnode/vnode/src/sma/smaTimeRange.c
浏览文件 @
ceb78fda
...
...
@@ -29,7 +29,7 @@ int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
((
code
=
tdProcessTSmaInsertImpl
(
pSma
,
indexUid
,
msg
))
<
0
)
{
sma
Warn
(
"vgId:%d, insert tsma data failed since %s"
,
SMA_VID
(
pSma
),
tstrerror
(
terrno
));
sma
Error
(
"vgId:%d, insert tsma data failed since %s"
,
SMA_VID
(
pSma
),
tstrerror
(
terrno
));
}
return
code
;
...
...
@@ -346,6 +346,43 @@ _end:
return
TSDB_CODE_SUCCESS
;
}
static
int32_t
tsmaProcessDelReq
(
SSma
*
pSma
,
int64_t
indexUid
,
SBatchDeleteReq
*
pDelReq
)
{
int32_t
code
=
0
;
int32_t
lino
=
0
;
if
(
taosArrayGetSize
(
pDelReq
->
deleteReqs
)
>
0
)
{
int32_t
len
=
0
;
tEncodeSize
(
tEncodeSBatchDeleteReq
,
pDelReq
,
len
,
code
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
void
*
pBuf
=
rpcMallocCont
(
len
+
sizeof
(
SMsgHead
));
if
(
!
pBuf
)
{
code
=
terrno
;
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
SEncoder
encoder
;
tEncoderInit
(
&
encoder
,
POINTER_SHIFT
(
pBuf
,
sizeof
(
SMsgHead
)),
len
);
tEncodeSBatchDeleteReq
(
&
encoder
,
pDelReq
);
tEncoderClear
(
&
encoder
);
((
SMsgHead
*
)
pBuf
)
->
vgId
=
TD_VID
(
pSma
->
pVnode
);
SRpcMsg
delMsg
=
{.
msgType
=
TDMT_VND_BATCH_DEL
,
.
pCont
=
pBuf
,
.
contLen
=
len
+
sizeof
(
SMsgHead
)};
code
=
tmsgPutToQueue
(
&
pSma
->
pVnode
->
msgCb
,
WRITE_QUEUE
,
&
delMsg
);
TSDB_CHECK_CODE
(
code
,
lino
,
_exit
);
}
_exit:
taosArrayDestroy
(
pDelReq
->
deleteReqs
);
if
(
code
)
{
smaError
(
"vgId:%d, failed at line %d to process delete req for smaIndex %"
PRIi64
" since %s"
,
SMA_VID
(
pSma
),
lino
,
indexUid
,
tstrerror
(
code
));
}
return
code
;
}
/**
* @brief Insert/Update Time-range-wise SMA data.
*
...
...
@@ -355,7 +392,6 @@ _end:
*/
static
int32_t
tdProcessTSmaInsertImpl
(
SSma
*
pSma
,
int64_t
indexUid
,
const
char
*
msg
)
{
const
SArray
*
pDataBlocks
=
(
const
SArray
*
)
msg
;
// TODO: destroy SSDataBlocks(msg)
if
(
!
pDataBlocks
)
{
terrno
=
TSDB_CODE_TSMA_INVALID_PTR
;
smaWarn
(
"vgId:%d, insert tsma data failed since pDataBlocks is NULL"
,
SMA_VID
(
pSma
));
...
...
@@ -419,8 +455,10 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
goto
_err
;
}
// TODO deleteReq
taosArrayDestroy
(
deleteReq
.
deleteReqs
);
if
((
terrno
=
tsmaProcessDelReq
(
pSma
,
indexUid
,
&
deleteReq
))
!=
0
)
{
goto
_err
;
}
#if 0
if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) {
terrno = TSDB_CODE_APP_ERROR;
...
...
source/dnode/vnode/src/vnd/vnodeOpen.c
浏览文件 @
ceb78fda
...
...
@@ -136,14 +136,13 @@ static int32_t vnodeVgroupIdLen(int32_t vgId) {
}
int32_t
vnodeRenameVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
STfs
*
pTfs
)
{
int32_t
ret
=
tfsRename
(
pTfs
,
srcPath
,
dstPath
);
if
(
ret
!=
0
)
return
ret
;
int32_t
ret
=
0
;
char
oldRname
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
newRname
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsdbPath
[
TSDB_FILENAME_LEN
]
=
{
0
};
char
tsdbFilePrefix
[
TSDB_FILENAME_LEN
]
=
{
0
};
snprintf
(
tsdbPath
,
TSDB_FILENAME_LEN
,
"%s%stsdb"
,
dst
Path
,
TD_DIRSEP
);
snprintf
(
tsdbPath
,
TSDB_FILENAME_LEN
,
"%s%stsdb"
,
src
Path
,
TD_DIRSEP
);
snprintf
(
tsdbFilePrefix
,
TSDB_FILENAME_LEN
,
"tsdb%sv"
,
TD_DIRSEP
);
STfsDir
*
tsdbDir
=
tfsOpendir
(
pTfs
,
tsdbPath
);
...
...
@@ -168,7 +167,7 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
ret
=
tfsRename
(
pTfs
,
tsdbFile
->
rname
,
newRname
);
if
(
ret
!=
0
)
{
v
Info
(
"vgId:%d, failed to rename file from %s to %s since %s"
,
dstVgId
,
tsdbFile
->
rname
,
newRname
,
terrstr
());
v
Error
(
"vgId:%d, failed to rename file from %s to %s since %s"
,
dstVgId
,
tsdbFile
->
rname
,
newRname
,
terrstr
());
tfsClosedir
(
tsdbDir
);
return
ret
;
}
...
...
@@ -176,6 +175,21 @@ int32_t vnodeRenameVgroupId(const char *srcPath, const char *dstPath, int32_t sr
}
tfsClosedir
(
tsdbDir
);
vInfo
(
"vgId:%d, rename dir from %s to %s"
,
dstVgId
,
srcPath
,
dstPath
);
ret
=
tfsRename
(
pTfs
,
srcPath
,
dstPath
);
if
(
ret
!=
0
)
{
vError
(
"vgId:%d, failed to rename dir from %s to %s since %s"
,
dstVgId
,
srcPath
,
dstPath
,
terrstr
());
}
return
ret
;
}
int32_t
vnodeGetAbsDir
(
const
char
*
relPath
,
STfs
*
pTfs
,
char
*
buf
,
size_t
bufLen
)
{
if
(
pTfs
)
{
snprintf
(
buf
,
bufLen
,
"%s%s%s"
,
tfsGetPrimaryPath
(
pTfs
),
TD_DIRSEP
,
relPath
);
}
else
{
snprintf
(
buf
,
bufLen
,
"%s"
,
relPath
);
}
return
0
;
}
...
...
@@ -184,13 +198,7 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
int32_t
ret
=
0
;
if
(
pTfs
)
{
snprintf
(
dir
,
TSDB_FILENAME_LEN
,
"%s%s%s"
,
tfsGetPrimaryPath
(
pTfs
),
TD_DIRSEP
,
srcPath
);
}
else
{
snprintf
(
dir
,
TSDB_FILENAME_LEN
,
"%s"
,
srcPath
);
}
// todo add stat file to handle exception while vnode open
vnodeGetAbsDir
(
srcPath
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
ret
=
vnodeLoadInfo
(
dir
,
&
info
);
if
(
ret
<
0
)
{
...
...
@@ -245,6 +253,42 @@ int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnod
return
0
;
}
int32_t
vnodeRestoreVgroupId
(
const
char
*
srcPath
,
const
char
*
dstPath
,
int32_t
srcVgId
,
int32_t
dstVgId
,
STfs
*
pTfs
)
{
SVnodeInfo
info
=
{
0
};
char
dir
[
TSDB_FILENAME_LEN
]
=
{
0
};
vnodeGetAbsDir
(
dstPath
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
if
(
vnodeLoadInfo
(
dir
,
&
info
)
==
0
)
{
if
(
info
.
config
.
vgId
!=
dstVgId
)
{
vError
(
"vgId:%d, unexpected vnode config.vgId:%d"
,
dstVgId
,
info
.
config
.
vgId
);
return
-
1
;
}
return
dstVgId
;
}
vnodeGetAbsDir
(
srcPath
,
pTfs
,
dir
,
TSDB_FILENAME_LEN
);
if
(
vnodeLoadInfo
(
dir
,
&
info
)
<
0
)
{
vError
(
"vgId:%d, failed to read vnode config from %s since %s"
,
srcVgId
,
srcPath
,
tstrerror
(
terrno
));
return
-
1
;
}
if
(
info
.
config
.
vgId
==
srcVgId
)
{
vInfo
(
"vgId:%d, rollback alter hashrange"
,
srcVgId
);
return
srcVgId
;
}
else
if
(
info
.
config
.
vgId
!=
dstVgId
)
{
vError
(
"vgId:%d, unexpected vnode config.vgId:%d"
,
dstVgId
,
info
.
config
.
vgId
);
return
-
1
;
}
vInfo
(
"vgId:%d, rename %s to %s"
,
dstVgId
,
srcPath
,
dstPath
);
if
(
vnodeRenameVgroupId
(
srcPath
,
dstPath
,
srcVgId
,
dstVgId
,
pTfs
)
<
0
)
{
vError
(
"vgId:%d, failed to rename vnode from %s to %s since %s"
,
dstVgId
,
srcPath
,
dstPath
,
tstrerror
(
terrno
));
return
-
1
;
}
return
dstVgId
;
}
void
vnodeDestroy
(
const
char
*
path
,
STfs
*
pTfs
)
{
vInfo
(
"path:%s is removed while destroy vnode"
,
path
);
tfsRmdir
(
pTfs
,
path
);
...
...
source/dnode/vnode/src/vnd/vnodeSvr.c
浏览文件 @
ceb78fda
...
...
@@ -602,9 +602,7 @@ int32_t vnodeProcessStreamMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo)
}
}
// TODO: remove the function
void
smaHandleRes
(
void
*
pVnode
,
int64_t
smaId
,
const
SArray
*
data
)
{
// TODO
// blockDebugShowDataBlocks(data, __func__);
tdProcessTSmaInsert
(((
SVnode
*
)
pVnode
)
->
pSma
,
smaId
,
(
const
char
*
)
data
);
}
...
...
source/libs/command/inc/commandInt.h
浏览文件 @
ceb78fda
...
...
@@ -145,7 +145,7 @@ typedef struct SExplainCtx {
SHashObj
*
groupHash
;
// Hash<SExplainGroup>
}
SExplainCtx
;
#define EXPLAIN_ORDER_STRING(_order) ((ORDER_ASC == _order) ? "asc" :
"desc
")
#define EXPLAIN_ORDER_STRING(_order) ((ORDER_ASC == _order) ? "asc" :
ORDER_DESC == _order ? "desc" : "unknown
")
#define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join")
#define INVERAL_TIME_FROM_PRECISION_TO_UNIT(_t, _u, _p) (((_u) == 'n' || (_u) == 'y') ? (_t) : (convertTimeFromPrecisionToUnit(_t, _p, _u)))
...
...
source/libs/command/src/explain.c
浏览文件 @
ceb78fda
...
...
@@ -499,6 +499,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_COLUMNS_FORMAT
,
pPrjNode
->
pProjections
->
length
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pPrjNode
->
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_INPUT_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pPrjNode
->
node
.
inputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
...
...
@@ -544,6 +547,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_COLUMNS_FORMAT
,
pJoinNode
->
pTargets
->
length
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pJoinNode
->
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_INPUT_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pJoinNode
->
node
.
inputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
...
...
@@ -597,6 +603,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_GROUPS_FORMAT
,
pAggNode
->
pGroupKeys
->
length
);
}
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_INPUT_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pAggNode
->
node
.
inputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
...
...
@@ -716,6 +725,11 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
case
QUERY_NODE_PHYSICAL_PLAN_SORT
:
{
SSortPhysiNode
*
pSortNode
=
(
SSortPhysiNode
*
)
pNode
;
EXPLAIN_ROW_NEW
(
level
,
EXPLAIN_SORT_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_INPUT_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pSortNode
->
node
.
inputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT
,
EXPLAIN_ORDER_STRING
(
pSortNode
->
node
.
outputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_LEFT_PARENTHESIS_FORMAT
);
if
(
pResNode
->
pExecInfo
)
{
QRY_ERR_RET
(
qExplainBufAppendExecInfo
(
pResNode
->
pExecInfo
,
tbuf
,
&
tlen
));
...
...
@@ -796,9 +810,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pIntNode
->
window
.
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_INPUT_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pIntNode
->
window
.
inputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_INPUT_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pIntNode
->
window
.
node
.
inputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT
,
EXPLAIN_ORDER_STRING
(
pIntNode
->
window
.
node
.
outputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT
,
EXPLAIN_ORDER_STRING
(
pIntNode
->
window
.
outputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
...
...
@@ -847,6 +862,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_FUNCTIONS_FORMAT
,
pIntNode
->
window
.
pFuncs
->
length
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pIntNode
->
window
.
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_INPUT_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pIntNode
->
window
.
node
.
inputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT
,
EXPLAIN_ORDER_STRING
(
pIntNode
->
window
.
node
.
outputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
...
...
@@ -895,6 +914,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_MODE_FORMAT
,
nodesGetFillModeString
(
pFillNode
->
mode
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pFillNode
->
node
.
pOutputDataBlockDesc
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_INPUT_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pFillNode
->
node
.
inputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
...
...
@@ -1080,6 +1102,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND
(
EXPLAIN_COLUMNS_FORMAT
,
nodesGetOutputNumFromSlotList
(
pDescNode
->
pSlots
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_WIDTH_FORMAT
,
pDescNode
->
totalRowSize
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_INPUT_ORDER_FORMAT
,
EXPLAIN_ORDER_STRING
(
pMergeNode
->
node
.
inputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_BLANK_FORMAT
);
EXPLAIN_ROW_APPEND
(
EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT
,
EXPLAIN_ORDER_STRING
(
pMergeNode
->
node
.
outputTsOrder
));
EXPLAIN_ROW_APPEND
(
EXPLAIN_RIGHT_PARENTHESIS_FORMAT
);
EXPLAIN_ROW_END
();
QRY_ERR_RET
(
qExplainResAppendRow
(
ctx
,
tbuf
,
tlen
,
level
));
...
...
source/libs/executor/inc/executorInt.h
浏览文件 @
ceb78fda
...
...
@@ -399,6 +399,8 @@ typedef struct SOptrBasicInfo {
SResultRowInfo
resultRowInfo
;
SSDataBlock
*
pRes
;
bool
mergeResultBlock
;
int32_t
inputTsOrder
;
int32_t
outputTsOrder
;
}
SOptrBasicInfo
;
typedef
struct
SIntervalAggOperatorInfo
{
...
...
@@ -411,8 +413,6 @@ typedef struct SIntervalAggOperatorInfo {
STimeWindow
win
;
// query time range
bool
timeWindowInterpo
;
// interpolation needed or not
SArray
*
pInterpCols
;
// interpolation columns
int32_t
resultTsOrder
;
// result timestamp order
int32_t
inputOrder
;
// input data ts order
EOPTR_EXEC_MODEL
execModel
;
// operator execution model [batch model|stream model]
STimeWindowAggSupp
twAggSup
;
SArray
*
pPrevValues
;
// SArray<SGroupKeys> used to keep the previous not null value for interpolation.
...
...
source/libs/executor/inc/tsort.h
浏览文件 @
ceb78fda
...
...
@@ -146,7 +146,7 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colId);
* @return
*/
uint64_t
tsortGetGroupId
(
STupleHandle
*
pVHandle
);
void
*
tsortGetBlockInfo
(
STupleHandle
*
pVHandle
);
/**
*
* @param pSortHandle
...
...
source/libs/executor/src/aggregateoperator.c
浏览文件 @
ceb78fda
...
...
@@ -108,6 +108,8 @@ SOperatorInfo* createAggregateOperatorInfo(SOperatorInfo* downstream, SAggPhysiN
pInfo
->
binfo
.
mergeResultBlock
=
pAggNode
->
mergeDataBlock
;
pInfo
->
groupKeyOptimized
=
pAggNode
->
groupKeyOptimized
;
pInfo
->
groupId
=
UINT64_MAX
;
pInfo
->
binfo
.
inputTsOrder
=
pAggNode
->
node
.
inputTsOrder
;
pInfo
->
binfo
.
outputTsOrder
=
pAggNode
->
node
.
outputTsOrder
;
setOperatorInfo
(
pOperator
,
"TableAggregate"
,
QUERY_NODE_PHYSICAL_PLAN_HASH_AGG
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
...
...
@@ -164,10 +166,8 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
int64_t
st
=
taosGetTimestampUs
();
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
int32_t
order
=
pAggInfo
->
binfo
.
inputTsOrder
;
bool
hasValidBlock
=
false
;
while
(
1
)
{
...
...
@@ -185,12 +185,7 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
}
}
hasValidBlock
=
true
;
int32_t
code
=
getTableScanInfo
(
pOperator
,
&
order
,
&
scanFlag
,
false
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
destroyDataBlockForEmptyInput
(
blockAllocated
,
&
pBlock
);
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
pAggInfo
->
binfo
.
pRes
->
info
.
scanFlag
=
pBlock
->
info
.
scanFlag
;
// there is an scalar expression that needs to be calculated before apply the group aggregation.
if
(
pAggInfo
->
scalarExprSup
.
pExprInfo
!=
NULL
&&
!
blockAllocated
)
{
...
...
@@ -204,7 +199,7 @@ int32_t doOpenAggregateOptr(SOperatorInfo* pOperator) {
// the pDataBlock are always the same one, no need to call this again
setExecutionContext
(
pOperator
,
pOperator
->
exprSupp
.
numOfExprs
,
pBlock
->
info
.
id
.
groupId
);
setInputDataBlock
(
pSup
,
pBlock
,
order
,
scanFlag
,
true
);
setInputDataBlock
(
pSup
,
pBlock
,
order
,
pBlock
->
info
.
scanFlag
,
true
);
code
=
doAggregateImpl
(
pOperator
,
pSup
->
pCtx
);
if
(
code
!=
0
)
{
destroyDataBlockForEmptyInput
(
blockAllocated
,
&
pBlock
);
...
...
source/libs/executor/src/cachescanoperator.c
浏览文件 @
ceb78fda
...
...
@@ -208,6 +208,7 @@ SSDataBlock* doScanCache(SOperatorInfo* pOperator) {
pRes
->
info
.
id
.
uid
=
*
(
tb_uid_t
*
)
taosArrayGet
(
pInfo
->
pUidList
,
pInfo
->
indexOfBufferedRes
);
pRes
->
info
.
rows
=
1
;
pRes
->
info
.
scanFlag
=
MAIN_SCAN
;
SExprSupp
*
pSup
=
&
pInfo
->
pseudoExprSup
;
int32_t
code
=
addTagPseudoColumnData
(
&
pInfo
->
readHandle
,
pSup
->
pExprInfo
,
pSup
->
numOfExprs
,
pRes
,
...
...
source/libs/executor/src/eventwindowoperator.c
浏览文件 @
ceb78fda
...
...
@@ -120,6 +120,8 @@ SOperatorInfo* createEventwindowOperatorInfo(SOperatorInfo* downstream, SPhysiNo
initBasicInfo
(
&
pInfo
->
binfo
,
pResBlock
);
initResultRowInfo
(
&
pInfo
->
binfo
.
resultRowInfo
);
pInfo
->
binfo
.
inputTsOrder
=
physiNode
->
inputTsOrder
;
pInfo
->
binfo
.
outputTsOrder
=
physiNode
->
outputTsOrder
;
pInfo
->
twAggSup
=
(
STimeWindowAggSupp
){.
waterMark
=
pEventWindowNode
->
window
.
watermark
,
.
calTrigger
=
pEventWindowNode
->
window
.
triggerType
};
...
...
@@ -183,7 +185,7 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
order
=
pInfo
->
binfo
.
inputTsOrder
;
SSDataBlock
*
pRes
=
pInfo
->
binfo
.
pRes
;
...
...
@@ -196,6 +198,7 @@ static SSDataBlock* eventWindowAggregate(SOperatorInfo* pOperator) {
break
;
}
pRes
->
info
.
scanFlag
=
pBlock
->
info
.
scanFlag
;
setInputDataBlock
(
pSup
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
tsSlotId
);
...
...
source/libs/executor/src/exchangeoperator.c
浏览文件 @
ceb78fda
...
...
@@ -520,6 +520,7 @@ int32_t extractDataBlockFromFetchRsp(SSDataBlock* pRes, char* pData, SArray* pCo
// data from mnode
pRes
->
info
.
dataLoad
=
1
;
pRes
->
info
.
rows
=
pBlock
->
info
.
rows
;
pRes
->
info
.
scanFlag
=
MAIN_SCAN
;
relocateColumnData
(
pRes
,
pColList
,
pBlock
->
pDataBlock
,
false
);
blockDataDestroy
(
pBlock
);
}
...
...
source/libs/executor/src/filloperator.c
浏览文件 @
ceb78fda
...
...
@@ -178,16 +178,15 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
blockDataCleanup
(
pResBlock
);
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
getTableScanInfo
(
pOperator
,
&
order
,
&
scanFlag
,
false
);
int32_t
order
=
pInfo
->
pFillInfo
->
order
;
SOperatorInfo
*
pDownstream
=
pOperator
->
pDownstream
[
0
];
#if 0
// the scan order may be different from the output result order for agg interval operator.
if (pDownstream->operatorType == QUERY_NODE_PHYSICAL_PLAN_HASH_INTERVAL) {
order = ((SIntervalAggOperatorInfo*) pDownstream->info)->resultTsOrder;
}
#endif
doHandleRemainBlockFromNewGroup
(
pOperator
,
pInfo
,
pResultInfo
,
order
);
if
(
pResBlock
->
info
.
rows
>
0
)
{
...
...
@@ -206,13 +205,14 @@ static SSDataBlock* doFillImpl(SOperatorInfo* pOperator) {
taosFillSetStartInfo
(
pInfo
->
pFillInfo
,
0
,
pInfo
->
win
.
ekey
);
}
else
{
pResBlock
->
info
.
scanFlag
=
pBlock
->
info
.
scanFlag
;
pBlock
->
info
.
dataLoad
=
1
;
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
primarySrcSlotId
);
blockDataCleanup
(
pInfo
->
pRes
);
blockDataEnsureCapacity
(
pInfo
->
pRes
,
pBlock
->
info
.
rows
);
blockDataEnsureCapacity
(
pInfo
->
pFinalRes
,
pBlock
->
info
.
rows
);
doApplyScalarCalculation
(
pOperator
,
pBlock
,
order
,
scanFlag
);
doApplyScalarCalculation
(
pOperator
,
pBlock
,
order
,
pBlock
->
info
.
scanFlag
);
if
(
pInfo
->
curGroupId
==
0
||
(
pInfo
->
curGroupId
==
pInfo
->
pRes
->
info
.
id
.
groupId
))
{
if
(
pInfo
->
curGroupId
==
0
&&
taosFillNotStarted
(
pInfo
->
pFillInfo
))
{
...
...
@@ -405,7 +405,7 @@ SOperatorInfo* createFillOperatorInfo(SOperatorInfo* downstream, SFillPhysiNode*
?
&
((
SMergeAlignedIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
intervalAggOperatorInfo
->
interval
:
&
((
SIntervalAggOperatorInfo
*
)
downstream
->
info
)
->
interval
;
int32_t
order
=
(
pPhyFillNode
->
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
int32_t
order
=
(
pPhyFillNode
->
node
.
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
int32_t
type
=
convertFillType
(
pPhyFillNode
->
mode
);
SResultInfo
*
pResultInfo
=
&
pOperator
->
resultInfo
;
...
...
source/libs/executor/src/groupoperator.c
浏览文件 @
ceb78fda
...
...
@@ -378,9 +378,7 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
return
buildGroupResultDataBlock
(
pOperator
);
}
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
scanFlag
=
MAIN_SCAN
;
int32_t
order
=
pInfo
->
binfo
.
inputTsOrder
;
int64_t
st
=
taosGetTimestampUs
();
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
...
...
@@ -390,13 +388,10 @@ static SSDataBlock* hashGroupbyAggregate(SOperatorInfo* pOperator) {
break
;
}
int32_t
code
=
getTableScanInfo
(
pOperator
,
&
order
,
&
scanFlag
,
false
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
pInfo
->
binfo
.
pRes
->
info
.
scanFlag
=
pBlock
->
info
.
scanFlag
;
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
&
pOperator
->
exprSupp
,
pBlock
,
order
,
scanFlag
,
true
);
setInputDataBlock
(
&
pOperator
->
exprSupp
,
pBlock
,
order
,
pBlock
->
info
.
scanFlag
,
true
);
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if
(
pInfo
->
scalarSup
.
pExprInfo
!=
NULL
)
{
...
...
@@ -481,6 +476,8 @@ SOperatorInfo* createGroupOperatorInfo(SOperatorInfo* downstream, SAggPhysiNode*
setOperatorInfo
(
pOperator
,
"GroupbyAggOperator"
,
0
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pInfo
->
binfo
.
mergeResultBlock
=
pAggNode
->
mergeDataBlock
;
pInfo
->
binfo
.
inputTsOrder
=
pAggNode
->
node
.
inputTsOrder
;
pInfo
->
binfo
.
outputTsOrder
=
pAggNode
->
node
.
outputTsOrder
;
pOperator
->
fpSet
=
createOperatorFpSet
(
optrDummyOpenFn
,
hashGroupbyAggregate
,
NULL
,
destroyGroupOperatorInfo
,
optrDefaultBufFn
,
NULL
);
...
...
@@ -762,6 +759,7 @@ static SSDataBlock* hashPartition(SOperatorInfo* pOperator) {
break
;
}
pInfo
->
binfo
.
pRes
->
info
.
scanFlag
=
pBlock
->
info
.
scanFlag
;
// there is an scalar expression that needs to be calculated right before apply the group aggregation.
if
(
pInfo
->
scalarSup
.
pExprInfo
!=
NULL
)
{
pTaskInfo
->
code
=
projectApplyFunctions
(
pInfo
->
scalarSup
.
pExprInfo
,
pBlock
,
pBlock
,
pInfo
->
scalarSup
.
pCtx
,
...
...
source/libs/executor/src/joinoperator.c
浏览文件 @
ceb78fda
...
...
@@ -253,9 +253,9 @@ SOperatorInfo* createMergeJoinOperatorInfo(SOperatorInfo** pDownstream, int32_t
}
pInfo
->
inputOrder
=
TSDB_ORDER_ASC
;
if
(
pJoinNode
->
inputTsOrder
==
ORDER_ASC
)
{
if
(
pJoinNode
->
node
.
inputTsOrder
==
ORDER_ASC
)
{
pInfo
->
inputOrder
=
TSDB_ORDER_ASC
;
}
else
if
(
pJoinNode
->
inputTsOrder
==
ORDER_DESC
)
{
}
else
if
(
pJoinNode
->
node
.
inputTsOrder
==
ORDER_DESC
)
{
pInfo
->
inputOrder
=
TSDB_ORDER_DESC
;
}
...
...
@@ -684,6 +684,7 @@ static void doMergeJoinImpl(struct SOperatorInfo* pOperator, SSDataBlock* pRes)
// the pDataBlock are always the same one, no need to call this again
pRes
->
info
.
rows
=
nrows
;
pRes
->
info
.
dataLoad
=
1
;
pRes
->
info
.
scanFlag
=
MAIN_SCAN
;
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
break
;
}
...
...
source/libs/executor/src/projectoperator.c
浏览文件 @
ceb78fda
...
...
@@ -107,6 +107,8 @@ SOperatorInfo* createProjectOperatorInfo(SOperatorInfo* downstream, SProjectPhys
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
pFinalRes
=
createOneDataBlock
(
pResBlock
,
false
);
pInfo
->
binfo
.
inputTsOrder
=
pProjPhyNode
->
node
.
inputTsOrder
;
pInfo
->
binfo
.
outputTsOrder
=
pProjPhyNode
->
node
.
outputTsOrder
;
if
(
pTaskInfo
->
execModel
==
OPTR_EXEC_MODEL_STREAM
)
{
pInfo
->
mergeDataBlocks
=
false
;
...
...
@@ -253,8 +255,9 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
}
int64_t
st
=
0
;
int32_t
order
=
0
;
int32_t
order
=
pInfo
->
inputTsOrder
;
int32_t
scanFlag
=
0
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
if
(
pOperator
->
cost
.
openCost
==
0
)
{
st
=
taosGetTimestampUs
();
...
...
@@ -299,10 +302,10 @@ SSDataBlock* doProjectOperation(SOperatorInfo* pOperator) {
break
;
}
// the pDataBlock are always the same one, no need to call this again
int32_t
code
=
getTableScanInfo
(
downstream
,
&
order
,
&
scanFlag
,
false
)
;
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
)
;
if
(
pProjectInfo
->
mergeDataBlocks
)
{
pFinalRes
->
info
.
scanFlag
=
scanFlag
=
pBlock
->
info
.
scanFlag
;
}
else
{
pRes
->
info
.
scanFlag
=
scanFlag
=
pBlock
->
info
.
scanFlag
;
}
setInputDataBlock
(
pSup
,
pBlock
,
order
,
scanFlag
,
false
);
...
...
@@ -421,6 +424,8 @@ SOperatorInfo* createIndefinitOutputOperatorInfo(SOperatorInfo* downstream, SPhy
}
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
binfo
.
inputTsOrder
=
pNode
->
inputTsOrder
;
pInfo
->
binfo
.
outputTsOrder
=
pNode
->
outputTsOrder
;
pInfo
->
pPseudoColInfo
=
setRowTsColumnOutputInfo
(
pSup
->
pCtx
,
numOfExpr
);
setOperatorInfo
(
pOperator
,
"IndefinitOperator"
,
QUERY_NODE_PHYSICAL_PLAN_INDEF_ROWS_FUNC
,
false
,
OP_NOT_OPENED
,
pInfo
,
...
...
@@ -444,18 +449,13 @@ _error:
static
void
doHandleDataBlock
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
,
SOperatorInfo
*
downstream
,
SExecTaskInfo
*
pTaskInfo
)
{
int32_t
order
=
0
;
int32_t
scanFlag
=
0
;
SIndefOperatorInfo
*
pIndefInfo
=
pOperator
->
info
;
SOptrBasicInfo
*
pInfo
=
&
pIndefInfo
->
binfo
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
// the pDataBlock are always the same one, no need to call this again
int32_t
code
=
getTableScanInfo
(
downstream
,
&
order
,
&
scanFlag
,
false
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
int32_t
order
=
pInfo
->
inputTsOrder
;
int32_t
scanFlag
=
pBlock
->
info
.
scanFlag
;
int32_t
code
=
TSDB_CODE_SUCCESS
;
// there is an scalar expression that needs to be calculated before apply the group aggregation.
SExprSupp
*
pScalarSup
=
&
pIndefInfo
->
scalarSup
;
...
...
@@ -521,6 +521,7 @@ SSDataBlock* doApplyIndefinitFunction(SOperatorInfo* pOperator) {
setOperatorCompleted
(
pOperator
);
break
;
}
pInfo
->
pRes
->
info
.
scanFlag
=
pBlock
->
info
.
scanFlag
;
if
(
pIndefInfo
->
groupId
==
0
&&
pBlock
->
info
.
id
.
groupId
!=
0
)
{
pIndefInfo
->
groupId
=
pBlock
->
info
.
id
.
groupId
;
// this is the initial group result
...
...
source/libs/executor/src/scanoperator.c
浏览文件 @
ceb78fda
...
...
@@ -713,6 +713,7 @@ static SSDataBlock* doTableScanImpl(SOperatorInfo* pOperator) {
pTableScanInfo
->
base
.
readRecorder
.
elapsedTime
+=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
pOperator
->
cost
.
totalCost
=
pTableScanInfo
->
base
.
readRecorder
.
elapsedTime
;
pBlock
->
info
.
scanFlag
=
pTableScanInfo
->
base
.
scanFlag
;
return
pBlock
;
}
return
NULL
;
...
...
source/libs/executor/src/sortoperator.c
浏览文件 @
ceb78fda
...
...
@@ -69,6 +69,8 @@ SOperatorInfo* createSortOperatorInfo(SOperatorInfo* downstream, SSortPhysiNode*
pInfo
->
binfo
.
pRes
=
createDataBlockFromDescNode
(
pDescNode
);
pInfo
->
pSortInfo
=
createSortInfo
(
pSortNode
->
pSortKeys
);
pInfo
->
binfo
.
inputTsOrder
=
pSortNode
->
node
.
inputTsOrder
;
pInfo
->
binfo
.
outputTsOrder
=
pSortNode
->
node
.
outputTsOrder
;
initLimitInfo
(
pSortNode
->
node
.
pLimit
,
pSortNode
->
node
.
pSlimit
,
&
pInfo
->
limitInfo
);
setOperatorInfo
(
pOperator
,
"SortOperator"
,
QUERY_NODE_PHYSICAL_PLAN_SORT
,
true
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
...
...
@@ -114,6 +116,7 @@ void appendOneRowToDataBlock(SSDataBlock* pBlock, STupleHandle* pTupleHandle) {
}
pBlock
->
info
.
dataLoad
=
1
;
pBlock
->
info
.
scanFlag
=
((
SDataBlockInfo
*
)
tsortGetBlockInfo
(
pTupleHandle
))
->
scanFlag
;
pBlock
->
info
.
rows
+=
1
;
}
...
...
@@ -155,6 +158,7 @@ SSDataBlock* getSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlock, i
pDataBlock
->
info
.
dataLoad
=
1
;
pDataBlock
->
info
.
rows
=
p
->
info
.
rows
;
pDataBlock
->
info
.
scanFlag
=
p
->
info
.
scanFlag
;
}
blockDataDestroy
(
p
);
...
...
@@ -331,6 +335,7 @@ SSDataBlock* getGroupSortedBlockData(SSortHandle* pHandle, SSDataBlock* pDataBlo
pDataBlock
->
info
.
rows
=
p
->
info
.
rows
;
pDataBlock
->
info
.
capacity
=
p
->
info
.
rows
;
pDataBlock
->
info
.
scanFlag
=
p
->
info
.
scanFlag
;
}
blockDataDestroy
(
p
);
...
...
@@ -505,6 +510,8 @@ SOperatorInfo* createGroupSortOperatorInfo(SOperatorInfo* downstream, SGroupSort
pInfo
->
binfo
.
pRes
=
createDataBlockFromDescNode
(
pDescNode
);
blockDataEnsureCapacity
(
pInfo
->
binfo
.
pRes
,
pOperator
->
resultInfo
.
capacity
);
pInfo
->
binfo
.
inputTsOrder
=
pSortPhyNode
->
node
.
inputTsOrder
;
pInfo
->
binfo
.
outputTsOrder
=
pSortPhyNode
->
node
.
outputTsOrder
;
int32_t
numOfOutputCols
=
0
;
int32_t
code
=
extractColMatchInfo
(
pSortPhyNode
->
pTargets
,
pDescNode
,
&
numOfOutputCols
,
COL_MATCH_FROM_SLOT_ID
,
...
...
@@ -698,6 +705,7 @@ SSDataBlock* getMultiwaySortedBlockData(SSortHandle* pHandle, SSDataBlock* pData
}
pDataBlock
->
info
.
rows
=
p
->
info
.
rows
;
pDataBlock
->
info
.
scanFlag
=
p
->
info
.
scanFlag
;
if
(
pInfo
->
ignoreGroupId
)
{
pDataBlock
->
info
.
id
.
groupId
=
0
;
}
else
{
...
...
@@ -799,6 +807,8 @@ SOperatorInfo* createMultiwayMergeOperatorInfo(SOperatorInfo** downStreams, size
size_t
numOfCols
=
taosArrayGetSize
(
pInfo
->
binfo
.
pRes
->
pDataBlock
);
pInfo
->
bufPageSize
=
getProperSortPageSize
(
rowSize
,
numOfCols
);
pInfo
->
sortBufSize
=
pInfo
->
bufPageSize
*
(
numStreams
+
1
);
// one additional is reserved for merged result.
pInfo
->
binfo
.
inputTsOrder
=
pMergePhyNode
->
node
.
inputTsOrder
;
pInfo
->
binfo
.
outputTsOrder
=
pMergePhyNode
->
node
.
outputTsOrder
;
setOperatorInfo
(
pOperator
,
"MultiwayMergeOperator"
,
QUERY_NODE_PHYSICAL_PLAN_MERGE
,
false
,
OP_NOT_OPENED
,
pInfo
,
pTaskInfo
);
pOperator
->
fpSet
=
createOperatorFpSet
(
openMultiwayMergeOperator
,
doMultiwayMerge
,
NULL
,
...
...
source/libs/executor/src/timesliceoperator.c
浏览文件 @
ceb78fda
...
...
@@ -44,6 +44,8 @@ typedef struct STimeSliceOperatorInfo {
uint64_t
groupId
;
SGroupKeys
*
pPrevGroupKey
;
SSDataBlock
*
pNextGroupRes
;
SSDataBlock
*
pRemainRes
;
// save block unfinished processing
int32_t
remainIndex
;
// the remaining index in the block to be processed
}
STimeSliceOperatorInfo
;
static
void
destroyTimeSliceOperatorInfo
(
void
*
param
);
...
...
@@ -644,13 +646,47 @@ static int32_t resetKeeperInfo(STimeSliceOperatorInfo* pInfo) {
return
TSDB_CODE_SUCCESS
;
}
static
bool
checkThresholdReached
(
STimeSliceOperatorInfo
*
pSliceInfo
,
int32_t
threshold
)
{
SSDataBlock
*
pResBlock
=
pSliceInfo
->
pRes
;
if
(
pResBlock
->
info
.
rows
>
threshold
)
{
return
true
;
}
return
false
;
}
static
bool
checkWindowBoundReached
(
STimeSliceOperatorInfo
*
pSliceInfo
)
{
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
return
true
;
}
return
false
;
}
static
void
saveBlockStatus
(
STimeSliceOperatorInfo
*
pSliceInfo
,
SSDataBlock
*
pBlock
,
int32_t
curIndex
)
{
SSDataBlock
*
pResBlock
=
pSliceInfo
->
pRes
;
SColumnInfoData
*
pTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pSliceInfo
->
tsCol
.
slotId
);
if
(
curIndex
<
pBlock
->
info
.
rows
-
1
)
{
pSliceInfo
->
pRemainRes
=
pBlock
;
pSliceInfo
->
remainIndex
=
curIndex
+
1
;
return
;
}
// all data in remaining block processed
pSliceInfo
->
pRemainRes
=
NULL
;
}
static
void
doTimesliceImpl
(
SOperatorInfo
*
pOperator
,
STimeSliceOperatorInfo
*
pSliceInfo
,
SSDataBlock
*
pBlock
,
SExecTaskInfo
*
pTaskInfo
,
bool
ignoreNull
)
{
SSDataBlock
*
pResBlock
=
pSliceInfo
->
pRes
;
SInterval
*
pInterval
=
&
pSliceInfo
->
interval
;
SColumnInfoData
*
pTsCol
=
taosArrayGet
(
pBlock
->
pDataBlock
,
pSliceInfo
->
tsCol
.
slotId
);
for
(
int32_t
i
=
0
;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
int32_t
i
=
(
pSliceInfo
->
pRemainRes
==
NULL
)
?
0
:
pSliceInfo
->
remainIndex
;
for
(;
i
<
pBlock
->
info
.
rows
;
++
i
)
{
int64_t
ts
=
*
(
int64_t
*
)
colDataGetData
(
pTsCol
,
i
);
// check for duplicate timestamps
...
...
@@ -662,10 +698,6 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
continue
;
}
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
break
;
}
if
(
ts
==
pSliceInfo
->
current
)
{
addCurrentRowToResult
(
pSliceInfo
,
&
pOperator
->
exprSupp
,
pResBlock
,
pBlock
,
i
);
...
...
@@ -674,9 +706,14 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
pSliceInfo
->
current
=
taosTimeAdd
(
pSliceInfo
->
current
,
pInterval
->
interval
,
pInterval
->
intervalUnit
,
pInterval
->
precision
);
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
if
(
checkWindowBoundReached
(
pSliceInfo
))
{
break
;
}
if
(
checkThresholdReached
(
pSliceInfo
,
pOperator
->
resultInfo
.
threshold
))
{
saveBlockStatus
(
pSliceInfo
,
pBlock
,
i
);
return
;
}
}
else
if
(
ts
<
pSliceInfo
->
current
)
{
// in case of interpolation window starts and ends between two datapoints, fill(prev) need to interpolate
doKeepPrevRows
(
pSliceInfo
,
pBlock
,
i
);
...
...
@@ -697,9 +734,13 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
}
}
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
if
(
checkWindowBoundReached
(
pSliceInfo
)
)
{
break
;
}
if
(
checkThresholdReached
(
pSliceInfo
,
pOperator
->
resultInfo
.
threshold
))
{
saveBlockStatus
(
pSliceInfo
,
pBlock
,
i
);
return
;
}
}
else
{
// ignore current row, and do nothing
}
...
...
@@ -730,11 +771,20 @@ static void doTimesliceImpl(SOperatorInfo* pOperator, STimeSliceOperatorInfo* pS
}
doKeepPrevRows
(
pSliceInfo
,
pBlock
,
i
);
if
(
pSliceInfo
->
current
>
pSliceInfo
->
win
.
ekey
)
{
if
(
checkWindowBoundReached
(
pSliceInfo
)
)
{
break
;
}
if
(
checkThresholdReached
(
pSliceInfo
,
pOperator
->
resultInfo
.
threshold
))
{
saveBlockStatus
(
pSliceInfo
,
pBlock
,
i
);
return
;
}
}
}
// if reached here, meaning block processing finished naturally,
// or interpolation reach window upper bound
pSliceInfo
->
pRemainRes
=
NULL
;
}
static
void
genInterpAfterDataBlock
(
STimeSliceOperatorInfo
*
pSliceInfo
,
SOperatorInfo
*
pOperator
,
int32_t
index
)
{
...
...
@@ -781,39 +831,69 @@ static void resetTimesliceInfo(STimeSliceOperatorInfo* pSliceInfo) {
resetKeeperInfo
(
pSliceInfo
);
}
static
void
doHandleTimeslice
(
SOperatorInfo
*
pOperator
,
SSDataBlock
*
pBlock
)
{
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
STimeSliceOperatorInfo
*
pSliceInfo
=
pOperator
->
info
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
bool
ignoreNull
=
getIgoreNullRes
(
pSup
);
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
code
=
initKeeperInfo
(
pSliceInfo
,
pBlock
,
&
pOperator
->
exprSupp
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
if
(
pSliceInfo
->
scalarSup
.
pExprInfo
!=
NULL
)
{
SExprSupp
*
pExprSup
=
&
pSliceInfo
->
scalarSup
;
projectApplyFunctions
(
pExprSup
->
pExprInfo
,
pBlock
,
pBlock
,
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
NULL
);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pSup
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
doTimesliceImpl
(
pOperator
,
pSliceInfo
,
pBlock
,
pTaskInfo
,
ignoreNull
);
copyPrevGroupKey
(
&
pOperator
->
exprSupp
,
pSliceInfo
->
pPrevGroupKey
,
pBlock
);
}
static
SSDataBlock
*
doTimeslice
(
SOperatorInfo
*
pOperator
)
{
if
(
pOperator
->
status
==
OP_EXEC_DONE
)
{
return
NULL
;
}
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
STimeSliceOperatorInfo
*
pSliceInfo
=
pOperator
->
info
;
SSDataBlock
*
pResBlock
=
pSliceInfo
->
pRes
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
bool
ignoreNull
=
getIgoreNullRes
(
pSup
);
int32_t
order
=
TSDB_ORDER_ASC
;
SInterval
*
pInterval
=
&
pSliceInfo
->
interval
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
blockDataCleanup
(
pResBlock
);
while
(
1
)
{
if
(
pSliceInfo
->
pNextGroupRes
!=
NULL
)
{
setInputDataBlock
(
pSup
,
pSliceInfo
->
pNextGroupRes
,
order
,
MAIN_SCAN
,
true
);
doTimesliceImpl
(
pOperator
,
pSliceInfo
,
pSliceInfo
->
pNextGroupRes
,
pTaskInfo
,
ignoreNull
);
copyPrevGroupKey
(
&
pOperator
->
exprSupp
,
pSliceInfo
->
pPrevGroupKey
,
pSliceInfo
->
pNextGroupRes
);
doHandleTimeslice
(
pOperator
,
pSliceInfo
->
pNextGroupRes
);
if
(
checkWindowBoundReached
(
pSliceInfo
)
||
checkThresholdReached
(
pSliceInfo
,
pOperator
->
resultInfo
.
threshold
))
{
doFilter
(
pResBlock
,
pOperator
->
exprSupp
.
pFilterInfo
,
NULL
);
if
(
pSliceInfo
->
pRemainRes
==
NULL
)
{
pSliceInfo
->
pNextGroupRes
=
NULL
;
}
if
(
pResBlock
->
info
.
rows
!=
0
)
{
goto
_finished
;
}
else
{
// after fillter if result block has 0 rows, go back to
// process pNextGroupRes again for unfinished data
continue
;
}
}
pSliceInfo
->
pNextGroupRes
=
NULL
;
}
while
(
1
)
{
SSDataBlock
*
pBlock
=
downstream
->
fpSet
.
getNextFn
(
downstream
);
SSDataBlock
*
pBlock
=
pSliceInfo
->
pRemainRes
?
pSliceInfo
->
pRemainRes
:
downstream
->
fpSet
.
getNextFn
(
downstream
);
if
(
pBlock
==
NULL
)
{
setOperatorCompleted
(
pOperator
);
break
;
}
pResBlock
->
info
.
scanFlag
=
pBlock
->
info
.
scanFlag
;
if
(
pSliceInfo
->
groupId
==
0
&&
pBlock
->
info
.
id
.
groupId
!=
0
)
{
pSliceInfo
->
groupId
=
pBlock
->
info
.
id
.
groupId
;
}
else
{
...
...
@@ -824,21 +904,15 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
}
}
if
(
pSliceInfo
->
scalarSup
.
pExprInfo
!=
NULL
)
{
SExprSupp
*
pExprSup
=
&
pSliceInfo
->
scalarSup
;
projectApplyFunctions
(
pExprSup
->
pExprInfo
,
pBlock
,
pBlock
,
pExprSup
->
pCtx
,
pExprSup
->
numOfExprs
,
NULL
);
doHandleTimeslice
(
pOperator
,
pBlock
);
if
(
checkWindowBoundReached
(
pSliceInfo
)
||
checkThresholdReached
(
pSliceInfo
,
pOperator
->
resultInfo
.
threshold
))
{
doFilter
(
pResBlock
,
pOperator
->
exprSupp
.
pFilterInfo
,
NULL
);
if
(
pResBlock
->
info
.
rows
!=
0
)
{
goto
_finished
;
}
int32_t
code
=
initKeeperInfo
(
pSliceInfo
,
pBlock
,
&
pOperator
->
exprSupp
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
T_LONG_JMP
(
pTaskInfo
->
env
,
code
);
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pSup
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
doTimesliceImpl
(
pOperator
,
pSliceInfo
,
pBlock
,
pTaskInfo
,
ignoreNull
);
copyPrevGroupKey
(
&
pOperator
->
exprSupp
,
pSliceInfo
->
pPrevGroupKey
,
pBlock
);
}
// post work for a specific group
// check if need to interpolate after last datablock
// except for fill(next), fill(linear)
...
...
@@ -851,11 +925,12 @@ static SSDataBlock* doTimeslice(SOperatorInfo* pOperator) {
// restore initial value for next group
resetTimesliceInfo
(
pSliceInfo
);
if
(
pResBlock
->
info
.
rows
>=
4096
)
{
if
(
pResBlock
->
info
.
rows
!=
0
)
{
break
;
}
}
_finished:
// restore the value
setTaskStatus
(
pOperator
->
pTaskInfo
,
TASK_COMPLETED
);
if
(
pResBlock
->
info
.
rows
==
0
)
{
...
...
@@ -911,6 +986,8 @@ SOperatorInfo* createTimeSliceOperatorInfo(SOperatorInfo* downstream, SPhysiNode
pInfo
->
groupId
=
0
;
pInfo
->
pPrevGroupKey
=
NULL
;
pInfo
->
pNextGroupRes
=
NULL
;
pInfo
->
pRemainRes
=
NULL
;
pInfo
->
remainIndex
=
0
;
if
(
downstream
->
operatorType
==
QUERY_NODE_PHYSICAL_PLAN_TABLE_SCAN
)
{
STableScanInfo
*
pScanInfo
=
(
STableScanInfo
*
)
downstream
->
info
;
...
...
source/libs/executor/src/timewindowoperator.c
浏览文件 @
ceb78fda
...
...
@@ -358,7 +358,7 @@ static void setNotInterpoWindowKey(SqlFunctionCtx* pCtx, int32_t numOfOutput, in
static
bool
setTimeWindowInterpolationStartTs
(
SIntervalAggOperatorInfo
*
pInfo
,
int32_t
pos
,
SSDataBlock
*
pBlock
,
const
TSKEY
*
tsCols
,
STimeWindow
*
win
,
SExprSupp
*
pSup
)
{
bool
ascQuery
=
(
pInfo
->
input
Order
==
TSDB_ORDER_ASC
);
bool
ascQuery
=
(
pInfo
->
binfo
.
inputTs
Order
==
TSDB_ORDER_ASC
);
TSKEY
curTs
=
tsCols
[
pos
];
...
...
@@ -388,7 +388,7 @@ static bool setTimeWindowInterpolationStartTs(SIntervalAggOperatorInfo* pInfo, i
static
bool
setTimeWindowInterpolationEndTs
(
SIntervalAggOperatorInfo
*
pInfo
,
SExprSupp
*
pSup
,
int32_t
endRowIndex
,
SArray
*
pDataBlock
,
const
TSKEY
*
tsCols
,
TSKEY
blockEkey
,
STimeWindow
*
win
)
{
int32_t
order
=
pInfo
->
input
Order
;
int32_t
order
=
pInfo
->
binfo
.
inputTs
Order
;
TSKEY
actualEndKey
=
tsCols
[
endRowIndex
];
TSKEY
key
=
(
order
==
TSDB_ORDER_ASC
)
?
win
->
ekey
:
win
->
skey
;
...
...
@@ -550,7 +550,7 @@ static void doWindowBorderInterpolation(SIntervalAggOperatorInfo* pInfo, SSDataB
if
(
!
done
)
{
int32_t
endRowIndex
=
startPos
+
forwardRows
-
1
;
TSKEY
endKey
=
(
pInfo
->
input
Order
==
TSDB_ORDER_ASC
)
?
pBlock
->
info
.
window
.
ekey
:
pBlock
->
info
.
window
.
skey
;
TSKEY
endKey
=
(
pInfo
->
binfo
.
inputTs
Order
==
TSDB_ORDER_ASC
)
?
pBlock
->
info
.
window
.
ekey
:
pBlock
->
info
.
window
.
skey
;
bool
interp
=
setTimeWindowInterpolationEndTs
(
pInfo
,
pSup
,
endRowIndex
,
pBlock
->
pDataBlock
,
tsCols
,
endKey
,
win
);
if
(
interp
)
{
setResultRowInterpo
(
pResult
,
RESULT_ROW_END_INTERP
);
...
...
@@ -888,12 +888,12 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
int32_t
numOfOutput
=
pSup
->
numOfExprs
;
int64_t
*
tsCols
=
extractTsCol
(
pBlock
,
pInfo
);
uint64_t
tableGroupId
=
pBlock
->
info
.
id
.
groupId
;
bool
ascScan
=
(
pInfo
->
input
Order
==
TSDB_ORDER_ASC
);
bool
ascScan
=
(
pInfo
->
binfo
.
inputTs
Order
==
TSDB_ORDER_ASC
);
TSKEY
ts
=
getStartTsKey
(
&
pBlock
->
info
.
window
,
tsCols
);
SResultRow
*
pResult
=
NULL
;
STimeWindow
win
=
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
input
Order
);
getActiveTimeWindow
(
pInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
ts
,
&
pInfo
->
interval
,
pInfo
->
binfo
.
inputTs
Order
);
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pSup
->
pCtx
,
numOfOutput
,
pSup
->
rowEntryInfoOffset
,
&
pInfo
->
aggSup
,
pTaskInfo
);
if
(
ret
!=
TSDB_CODE_SUCCESS
||
pResult
==
NULL
)
{
...
...
@@ -902,7 +902,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
TSKEY
ekey
=
ascScan
?
win
.
ekey
:
win
.
skey
;
int32_t
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
input
Order
);
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
binfo
.
inputTs
Order
);
// prev time window not interpolation yet.
if
(
pInfo
->
timeWindowInterpo
)
{
...
...
@@ -929,7 +929,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
STimeWindow
nextWin
=
win
;
while
(
1
)
{
int32_t
prevEndPos
=
forwardRows
-
1
+
startPos
;
startPos
=
getNextQualifiedWindow
(
&
pInfo
->
interval
,
&
nextWin
,
&
pBlock
->
info
,
tsCols
,
prevEndPos
,
pInfo
->
input
Order
);
startPos
=
getNextQualifiedWindow
(
&
pInfo
->
interval
,
&
nextWin
,
&
pBlock
->
info
,
tsCols
,
prevEndPos
,
pInfo
->
binfo
.
inputTs
Order
);
if
(
startPos
<
0
)
{
break
;
}
...
...
@@ -942,7 +942,7 @@ static void hashIntervalAgg(SOperatorInfo* pOperatorInfo, SResultRowInfo* pResul
ekey
=
ascScan
?
nextWin
.
ekey
:
nextWin
.
skey
;
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
input
Order
);
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
pInfo
->
binfo
.
inputTs
Order
);
// window start(end) key interpolation
doWindowBorderInterpolation
(
pInfo
,
pBlock
,
pResult
,
&
nextWin
,
startPos
,
forwardRows
,
pSup
);
// TODO: add to open window? how to close the open windows after input blocks exhausted?
...
...
@@ -1035,7 +1035,7 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
break
;
}
getTableScanInfo
(
pOperator
,
&
pInfo
->
inputOrder
,
&
scanFlag
,
true
)
;
pInfo
->
binfo
.
pRes
->
info
.
scanFlag
=
scanFlag
=
pBlock
->
info
.
scanFlag
;
if
(
pInfo
->
scalarSupp
.
pExprInfo
!=
NULL
)
{
SExprSupp
*
pExprSup
=
&
pInfo
->
scalarSupp
;
...
...
@@ -1043,11 +1043,11 @@ static int32_t doOpenIntervalAgg(SOperatorInfo* pOperator) {
}
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pSup
,
pBlock
,
pInfo
->
input
Order
,
scanFlag
,
true
);
setInputDataBlock
(
pSup
,
pBlock
,
pInfo
->
binfo
.
inputTs
Order
,
scanFlag
,
true
);
hashIntervalAgg
(
pOperator
,
&
pInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
);
}
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
resul
tTsOrder
);
initGroupedResultInfo
(
&
pInfo
->
groupResInfo
,
pInfo
->
aggSup
.
pResultRowHashTable
,
pInfo
->
binfo
.
outpu
tTsOrder
);
OPTR_SET_OPENED
(
pOperator
);
pOperator
->
cost
.
openCost
=
(
taosGetTimestampUs
()
-
st
)
/
1000
.
0
;
...
...
@@ -1161,7 +1161,7 @@ static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
SExecTaskInfo
*
pTaskInfo
=
pOperator
->
pTaskInfo
;
SExprSupp
*
pSup
=
&
pOperator
->
exprSupp
;
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
order
=
pInfo
->
binfo
.
inputTsOrder
;
int64_t
st
=
taosGetTimestampUs
();
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
...
...
@@ -1171,6 +1171,7 @@ static int32_t openStateWindowAggOptr(SOperatorInfo* pOperator) {
break
;
}
pInfo
->
binfo
.
pRes
->
info
.
scanFlag
=
pBlock
->
info
.
scanFlag
;
setInputDataBlock
(
pSup
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
tsSlotId
);
...
...
@@ -1653,8 +1654,8 @@ SOperatorInfo* createIntervalOperatorInfo(SOperatorInfo* downstream, SIntervalPh
};
pInfo
->
win
=
pTaskInfo
->
window
;
pInfo
->
inputOrder
=
(
pPhyNode
->
window
.
inputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
resultTsOrder
=
(
pPhyNode
->
window
.
outputTsOrder
==
ORDER_ASC
)
?
TSDB_ORDER_ASC
:
TSDB_ORDER_DESC
;
pInfo
->
binfo
.
inputTsOrder
=
pPhyNode
->
window
.
node
.
inputTsOrder
;
pInfo
->
binfo
.
outputTsOrder
=
pPhyNode
->
window
.
node
.
outputTsOrder
;
pInfo
->
interval
=
interval
;
pInfo
->
twAggSup
=
as
;
pInfo
->
binfo
.
mergeResultBlock
=
pPhyNode
->
window
.
mergeDataBlock
;
...
...
@@ -1805,7 +1806,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
}
int64_t
st
=
taosGetTimestampUs
();
int32_t
order
=
TSDB_ORDER_ASC
;
int32_t
order
=
pInfo
->
binfo
.
inputTsOrder
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
...
...
@@ -1815,6 +1816,7 @@ static SSDataBlock* doSessionWindowAgg(SOperatorInfo* pOperator) {
break
;
}
pBInfo
->
pRes
->
info
.
scanFlag
=
pBlock
->
info
.
scanFlag
;
// the pDataBlock are always the same one, no need to call this again
setInputDataBlock
(
pSup
,
pBlock
,
order
,
MAIN_SCAN
,
true
);
blockDataUpdateTsWindow
(
pBlock
,
pInfo
->
tsSlotId
);
...
...
@@ -1875,6 +1877,8 @@ SOperatorInfo* createStatewindowOperatorInfo(SOperatorInfo* downstream, SStateWi
if
(
pInfo
->
stateKey
.
pData
==
NULL
)
{
goto
_error
;
}
pInfo
->
binfo
.
inputTsOrder
=
pStateNode
->
window
.
node
.
inputTsOrder
;
pInfo
->
binfo
.
outputTsOrder
=
pStateNode
->
window
.
node
.
outputTsOrder
;
int32_t
code
=
filterInitFromNode
((
SNode
*
)
pStateNode
->
window
.
node
.
pConditions
,
&
pOperator
->
exprSupp
.
pFilterInfo
,
0
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
...
...
@@ -1973,6 +1977,8 @@ SOperatorInfo* createSessionAggOperatorInfo(SOperatorInfo* downstream, SSessionW
pInfo
->
binfo
.
pRes
=
pResBlock
;
pInfo
->
winSup
.
prevTs
=
INT64_MIN
;
pInfo
->
reptScan
=
false
;
pInfo
->
binfo
.
inputTsOrder
=
pSessionNode
->
window
.
node
.
inputTsOrder
;
pInfo
->
binfo
.
outputTsOrder
=
pSessionNode
->
window
.
node
.
outputTsOrder
;
code
=
filterInitFromNode
((
SNode
*
)
pSessionNode
->
window
.
node
.
pConditions
,
&
pOperator
->
exprSupp
.
pFilterInfo
,
0
);
if
(
code
!=
TSDB_CODE_SUCCESS
)
{
goto
_error
;
...
...
@@ -4497,7 +4503,6 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
SSDataBlock
*
pRes
=
pIaInfo
->
binfo
.
pRes
;
SResultRowInfo
*
pResultRowInfo
=
&
pIaInfo
->
binfo
.
resultRowInfo
;
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
int32_t
scanFlag
=
MAIN_SCAN
;
while
(
1
)
{
SSDataBlock
*
pBlock
=
NULL
;
...
...
@@ -4544,8 +4549,8 @@ static void doMergeAlignedIntervalAgg(SOperatorInfo* pOperator) {
}
}
getTableScanInfo
(
pOperator
,
&
pIaInfo
->
inputOrder
,
&
scanFlag
,
false
)
;
setInputDataBlock
(
pSup
,
pBlock
,
pIaInfo
->
inputOrder
,
scanFlag
,
true
);
pRes
->
info
.
scanFlag
=
pBlock
->
info
.
scanFlag
;
setInputDataBlock
(
pSup
,
pBlock
,
pIaInfo
->
binfo
.
inputTsOrder
,
pBlock
->
info
.
scanFlag
,
true
);
doMergeAlignedIntervalAggImpl
(
pOperator
,
&
pIaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
pRes
);
doFilter
(
pRes
,
pOperator
->
exprSupp
.
pFilterInfo
,
NULL
);
...
...
@@ -4618,7 +4623,8 @@ SOperatorInfo* createMergeAlignedIntervalOperatorInfo(SOperatorInfo* downstream,
miaInfo
->
curTs
=
INT64_MIN
;
iaInfo
->
win
=
pTaskInfo
->
window
;
iaInfo
->
inputOrder
=
TSDB_ORDER_ASC
;
iaInfo
->
binfo
.
inputTsOrder
=
pNode
->
window
.
node
.
inputTsOrder
;
iaInfo
->
binfo
.
outputTsOrder
=
pNode
->
window
.
node
.
outputTsOrder
;
iaInfo
->
interval
=
interval
;
iaInfo
->
primaryTsIndex
=
((
SColumnNode
*
)
pNode
->
window
.
pTspk
)
->
slotId
;
iaInfo
->
binfo
.
mergeResultBlock
=
pNode
->
window
.
mergeDataBlock
;
...
...
@@ -4695,7 +4701,7 @@ static int32_t outputPrevIntervalResult(SOperatorInfo* pOperatorInfo, uint64_t t
STimeWindow
*
newWin
)
{
SMergeIntervalAggOperatorInfo
*
miaInfo
=
pOperatorInfo
->
info
;
SIntervalAggOperatorInfo
*
iaInfo
=
&
miaInfo
->
intervalAggOperatorInfo
;
bool
ascScan
=
(
iaInfo
->
input
Order
==
TSDB_ORDER_ASC
);
bool
ascScan
=
(
iaInfo
->
binfo
.
inputTs
Order
==
TSDB_ORDER_ASC
);
SGroupTimeWindow
groupTimeWindow
=
{.
groupId
=
tableGroupId
,
.
window
=
*
newWin
};
tdListAppend
(
miaInfo
->
groupIntervals
,
&
groupTimeWindow
);
...
...
@@ -4730,12 +4736,12 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
int32_t
numOfOutput
=
pExprSup
->
numOfExprs
;
int64_t
*
tsCols
=
extractTsCol
(
pBlock
,
iaInfo
);
uint64_t
tableGroupId
=
pBlock
->
info
.
id
.
groupId
;
bool
ascScan
=
(
iaInfo
->
input
Order
==
TSDB_ORDER_ASC
);
bool
ascScan
=
(
iaInfo
->
binfo
.
inputTs
Order
==
TSDB_ORDER_ASC
);
TSKEY
blockStartTs
=
getStartTsKey
(
&
pBlock
->
info
.
window
,
tsCols
);
SResultRow
*
pResult
=
NULL
;
STimeWindow
win
=
getActiveTimeWindow
(
iaInfo
->
aggSup
.
pResultBuf
,
pResultRowInfo
,
blockStartTs
,
&
iaInfo
->
interval
,
iaInfo
->
input
Order
);
iaInfo
->
binfo
.
inputTs
Order
);
int32_t
ret
=
setTimeWindowOutputBuf
(
pResultRowInfo
,
&
win
,
(
scanFlag
==
MAIN_SCAN
),
&
pResult
,
tableGroupId
,
pExprSup
->
pCtx
,
...
...
@@ -4746,7 +4752,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
TSKEY
ekey
=
ascScan
?
win
.
ekey
:
win
.
skey
;
int32_t
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
iaInfo
->
input
Order
);
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
iaInfo
->
binfo
.
inputTs
Order
);
ASSERT
(
forwardRows
>
0
);
// prev time window not interpolation yet.
...
...
@@ -4777,7 +4783,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
while
(
1
)
{
int32_t
prevEndPos
=
forwardRows
-
1
+
startPos
;
startPos
=
getNextQualifiedWindow
(
&
iaInfo
->
interval
,
&
nextWin
,
&
pBlock
->
info
,
tsCols
,
prevEndPos
,
iaInfo
->
input
Order
);
getNextQualifiedWindow
(
&
iaInfo
->
interval
,
&
nextWin
,
&
pBlock
->
info
,
tsCols
,
prevEndPos
,
iaInfo
->
binfo
.
inputTs
Order
);
if
(
startPos
<
0
)
{
break
;
}
...
...
@@ -4792,7 +4798,7 @@ static void doMergeIntervalAggImpl(SOperatorInfo* pOperatorInfo, SResultRowInfo*
ekey
=
ascScan
?
nextWin
.
ekey
:
nextWin
.
skey
;
forwardRows
=
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
iaInfo
->
input
Order
);
getNumOfRowsInTimeWindow
(
&
pBlock
->
info
,
tsCols
,
startPos
,
ekey
,
binarySearchForKey
,
NULL
,
iaInfo
->
binfo
.
inputTs
Order
);
// window start(end) key interpolation
doWindowBorderInterpolation
(
iaInfo
,
pBlock
,
pResult
,
&
nextWin
,
startPos
,
forwardRows
,
pExprSup
);
...
...
@@ -4828,7 +4834,6 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
if
(
!
miaInfo
->
inputBlocksFinished
)
{
SOperatorInfo
*
downstream
=
pOperator
->
pDownstream
[
0
];
int32_t
scanFlag
=
MAIN_SCAN
;
while
(
1
)
{
SSDataBlock
*
pBlock
=
NULL
;
if
(
miaInfo
->
prefetchedBlock
==
NULL
)
{
...
...
@@ -4853,9 +4858,9 @@ static SSDataBlock* doMergeIntervalAgg(SOperatorInfo* pOperator) {
break
;
}
getTableScanInfo
(
pOperator
,
&
iaInfo
->
inputOrder
,
&
scanFlag
,
false
)
;
setInputDataBlock
(
pExpSupp
,
pBlock
,
iaInfo
->
inputOrder
,
scanFlag
,
true
);
doMergeIntervalAggImpl
(
pOperator
,
&
iaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
scanFlag
,
pRes
);
pRes
->
info
.
scanFlag
=
pBlock
->
info
.
scanFlag
;
setInputDataBlock
(
pExpSupp
,
pBlock
,
iaInfo
->
binfo
.
inputTsOrder
,
pBlock
->
info
.
scanFlag
,
true
);
doMergeIntervalAggImpl
(
pOperator
,
&
iaInfo
->
binfo
.
resultRowInfo
,
pBlock
,
pBlock
->
info
.
scanFlag
,
pRes
);
if
(
pRes
->
info
.
rows
>=
pOperator
->
resultInfo
.
threshold
)
{
break
;
...
...
@@ -4905,10 +4910,11 @@ SOperatorInfo* createMergeIntervalOperatorInfo(SOperatorInfo* downstream, SMerge
SIntervalAggOperatorInfo
*
pIntervalInfo
=
&
pMergeIntervalInfo
->
intervalAggOperatorInfo
;
pIntervalInfo
->
win
=
pTaskInfo
->
window
;
pIntervalInfo
->
inputOrder
=
TSDB_ORDER_ASC
;
pIntervalInfo
->
binfo
.
inputTsOrder
=
pIntervalPhyNode
->
window
.
node
.
inputTsOrder
;
pIntervalInfo
->
interval
=
interval
;
pIntervalInfo
->
binfo
.
mergeResultBlock
=
pIntervalPhyNode
->
window
.
mergeDataBlock
;
pIntervalInfo
->
primaryTsIndex
=
((
SColumnNode
*
)
pIntervalPhyNode
->
window
.
pTspk
)
->
slotId
;
pIntervalInfo
->
binfo
.
outputTsOrder
=
pIntervalPhyNode
->
window
.
node
.
outputTsOrder
;
SExprSupp
*
pExprSupp
=
&
pOperator
->
exprSupp
;
...
...
source/libs/executor/src/tsort.c
浏览文件 @
ceb78fda
...
...
@@ -887,6 +887,7 @@ void* tsortGetValue(STupleHandle* pVHandle, int32_t colIndex) {
}
uint64_t
tsortGetGroupId
(
STupleHandle
*
pVHandle
)
{
return
pVHandle
->
pBlock
->
info
.
id
.
groupId
;
}
void
*
tsortGetBlockInfo
(
STupleHandle
*
pVHandle
)
{
return
&
pVHandle
->
pBlock
->
info
;
}
SSortExecInfo
tsortGetSortExecInfo
(
SSortHandle
*
pHandle
)
{
SSortExecInfo
info
=
{
0
};
...
...
source/libs/nodes/src/nodesCloneFuncs.c
浏览文件 @
ceb78fda
...
...
@@ -110,6 +110,7 @@ static int32_t columnNodeCopy(const SColumnNode* pSrc, SColumnNode* pDst) {
COPY_SCALAR_FIELD
(
tableId
);
COPY_SCALAR_FIELD
(
tableType
);
COPY_SCALAR_FIELD
(
colId
);
COPY_SCALAR_FIELD
(
projIdx
);
COPY_SCALAR_FIELD
(
colType
);
COPY_SCALAR_FIELD
(
hasIndex
);
COPY_CHAR_ARRAY_FIELD
(
dbName
);
...
...
@@ -358,6 +359,8 @@ static int32_t logicNodeCopy(const SLogicNode* pSrc, SLogicNode* pDst) {
COPY_SCALAR_FIELD
(
requireDataOrder
);
COPY_SCALAR_FIELD
(
resultDataOrder
);
COPY_SCALAR_FIELD
(
groupAction
);
COPY_SCALAR_FIELD
(
inputTsOrder
);
COPY_SCALAR_FIELD
(
outputTsOrder
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -404,7 +407,6 @@ static int32_t logicJoinCopy(const SJoinLogicNode* pSrc, SJoinLogicNode* pDst) {
CLONE_NODE_FIELD
(
pOnConditions
);
CLONE_NODE_FIELD
(
pColEqualOnConditions
);
COPY_SCALAR_FIELD
(
isSingleTableJoin
);
COPY_SCALAR_FIELD
(
inputTsOrder
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -482,8 +484,6 @@ static int32_t logicWindowCopy(const SWindowLogicNode* pSrc, SWindowLogicNode* p
COPY_SCALAR_FIELD
(
igExpired
);
COPY_SCALAR_FIELD
(
igCheckUpdate
);
COPY_SCALAR_FIELD
(
windowAlgo
);
COPY_SCALAR_FIELD
(
inputTsOrder
);
COPY_SCALAR_FIELD
(
outputTsOrder
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -495,7 +495,6 @@ static int32_t logicFillCopy(const SFillLogicNode* pSrc, SFillLogicNode* pDst) {
CLONE_NODE_FIELD
(
pWStartTs
);
CLONE_NODE_FIELD
(
pValues
);
COPY_OBJECT_FIELD
(
timeRange
,
sizeof
(
STimeWindow
));
COPY_SCALAR_FIELD
(
inputTsOrder
);
return
TSDB_CODE_SUCCESS
;
}
...
...
@@ -544,6 +543,8 @@ static int32_t physiNodeCopy(const SPhysiNode* pSrc, SPhysiNode* pDst) {
CLONE_NODE_FIELD_EX
(
pOutputDataBlockDesc
,
SDataBlockDescNode
*
);
CLONE_NODE_FIELD
(
pConditions
);
CLONE_NODE_LIST_FIELD
(
pChildren
);
COPY_SCALAR_FIELD
(
inputTsOrder
);
COPY_SCALAR_FIELD
(
outputTsOrder
);
return
TSDB_CODE_SUCCESS
;
}
...
...
source/libs/nodes/src/nodesCodeFuncs.c
浏览文件 @
ceb78fda
...
...
@@ -1904,9 +1904,6 @@ static int32_t physiJoinNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkJoinPhysiPlanJoinType
,
pNode
->
joinType
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkJoinPhysiPlanInputTsOrder
,
pNode
->
inputTsOrder
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddObject
(
pJson
,
jkJoinPhysiPlanMergeCondition
,
nodeToJson
,
pNode
->
pMergeCondition
);
}
...
...
@@ -1929,9 +1926,6 @@ static int32_t jsonToPhysiJoinNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkJoinPhysiPlanJoinType
,
pNode
->
joinType
,
code
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkJoinPhysiPlanInputTsOrder
,
pNode
->
inputTsOrder
,
code
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
jsonToNodeObject
(
pJson
,
jkJoinPhysiPlanOnConditions
,
&
pNode
->
pOnConditions
);
}
...
...
@@ -2150,7 +2144,6 @@ static const char* jkWindowPhysiPlanWatermark = "Watermark";
static
const
char
*
jkWindowPhysiPlanDeleteMark
=
"DeleteMark"
;
static
const
char
*
jkWindowPhysiPlanIgnoreExpired
=
"IgnoreExpired"
;
static
const
char
*
jkWindowPhysiPlanInputTsOrder
=
"InputTsOrder"
;
static
const
char
*
jkWindowPhysiPlanOutputTsOrder
=
"outputTsOrder"
;
static
const
char
*
jkWindowPhysiPlanMergeDataBlock
=
"MergeDataBlock"
;
static
int32_t
physiWindowNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
...
...
@@ -2181,12 +2174,6 @@ static int32_t physiWindowNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowPhysiPlanIgnoreExpired
,
pNode
->
igExpired
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowPhysiPlanInputTsOrder
,
pNode
->
inputTsOrder
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkWindowPhysiPlanOutputTsOrder
,
pNode
->
outputTsOrder
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddBoolToObject
(
pJson
,
jkWindowPhysiPlanMergeDataBlock
,
pNode
->
mergeDataBlock
);
}
...
...
@@ -2222,12 +2209,6 @@ static int32_t jsonToPhysiWindowNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetTinyIntValue
(
pJson
,
jkWindowPhysiPlanIgnoreExpired
,
&
pNode
->
igExpired
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkWindowPhysiPlanInputTsOrder
,
pNode
->
inputTsOrder
,
code
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkWindowPhysiPlanOutputTsOrder
,
pNode
->
outputTsOrder
,
code
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBoolValue
(
pJson
,
jkWindowPhysiPlanMergeDataBlock
,
&
pNode
->
mergeDataBlock
);
}
...
...
@@ -2294,7 +2275,6 @@ static const char* jkFillPhysiPlanWStartTs = "WStartTs";
static
const
char
*
jkFillPhysiPlanValues
=
"Values"
;
static
const
char
*
jkFillPhysiPlanStartTime
=
"StartTime"
;
static
const
char
*
jkFillPhysiPlanEndTime
=
"EndTime"
;
static
const
char
*
jkFillPhysiPlanInputTsOrder
=
"inputTsOrder"
;
static
int32_t
physiFillNodeToJson
(
const
void
*
pObj
,
SJson
*
pJson
)
{
const
SFillPhysiNode
*
pNode
=
(
const
SFillPhysiNode
*
)
pObj
;
...
...
@@ -2321,9 +2301,6 @@ static int32_t physiFillNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkFillPhysiPlanEndTime
,
pNode
->
timeRange
.
ekey
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkFillPhysiPlanInputTsOrder
,
pNode
->
inputTsOrder
);
}
return
code
;
}
...
...
@@ -2353,9 +2330,6 @@ static int32_t jsonToPhysiFillNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetBigIntValue
(
pJson
,
jkFillPhysiPlanEndTime
,
&
pNode
->
timeRange
.
ekey
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkFillPhysiPlanInputTsOrder
,
pNode
->
inputTsOrder
,
code
);
}
return
code
;
}
...
...
@@ -3053,6 +3027,7 @@ static const char* jkColumnTableId = "TableId";
static
const
char
*
jkColumnTableType
=
"TableType"
;
static
const
char
*
jkColumnColId
=
"ColId"
;
static
const
char
*
jkColumnColType
=
"ColType"
;
static
const
char
*
jkColumnProjId
=
"ProjId"
;
static
const
char
*
jkColumnDbName
=
"DbName"
;
static
const
char
*
jkColumnTableName
=
"TableName"
;
static
const
char
*
jkColumnTableAlias
=
"TableAlias"
;
...
...
@@ -3073,6 +3048,9 @@ static int32_t columnNodeToJson(const void* pObj, SJson* pJson) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkColumnColId
,
pNode
->
colId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkColumnProjId
,
pNode
->
projIdx
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonAddIntegerToObject
(
pJson
,
jkColumnColType
,
pNode
->
colType
);
}
...
...
@@ -3111,6 +3089,9 @@ static int32_t jsonToColumnNode(const SJson* pJson, void* pObj) {
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetSmallIntValue
(
pJson
,
jkColumnColId
,
&
pNode
->
colId
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
tjsonGetSmallIntValue
(
pJson
,
jkColumnProjId
,
&
pNode
->
projIdx
);
}
if
(
TSDB_CODE_SUCCESS
==
code
)
{
tjsonGetNumberValue
(
pJson
,
jkColumnColType
,
pNode
->
colType
,
code
);
}
...
...
source/libs/nodes/src/nodesMsgFuncs.c
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
source/libs/parser/src/parTranslater.c
浏览文件 @
ceb78fda
...
...
@@ -3075,13 +3075,13 @@ static bool needFill(SNode* pNode) {
static
int32_t
convertFillValue
(
STranslateContext
*
pCxt
,
SDataType
dt
,
SNodeList
*
pValues
,
int32_t
index
)
{
SListCell
*
pCell
=
nodesListGetCell
(
pValues
,
index
);
if
(
dataTypeEqual
(
&
dt
,
&
((
SExprNode
*
)
pCell
->
pNode
)
->
resType
))
{
if
(
dataTypeEqual
(
&
dt
,
&
((
SExprNode
*
)
pCell
->
pNode
)
->
resType
)
&&
(
QUERY_NODE_VALUE
==
nodeType
(
pCell
->
pNode
))
)
{
return
TSDB_CODE_SUCCESS
;
}
SNode
*
pCas
e
Func
=
NULL
;
int32_t
code
=
createCastFunc
(
pCxt
,
pCell
->
pNode
,
dt
,
&
pCas
e
Func
);
SNode
*
pCas
t
Func
=
NULL
;
int32_t
code
=
createCastFunc
(
pCxt
,
pCell
->
pNode
,
dt
,
&
pCas
t
Func
);
if
(
TSDB_CODE_SUCCESS
==
code
)
{
code
=
scalarCalculateConstants
(
pCas
e
Func
,
&
pCell
->
pNode
);
code
=
scalarCalculateConstants
(
pCas
t
Func
,
&
pCell
->
pNode
);
}
if
(
TSDB_CODE_SUCCESS
==
code
&&
QUERY_NODE_VALUE
!=
nodeType
(
pCell
->
pNode
))
{
code
=
generateSyntaxErrMsgExt
(
&
pCxt
->
msgBuf
,
TSDB_CODE_PAR_WRONG_VALUE_TYPE
,
"Fill value can only accept constant"
);
...
...
source/libs/planner/src/planLogicCreater.c
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
source/libs/planner/src/planOptimizer.c
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
source/libs/planner/src/planPhysiCreater.c
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
source/libs/planner/src/planSpliter.c
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
source/libs/scheduler/src/schTask.c
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
source/libs/stream/CMakeLists.txt
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
source/util/src/terror.c
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
tests/parallel_test/cases.task
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
tests/script/tsim/query/explain_tsorder.sim
0 → 100644
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
tests/script/tsim/query/r/explain_tsorder.result
0 → 100644
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
tests/script/tsim/query/t/explain_tsorder.sql
0 → 100644
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
tests/script/tsim/sma/tsmaCreateInsertQuery.sim
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
tests/system-test/0-others/splitVGroup.py
0 → 100644
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
tests/system-test/2-query/interp.py
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
utils/tsim/src/simExe.c
浏览文件 @
ceb78fda
此差异已折叠。
点击以展开。
编辑
预览
Markdown
is supported
0%
请重试
或
添加新附件
.
添加附件
取消
You are about to add
0
people
to the discussion. Proceed with caution.
先完成此消息的编辑!
取消
想要评论请
注册
或
登录