提交 04ab8e19 编写于 作者: Y yihaoDeng

Merge branch 'enh/dev30' into enh/triggerCheckPoint2

...@@ -16,7 +16,6 @@ debug/ ...@@ -16,7 +16,6 @@ debug/
release/ release/
target/ target/
debs/ debs/
deps/
rpms/ rpms/
mac/ mac/
*.pyc *.pyc
......
cmake_minimum_required(VERSION 3.0) cmake_minimum_required(VERSION 3.0)
set(CMAKE_VERBOSE_MAKEFILE OFF) set(CMAKE_VERBOSE_MAKEFILE ON)
set(TD_BUILD_TAOSA_INTERNAL FALSE) set(TD_BUILD_TAOSA_INTERNAL FALSE)
#set output directory #set output directory
......
...@@ -172,5 +172,15 @@ ENDIF() ...@@ -172,5 +172,15 @@ ENDIF()
MESSAGE(STATUS "Platform arch:" ${PLATFORM_ARCH_STR}) MESSAGE(STATUS "Platform arch:" ${PLATFORM_ARCH_STR})
set(TD_DEPS_DIR "x86")
if (TD_LINUX)
IF (TD_ARM_64 OR TD_ARM_32)
set(TD_DEPS_DIR "arm")
ELSE()
set(TD_DEPS_DIR "x86")
ENDIF()
endif()
MESSAGE(STATUS "DEPS_DIR: " ${TD_DEPS_DIR})
MESSAGE("C Compiler: ${CMAKE_C_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_C_COMPILER_VERSION})") MESSAGE("C Compiler: ${CMAKE_C_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_C_COMPILER_VERSION})")
MESSAGE("CXX Compiler: ${CMAKE_CXX_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_CXX_COMPILER_VERSION})") MESSAGE("CXX Compiler: ${CMAKE_CXX_COMPILER} (${CMAKE_C_COMPILER_ID}, ${CMAKE_CXX_COMPILER_VERSION})")
...@@ -2,7 +2,7 @@ ...@@ -2,7 +2,7 @@
IF (DEFINED VERNUMBER) IF (DEFINED VERNUMBER)
SET(TD_VER_NUMBER ${VERNUMBER}) SET(TD_VER_NUMBER ${VERNUMBER})
ELSE () ELSE ()
SET(TD_VER_NUMBER "3.0.5.0") SET(TD_VER_NUMBER "3.0.6.0.alpha")
ENDIF () ENDIF ()
IF (DEFINED VERCOMPATIBLE) IF (DEFINED VERCOMPATIBLE)
......
# rocksdb # rocksdb
IF (NOT ${TD_LINUX})
ExternalProject_Add(rocksdb ExternalProject_Add(rocksdb
URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz URL https://github.com/facebook/rocksdb/archive/refs/tags/v8.1.1.tar.gz
URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b URL_HASH MD5=3b4c97ee45df9c8a5517308d31ab008b
...@@ -11,3 +12,5 @@ ExternalProject_Add(rocksdb ...@@ -11,3 +12,5 @@ ExternalProject_Add(rocksdb
INSTALL_COMMAND "" INSTALL_COMMAND ""
TEST_COMMAND "" TEST_COMMAND ""
) )
ENDIF(NOT ${TD_LINUX})
...@@ -78,10 +78,18 @@ if(${BUILD_WITH_LEVELDB}) ...@@ -78,10 +78,18 @@ if(${BUILD_WITH_LEVELDB})
endif(${BUILD_WITH_LEVELDB}) endif(${BUILD_WITH_LEVELDB})
# rocksdb # rocksdb
IF (NOT ${TD_LINUX})
if(${BUILD_WITH_ROCKSDB}) if(${BUILD_WITH_ROCKSDB})
cat("${TD_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_ROCKSDB) add_definitions(-DUSE_ROCKSDB)
endif(${BUILD_WITH_ROCKSDB}) endif(${BUILD_WITH_ROCKSDB})
ELSE()
if(${BUILD_WITH_ROCKSDB})
#cat("${TD_SUPPORT_DIR}/rocksdb_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
add_definitions(-DUSE_ROCKSDB)
endif(${BUILD_WITH_ROCKSDB})
ENDIF(NOT ${TD_LINUX})
# canonical-raft # canonical-raft
if(${BUILD_WITH_CRAFT}) if(${BUILD_WITH_CRAFT})
...@@ -175,8 +183,8 @@ if(${BUILD_TEST}) ...@@ -175,8 +183,8 @@ if(${BUILD_TEST})
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src_darwin> PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/cpp-stub/src_darwin>
) )
endif(${TD_DARWIN}) endif(${TD_DARWIN})
endif(${BUILD_TEST}) endif(${BUILD_TEST})
# cJson # cJson
...@@ -227,6 +235,8 @@ endif(${BUILD_WITH_LEVELDB}) ...@@ -227,6 +235,8 @@ endif(${BUILD_WITH_LEVELDB})
# rocksdb # rocksdb
# To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev # To support rocksdb build on ubuntu: sudo apt-get install libgflags-dev
IF (NOT ${TD_LINUX})
if(${BUILD_WITH_ROCKSDB}) if(${BUILD_WITH_ROCKSDB})
if(${TD_LINUX}) if(${TD_LINUX})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS_REL} -Wno-error=maybe-uninitialized -Wno-error=unused-but-set-variable -Wno-error=unused-variable -Wno-error=unused-function -Wno-errno=unused-private-field -Wno-error=unused-result") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS_REL} -Wno-error=maybe-uninitialized -Wno-error=unused-but-set-variable -Wno-error=unused-variable -Wno-error=unused-function -Wno-errno=unused-private-field -Wno-error=unused-result")
...@@ -245,7 +255,7 @@ if(${BUILD_WITH_ROCKSDB}) ...@@ -245,7 +255,7 @@ if(${BUILD_WITH_ROCKSDB})
endif(${TD_DARWIN_ARM64}) endif(${TD_DARWIN_ARM64})
if (${TD_WINDOWS}) if (${TD_WINDOWS})
SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4244 /wd4819") SET(CMAKE_CXX_FLAGS "${CMAKE_CXX_FLAGS} /wd4244 /wd4819")
endif(${TD_WINDOWS}) endif(${TD_WINDOWS})
...@@ -263,8 +273,8 @@ if(${BUILD_WITH_ROCKSDB}) ...@@ -263,8 +273,8 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_MD_LIBRARY "build with MD" OFF) option(WITH_MD_LIBRARY "build with MD" OFF)
set(SYSTEM_LIBS ${SYSTEM_LIBS} shlwapi.lib rpcrt4.lib) set(SYSTEM_LIBS ${SYSTEM_LIBS} shlwapi.lib rpcrt4.lib)
endif(${TD_WINDOWS}) endif(${TD_WINDOWS})
option(WITH_FALLOCATE "" OFF) option(WITH_FALLOCATE "" OFF)
option(WITH_JEMALLOC "" OFF) option(WITH_JEMALLOC "" OFF)
option(WITH_GFLAGS "" OFF) option(WITH_GFLAGS "" OFF)
...@@ -276,7 +286,7 @@ if(${BUILD_WITH_ROCKSDB}) ...@@ -276,7 +286,7 @@ if(${BUILD_WITH_ROCKSDB})
option(WITH_BENCHMARK_TOOLS "" OFF) option(WITH_BENCHMARK_TOOLS "" OFF)
option(WITH_TOOLS "" OFF) option(WITH_TOOLS "" OFF)
option(WITH_LIBURING "" OFF) option(WITH_LIBURING "" OFF)
IF (TD_LINUX) IF (TD_LINUX)
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
ELSE() ELSE()
option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF) option(ROCKSDB_BUILD_SHARED "Build shared versions of the RocksDB libraries" OFF)
...@@ -288,16 +298,17 @@ if(${BUILD_WITH_ROCKSDB}) ...@@ -288,16 +298,17 @@ if(${BUILD_WITH_ROCKSDB})
) )
endif(${BUILD_WITH_ROCKSDB}) endif(${BUILD_WITH_ROCKSDB})
ENDIF(NOT ${TD_LINUX})
# lucene # lucene
# To support build on ubuntu: sudo apt-get install libboost-all-dev # To support build on ubuntu: sudo apt-get install libboost-all-dev
if(${BUILD_WITH_LUCENE}) if(${BUILD_WITH_LUCENE})
option(ENABLE_TEST "Enable the tests" OFF) option(ENABLE_TEST "Enable the tests" OFF)
add_subdirectory(lucene EXCLUDE_FROM_ALL) add_subdirectory(lucene EXCLUDE_FROM_ALL)
target_include_directories( target_include_directories(
lucene++ lucene++
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/lucene/include> PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/lucene/include>
) )
endif(${BUILD_WITH_LUCENE}) endif(${BUILD_WITH_LUCENE})
# NuRaft # NuRaft
...@@ -357,7 +368,7 @@ if(${BUILD_MSVCREGEX}) ...@@ -357,7 +368,7 @@ if(${BUILD_MSVCREGEX})
target_include_directories(msvcregex target_include_directories(msvcregex
PRIVATE "msvcregex" PRIVATE "msvcregex"
) )
target_link_libraries(msvcregex target_link_libraries(msvcregex
INTERFACE Shell32 INTERFACE Shell32
) )
SET_TARGET_PROPERTIES(msvcregex PROPERTIES OUTPUT_NAME msvcregex) SET_TARGET_PROPERTIES(msvcregex PROPERTIES OUTPUT_NAME msvcregex)
...@@ -417,8 +428,8 @@ if(${BUILD_WITH_BDB}) ...@@ -417,8 +428,8 @@ if(${BUILD_WITH_BDB})
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/bdb/libdb.a" IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/bdb/libdb.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/bdb" INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/bdb"
) )
target_link_libraries(bdb target_link_libraries(bdb
INTERFACE pthread INTERFACE pthread
) )
endif(${BUILD_WITH_BDB}) endif(${BUILD_WITH_BDB})
...@@ -430,12 +441,12 @@ if(${BUILD_WITH_SQLITE}) ...@@ -430,12 +441,12 @@ if(${BUILD_WITH_SQLITE})
IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/sqlite/.libs/libsqlite3.a" IMPORTED_LOCATION "${CMAKE_CURRENT_SOURCE_DIR}/sqlite/.libs/libsqlite3.a"
INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/sqlite" INTERFACE_INCLUDE_DIRECTORIES "${CMAKE_CURRENT_SOURCE_DIR}/sqlite"
) )
target_link_libraries(sqlite target_link_libraries(sqlite
INTERFACE m INTERFACE m
INTERFACE pthread INTERFACE pthread
) )
if(NOT TD_WINDOWS) if(NOT TD_WINDOWS)
target_link_libraries(sqlite target_link_libraries(sqlite
INTERFACE dl INTERFACE dl
) )
endif(NOT TD_WINDOWS) endif(NOT TD_WINDOWS)
...@@ -443,22 +454,22 @@ endif(${BUILD_WITH_SQLITE}) ...@@ -443,22 +454,22 @@ endif(${BUILD_WITH_SQLITE})
# addr2line # addr2line
if(${BUILD_ADDR2LINE}) if(${BUILD_ADDR2LINE})
if(NOT ${TD_WINDOWS}) if(NOT ${TD_WINDOWS})
check_include_file( "sys/types.h" HAVE_SYS_TYPES_H) check_include_file( "sys/types.h" HAVE_SYS_TYPES_H)
check_include_file( "sys/stat.h" HAVE_SYS_STAT_H ) check_include_file( "sys/stat.h" HAVE_SYS_STAT_H )
check_include_file( "inttypes.h" HAVE_INTTYPES_H ) check_include_file( "inttypes.h" HAVE_INTTYPES_H )
check_include_file( "stddef.h" HAVE_STDDEF_H ) check_include_file( "stddef.h" HAVE_STDDEF_H )
check_include_file( "stdlib.h" HAVE_STDLIB_H ) check_include_file( "stdlib.h" HAVE_STDLIB_H )
check_include_file( "string.h" HAVE_STRING_H ) check_include_file( "string.h" HAVE_STRING_H )
check_include_file( "memory.h" HAVE_MEMORY_H ) check_include_file( "memory.h" HAVE_MEMORY_H )
check_include_file( "strings.h" HAVE_STRINGS_H ) check_include_file( "strings.h" HAVE_STRINGS_H )
check_include_file( "stdint.h" HAVE_STDINT_H ) check_include_file( "stdint.h" HAVE_STDINT_H )
check_include_file( "unistd.h" HAVE_UNISTD_H ) check_include_file( "unistd.h" HAVE_UNISTD_H )
check_include_file( "sgidefs.h" HAVE_SGIDEFS_H ) check_include_file( "sgidefs.h" HAVE_SGIDEFS_H )
check_include_file( "stdafx.h" HAVE_STDAFX_H ) check_include_file( "stdafx.h" HAVE_STDAFX_H )
check_include_file( "elf.h" HAVE_ELF_H ) check_include_file( "elf.h" HAVE_ELF_H )
check_include_file( "libelf.h" HAVE_LIBELF_H ) check_include_file( "libelf.h" HAVE_LIBELF_H )
check_include_file( "libelf/libelf.h" HAVE_LIBELF_LIBELF_H) check_include_file( "libelf/libelf.h" HAVE_LIBELF_LIBELF_H)
check_include_file( "alloca.h" HAVE_ALLOCA_H ) check_include_file( "alloca.h" HAVE_ALLOCA_H )
check_include_file( "elfaccess.h" HAVE_ELFACCESS_H) check_include_file( "elfaccess.h" HAVE_ELFACCESS_H)
check_include_file( "sys/elf_386.h" HAVE_SYS_ELF_386_H ) check_include_file( "sys/elf_386.h" HAVE_SYS_ELF_386_H )
...@@ -466,7 +477,7 @@ if(${BUILD_ADDR2LINE}) ...@@ -466,7 +477,7 @@ if(${BUILD_ADDR2LINE})
check_include_file( "sys/elf_sparc.h" HAVE_SYS_ELF_SPARC_H) check_include_file( "sys/elf_sparc.h" HAVE_SYS_ELF_SPARC_H)
check_include_file( "sys/ia64/elf.h" HAVE_SYS_IA64_ELF_H ) check_include_file( "sys/ia64/elf.h" HAVE_SYS_IA64_ELF_H )
set(VERSION 0.3.1) set(VERSION 0.3.1)
set(PACKAGE_VERSION "\"${VERSION}\"") set(PACKAGE_VERSION "\"${VERSION}\"")
configure_file(libdwarf/cmake/config.h.cmake config.h) configure_file(libdwarf/cmake/config.h.cmake config.h)
file(GLOB_RECURSE LIBDWARF_SOURCES "libdwarf/src/lib/libdwarf/*.c") file(GLOB_RECURSE LIBDWARF_SOURCES "libdwarf/src/lib/libdwarf/*.c")
add_library(libdwarf STATIC ${LIBDWARF_SOURCES}) add_library(libdwarf STATIC ${LIBDWARF_SOURCES})
...@@ -497,6 +508,7 @@ if(${BUILD_GEOS}) ...@@ -497,6 +508,7 @@ if(${BUILD_GEOS})
endif(${TD_LINUX}) endif(${TD_LINUX})
option(BUILD_SHARED_LIBS "Build GEOS with shared libraries" OFF) option(BUILD_SHARED_LIBS "Build GEOS with shared libraries" OFF)
add_subdirectory(geos EXCLUDE_FROM_ALL) add_subdirectory(geos EXCLUDE_FROM_ALL)
unset(CMAKE_CXX_STANDARD CACHE) # undo libgeos's setting of global CMAKE_CXX_STANDARD
target_include_directories( target_include_directories(
geos_c geos_c
PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/geos/include> PUBLIC $<BUILD_INTERFACE:${CMAKE_CURRENT_SOURCE_DIR}/geos/include>
......
此差异已折叠。
此差异已折叠。
...@@ -667,6 +667,137 @@ Insert with req_id argument ...@@ -667,6 +667,137 @@ Insert with req_id argument
</TabItem> </TabItem>
</Tabs> </Tabs>
### Parameter Binding
The Python connector provides a parameter binding api for inserting data. Similar to most databases, TDengine currently only supports the question mark `?` to indicate the parameters to be bound.
<Tabs>
<TabItem value="native" label="native connection">
#### Create Stmt
Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
```
import taos
conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
```
#### parameter binding
Call the `new_multi_binds` function to create the parameter list for parameter bindings.
```
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
```
Call the `bind_param` (for a single row) method or the `bind_param_batch` (for multiple rows) method to set the values.
```
stmt.bind_param_batch(params)
```
#### execute sql
Call `execute` method to execute sql.
```
stmt.execute()
```
#### Close Stmt
```
stmt.close()
```
#### Example
```python
{{#include docs/examples/python/stmt_example.py}}
```
</TabItem>
<TabItem value="websocket" label="WebSocket connection">
#### Create Stmt
Call the `statement` method in `Connection` to create the `stmt` for parameter binding.
```
import taosws
conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
```
#### Prepare sql
Call `prepare` method in stmt to prepare sql.
```
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
```
#### parameter binding
Call the `bind_param` method to bind parameters.
```
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
```
Call the `add_batch` method to add parameters to the batch.
```
stmt.add_batch()
```
#### execute sql
Call `execute` method to execute sql.
```
stmt.execute()
```
#### Close Stmt
```
stmt.close()
```
#### Example
```python
{{#include docs/examples/python/stmt_websocket_example.py}}
```
</TabItem>
</Tabs>
### Other sample programs ### Other sample programs
| Example program links | Example program content | | Example program links | Example program content |
......
...@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://w ...@@ -10,6 +10,10 @@ For TDengine 2.x installation packages by version, please visit [here](https://w
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 3.0.5.1
<Release type="tdengine" version="3.0.5.1" />
## 3.0.5.0 ## 3.0.5.0
<Release type="tdengine" version="3.0.5.0" /> <Release type="tdengine" version="3.0.5.0" />
......
...@@ -10,6 +10,10 @@ For other historical version installers, please visit [here](https://www.taosdat ...@@ -10,6 +10,10 @@ For other historical version installers, please visit [here](https://www.taosdat
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 2.5.2
<Release type="tools" version="2.5.2" />
## 2.5.1 ## 2.5.1
<Release type="tools" version="2.5.1" /> <Release type="tools" version="2.5.1" />
......
#!
import taosws
import taos
db_name = 'test_ws_stmt'
def before():
taos_conn = taos.connect()
taos_conn.execute("drop database if exists %s" % db_name)
taos_conn.execute("create database %s" % db_name)
taos_conn.select_db(db_name)
taos_conn.execute("create table t1 (ts timestamp, a int, b float, c varchar(10))")
taos_conn.execute(
"create table stb1 (ts timestamp, a int, b float, c varchar(10)) tags (t1 int, t2 binary(10))")
taos_conn.close()
def stmt_insert():
before()
conn = taosws.connect('taosws://root:taosdata@localhost:6041/%s' % db_name)
while True:
try:
stmt = conn.statement()
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
stmt.add_batch()
rows = stmt.execute()
print(rows)
stmt.close()
except Exception as e:
if 'Retry needed' in e.args[0]: # deal with [0x0125] Retry needed
continue
else:
raise e
break
def stmt_insert_into_stable():
before()
conn = taosws.connect("taosws://root:taosdata@localhost:6041/%s" % db_name)
while True:
try:
stmt = conn.statement()
stmt.prepare("insert into ? using stb1 tags (?, ?) values (?, ?, ?, ?)")
stmt.set_tbname('stb1_1')
stmt.set_tags([
taosws.int_to_tag(1),
taosws.varchar_to_tag('aaa'),
])
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
stmt.add_batch()
rows = stmt.execute()
print(rows)
stmt.close()
except Exception as e:
if 'Retry needed' in e.args[0]: # deal with [0x0125] Retry needed
continue
else:
raise e
break
#!
import time
import taosws
import taos
def before_test(db_name):
taos_conn = taos.connect()
taos_conn.execute("drop database if exists %s" % db_name)
taos_conn.execute("create database %s" % db_name)
taos_conn.select_db(db_name)
taos_conn.execute("create table t1 (ts timestamp, a int, b float, c varchar(10))")
taos_conn.execute(
"create table stb1 (ts timestamp, a int, b float, c varchar(10)) tags (t1 int, t2 binary(10))")
taos_conn.close()
def after_test(db_name):
taos_conn = taos.connect()
taos_conn.execute("drop database if exists %s" % db_name)
taos_conn.close()
def stmt_insert():
db_name = 'test_ws_stmt_{}'.format(int(time.time()))
before_test(db_name)
conn = taosws.connect('taosws://root:taosdata@localhost:6041/%s' % db_name)
stmt = conn.statement()
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
stmt.add_batch()
rows = stmt.execute()
assert rows == 4
stmt.close()
after_test(db_name)
def stmt_insert_into_stable():
db_name = 'test_ws_stmt_{}'.format(int(time.time()))
before_test(db_name)
conn = taosws.connect("taosws://root:taosdata@localhost:6041/%s" % db_name)
stmt = conn.statement()
stmt.prepare("insert into ? using stb1 tags (?, ?) values (?, ?, ?, ?)")
stmt.set_tbname('stb1_1')
stmt.set_tags([
taosws.int_to_tag(1),
taosws.varchar_to_tag('aaa'),
])
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
stmt.add_batch()
rows = stmt.execute()
assert rows == 4
stmt.close()
after_test(db_name)
if __name__ == '__main__':
stmt_insert()
stmt_insert_into_stable()
...@@ -672,6 +672,141 @@ consumer.close() ...@@ -672,6 +672,141 @@ consumer.close()
</TabItem> </TabItem>
</Tabs> </Tabs>
### 通过参数绑定写入数据
TDengine 的 Python 连接器支持参数绑定风格的 Prepare API 方式写入数据,和大多数数据库类似,目前仅支持用 `?` 来代表待绑定的参数。
<Tabs>
<TabItem value="native" label="原生连接">
#### 创建 stmt
Python 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
```
import taos
conn = taos.connect()
stmt = conn.statement("insert into log values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?)")
```
#### 参数绑定
调用 `new_multi_binds` 函数创建 params 列表,用于参数绑定。
```
params = new_multi_binds(16)
params[0].timestamp((1626861392589, 1626861392590, 1626861392591))
params[1].bool((True, None, False))
params[2].tinyint([-128, -128, None]) # -128 is tinyint null
params[3].tinyint([0, 127, None])
params[4].smallint([3, None, 2])
params[5].int([3, 4, None])
params[6].bigint([3, 4, None])
params[7].tinyint_unsigned([3, 4, None])
params[8].smallint_unsigned([3, 4, None])
params[9].int_unsigned([3, 4, None])
params[10].bigint_unsigned([3, 4, None])
params[11].float([3, None, 1])
params[12].double([3, None, 1.2])
params[13].binary(["abc", "dddafadfadfadfadfa", None])
params[14].nchar(["涛思数据", None, "a long string with 中文字符"])
params[15].timestamp([None, None, 1626861392591])
```
调用 stmt 的 `bind_param` 以单行的方式设置 values 或 `bind_param_batch` 以多行的方式设置 values 方法绑定参数。
```
stmt.bind_param_batch(params)
```
#### 执行 sql
调用 stmt 的 `execute` 方法执行 sql
```
stmt.execute()
```
#### 关闭 stmt
最后需要关闭 stmt。
```
stmt.close()
```
#### 示例代码
```python
{{#include docs/examples/python/stmt_example.py}}
```
</TabItem>
<TabItem value="websocket" label="WebSocket 连接">
#### 创建 stmt
Python WebSocket 连接器的 `Connection` 提供了 `statement` 方法用于创建参数绑定对象 stmt,该方法接收 sql 字符串作为参数,sql 字符串目前仅支持用 `?` 来代表绑定的参数。
```
import taosws
conn = taosws.connect('taosws://localhost:6041/test')
stmt = conn.statement()
```
#### 解析 sql
调用 stmt 的 `prepare` 方法来解析 insert 语句。
```
stmt.prepare("insert into t1 values (?, ?, ?, ?)")
```
#### 参数绑定
调用 stmt 的 `bind_param` 方法绑定参数。
```
stmt.bind_param([
taosws.millis_timestamps_to_column([1686844800000, 1686844801000, 1686844802000, 1686844803000]),
taosws.ints_to_column([1, 2, 3, 4]),
taosws.floats_to_column([1.1, 2.2, 3.3, 4.4]),
taosws.varchar_to_column(['a', 'b', 'c', 'd']),
])
```
调用 stmt 的 `add_batch` 方法,将参数加入批处理。
```
stmt.add_batch()
```
#### 执行 sql
调用 stmt 的 `execute` 方法执行 sql
```
stmt.execute()
```
#### 关闭 stmt
最后需要关闭 stmt。
```
stmt.close()
```
#### 示例代码
```python
{{#include docs/examples/python/stmt_websocket_example.py}}
```
</TabItem>
</Tabs>
### 其它示例程序 ### 其它示例程序
| 示例程序链接 | 示例程序内容 | | 示例程序链接 | 示例程序内容 |
......
...@@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do ...@@ -10,6 +10,10 @@ TDengine 2.x 各版本安装包请访问[这里](https://www.taosdata.com/all-do
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 3.0.5.1
<Release type="tdengine" version="3.0.5.1" />
## 3.0.5.0 ## 3.0.5.0
<Release type="tdengine" version="3.0.5.0" /> <Release type="tdengine" version="3.0.5.0" />
......
...@@ -10,6 +10,10 @@ taosTools 各版本安装包下载链接如下: ...@@ -10,6 +10,10 @@ taosTools 各版本安装包下载链接如下:
import Release from "/components/ReleaseV3"; import Release from "/components/ReleaseV3";
## 2.5.2
<Release type="tools" version="2.5.2" />
## 2.5.1 ## 2.5.1
<Release type="tools" version="2.5.1" /> <Release type="tools" version="2.5.1" />
......
...@@ -198,6 +198,7 @@ typedef struct SDataBlockInfo { ...@@ -198,6 +198,7 @@ typedef struct SDataBlockInfo {
SBlockID id; SBlockID id;
int16_t hasVarCol; int16_t hasVarCol;
int16_t dataLoad; // denote if the data is loaded or not int16_t dataLoad; // denote if the data is loaded or not
uint8_t scanFlag;
// TODO: optimize and remove following // TODO: optimize and remove following
int64_t version; // used for stream, and need serialization int64_t version; // used for stream, and need serialization
......
...@@ -120,6 +120,7 @@ extern bool tsQueryUseNodeAllocator; ...@@ -120,6 +120,7 @@ extern bool tsQueryUseNodeAllocator;
extern bool tsKeepColumnName; extern bool tsKeepColumnName;
extern bool tsEnableQueryHb; extern bool tsEnableQueryHb;
extern bool tsEnableScience; extern bool tsEnableScience;
extern bool tsTtlChangeOnWrite;
extern int32_t tsRedirectPeriod; extern int32_t tsRedirectPeriod;
extern int32_t tsRedirectFactor; extern int32_t tsRedirectFactor;
extern int32_t tsRedirectMaxPeriod; extern int32_t tsRedirectMaxPeriod;
......
...@@ -945,7 +945,7 @@ int32_t tSerializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq); ...@@ -945,7 +945,7 @@ int32_t tSerializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq);
int32_t tDeserializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq); int32_t tDeserializeSVTrimDbReq(void* buf, int32_t bufLen, SVTrimDbReq* pReq);
typedef struct { typedef struct {
int32_t timestamp; int32_t timestampSec;
} SVDropTtlTableReq; } SVDropTtlTableReq;
int32_t tSerializeSVDropTtlTableReq(void* buf, int32_t bufLen, SVDropTtlTableReq* pReq); int32_t tSerializeSVDropTtlTableReq(void* buf, int32_t bufLen, SVDropTtlTableReq* pReq);
...@@ -2278,7 +2278,7 @@ typedef struct SVCreateTbReq { ...@@ -2278,7 +2278,7 @@ typedef struct SVCreateTbReq {
int32_t flags; int32_t flags;
char* name; char* name;
tb_uid_t uid; tb_uid_t uid;
int64_t ctime; int64_t btime;
int32_t ttl; int32_t ttl;
int32_t commentLen; int32_t commentLen;
char* comment; char* comment;
...@@ -2415,10 +2415,12 @@ typedef struct { ...@@ -2415,10 +2415,12 @@ typedef struct {
int32_t newTTL; int32_t newTTL;
int32_t newCommentLen; int32_t newCommentLen;
char* newComment; char* newComment;
int64_t ctimeMs; // fill by vnode
} SVAlterTbReq; } SVAlterTbReq;
int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq); int32_t tEncodeSVAlterTbReq(SEncoder* pEncoder, const SVAlterTbReq* pReq);
int32_t tDecodeSVAlterTbReq(SDecoder* pDecoder, SVAlterTbReq* pReq); int32_t tDecodeSVAlterTbReq(SDecoder* pDecoder, SVAlterTbReq* pReq);
int32_t tDecodeSVAlterTbReqSetCtime(SDecoder* pDecoder, SVAlterTbReq* pReq, int64_t ctimeMs);
typedef struct { typedef struct {
int32_t code; int32_t code;
...@@ -3436,6 +3438,7 @@ typedef struct SDeleteRes { ...@@ -3436,6 +3438,7 @@ typedef struct SDeleteRes {
int64_t affectedRows; int64_t affectedRows;
char tableFName[TSDB_TABLE_NAME_LEN]; char tableFName[TSDB_TABLE_NAME_LEN];
char tsColName[TSDB_COL_NAME_LEN]; char tsColName[TSDB_COL_NAME_LEN];
int64_t ctimeMs; // fill by vnode
} SDeleteRes; } SDeleteRes;
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes); int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
...@@ -3454,10 +3457,12 @@ int32_t tDecodeSSingleDeleteReq(SDecoder* pCoder, SSingleDeleteReq* pReq); ...@@ -3454,10 +3457,12 @@ int32_t tDecodeSSingleDeleteReq(SDecoder* pCoder, SSingleDeleteReq* pReq);
typedef struct { typedef struct {
int64_t suid; int64_t suid;
SArray* deleteReqs; // SArray<SSingleDeleteReq> SArray* deleteReqs; // SArray<SSingleDeleteReq>
int64_t ctimeMs; // fill by vnode
} SBatchDeleteReq; } SBatchDeleteReq;
int32_t tEncodeSBatchDeleteReq(SEncoder* pCoder, const SBatchDeleteReq* pReq); int32_t tEncodeSBatchDeleteReq(SEncoder* pCoder, const SBatchDeleteReq* pReq);
int32_t tDecodeSBatchDeleteReq(SDecoder* pCoder, SBatchDeleteReq* pReq); int32_t tDecodeSBatchDeleteReq(SDecoder* pCoder, SBatchDeleteReq* pReq);
int32_t tDecodeSBatchDeleteReqSetCtime(SDecoder* pDecoder, SBatchDeleteReq* pReq, int64_t ctimeMs);
typedef struct { typedef struct {
int32_t msgIdx; int32_t msgIdx;
...@@ -3525,6 +3530,7 @@ typedef struct { ...@@ -3525,6 +3530,7 @@ typedef struct {
SArray* aRowP; SArray* aRowP;
SArray* aCol; SArray* aCol;
}; };
int64_t ctimeMs;
} SSubmitTbData; } SSubmitTbData;
typedef struct { typedef struct {
......
...@@ -54,7 +54,7 @@ typedef struct SMetaEntry { ...@@ -54,7 +54,7 @@ typedef struct SMetaEntry {
SRSmaParam rsmaParam; SRSmaParam rsmaParam;
} stbEntry; } stbEntry;
struct { struct {
int64_t ctime; int64_t btime;
int32_t ttlDays; int32_t ttlDays;
int32_t commentLen; int32_t commentLen;
char* comment; char* comment;
...@@ -62,7 +62,7 @@ typedef struct SMetaEntry { ...@@ -62,7 +62,7 @@ typedef struct SMetaEntry {
uint8_t* pTags; uint8_t* pTags;
} ctbEntry; } ctbEntry;
struct { struct {
int64_t ctime; int64_t btime;
int32_t ttlDays; int32_t ttlDays;
int32_t commentLen; int32_t commentLen;
char* comment; char* comment;
......
...@@ -53,6 +53,8 @@ typedef struct SLogicNode { ...@@ -53,6 +53,8 @@ typedef struct SLogicNode {
EDataOrderLevel requireDataOrder; // requirements for input data EDataOrderLevel requireDataOrder; // requirements for input data
EDataOrderLevel resultDataOrder; // properties of the output data EDataOrderLevel resultDataOrder; // properties of the output data
EGroupAction groupAction; EGroupAction groupAction;
EOrder inputTsOrder;
EOrder outputTsOrder;
} SLogicNode; } SLogicNode;
typedef enum EScanType { typedef enum EScanType {
...@@ -111,7 +113,6 @@ typedef struct SJoinLogicNode { ...@@ -111,7 +113,6 @@ typedef struct SJoinLogicNode {
SNode* pMergeCondition; SNode* pMergeCondition;
SNode* pOnConditions; SNode* pOnConditions;
bool isSingleTableJoin; bool isSingleTableJoin;
EOrder inputTsOrder;
SNode* pColEqualOnConditions; SNode* pColEqualOnConditions;
} SJoinLogicNode; } SJoinLogicNode;
...@@ -229,8 +230,6 @@ typedef struct SWindowLogicNode { ...@@ -229,8 +230,6 @@ typedef struct SWindowLogicNode {
int8_t igExpired; int8_t igExpired;
int8_t igCheckUpdate; int8_t igCheckUpdate;
EWindowAlgorithm windowAlgo; EWindowAlgorithm windowAlgo;
EOrder inputTsOrder;
EOrder outputTsOrder;
} SWindowLogicNode; } SWindowLogicNode;
typedef struct SFillLogicNode { typedef struct SFillLogicNode {
...@@ -241,7 +240,6 @@ typedef struct SFillLogicNode { ...@@ -241,7 +240,6 @@ typedef struct SFillLogicNode {
SNode* pWStartTs; SNode* pWStartTs;
SNode* pValues; // SNodeListNode SNode* pValues; // SNodeListNode
STimeWindow timeRange; STimeWindow timeRange;
EOrder inputTsOrder;
} SFillLogicNode; } SFillLogicNode;
typedef struct SSortLogicNode { typedef struct SSortLogicNode {
...@@ -310,6 +308,8 @@ typedef struct SDataBlockDescNode { ...@@ -310,6 +308,8 @@ typedef struct SDataBlockDescNode {
typedef struct SPhysiNode { typedef struct SPhysiNode {
ENodeType type; ENodeType type;
EOrder inputTsOrder;
EOrder outputTsOrder;
SDataBlockDescNode* pOutputDataBlockDesc; SDataBlockDescNode* pOutputDataBlockDesc;
SNode* pConditions; SNode* pConditions;
SNodeList* pChildren; SNodeList* pChildren;
...@@ -406,7 +406,6 @@ typedef struct SSortMergeJoinPhysiNode { ...@@ -406,7 +406,6 @@ typedef struct SSortMergeJoinPhysiNode {
SNode* pMergeCondition; SNode* pMergeCondition;
SNode* pOnConditions; SNode* pOnConditions;
SNodeList* pTargets; SNodeList* pTargets;
EOrder inputTsOrder;
SNode* pColEqualOnConditions; SNode* pColEqualOnConditions;
} SSortMergeJoinPhysiNode; } SSortMergeJoinPhysiNode;
...@@ -460,8 +459,6 @@ typedef struct SWindowPhysiNode { ...@@ -460,8 +459,6 @@ typedef struct SWindowPhysiNode {
int64_t watermark; int64_t watermark;
int64_t deleteMark; int64_t deleteMark;
int8_t igExpired; int8_t igExpired;
EOrder inputTsOrder;
EOrder outputTsOrder;
bool mergeDataBlock; bool mergeDataBlock;
} SWindowPhysiNode; } SWindowPhysiNode;
...@@ -488,7 +485,6 @@ typedef struct SFillPhysiNode { ...@@ -488,7 +485,6 @@ typedef struct SFillPhysiNode {
SNode* pWStartTs; // SColumnNode SNode* pWStartTs; // SColumnNode
SNode* pValues; // SNodeListNode SNode* pValues; // SNodeListNode
STimeWindow timeRange; STimeWindow timeRange;
EOrder inputTsOrder;
} SFillPhysiNode; } SFillPhysiNode;
typedef SFillPhysiNode SStreamFillPhysiNode; typedef SFillPhysiNode SStreamFillPhysiNode;
......
...@@ -69,6 +69,7 @@ typedef struct SColumnNode { ...@@ -69,6 +69,7 @@ typedef struct SColumnNode {
uint64_t tableId; uint64_t tableId;
int8_t tableType; int8_t tableType;
col_id_t colId; col_id_t colId;
uint16_t projIdx; // the idx in project list, start from 1
EColumnType colType; // column or tag EColumnType colType; // column or tag
bool hasIndex; bool hasIndex;
char dbName[TSDB_DB_NAME_LEN]; char dbName[TSDB_DB_NAME_LEN];
......
...@@ -791,8 +791,8 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) { ...@@ -791,8 +791,8 @@ size_t blockDataGetRowSize(SSDataBlock* pBlock) {
* @return * @return
*/ */
size_t blockDataGetSerialMetaSize(uint32_t numOfCols) { size_t blockDataGetSerialMetaSize(uint32_t numOfCols) {
// | version | total length | total rows | total columns | flag seg| block group id | column schema | each column // | version | total length | total rows | total columns | flag seg| block group id | column schema
// length | // | each column length |
return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(uint64_t) + return sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(int32_t) + sizeof(uint64_t) +
numOfCols * (sizeof(int8_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t); numOfCols * (sizeof(int8_t) + sizeof(int32_t)) + numOfCols * sizeof(int32_t);
} }
......
...@@ -110,6 +110,7 @@ int32_t tsQueryRspPolicy = 0; ...@@ -110,6 +110,7 @@ int32_t tsQueryRspPolicy = 0;
int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT int64_t tsQueryMaxConcurrentTables = 200; // unit is TSDB_TABLE_NUM_UNIT
bool tsEnableQueryHb = false; bool tsEnableQueryHb = false;
bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true bool tsEnableScience = false; // on taos-cli show float and doulbe with scientific notation if true
bool tsTtlChangeOnWrite = false; // ttl delete time changes on last write if true
int32_t tsQuerySmaOptimize = 0; int32_t tsQuerySmaOptimize = 0;
int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data. int32_t tsQueryRsmaTolerance = 1000; // the tolerance time (ms) to judge from which level to query rsma data.
bool tsQueryPlannerTrace = false; bool tsQueryPlannerTrace = false;
...@@ -513,6 +514,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) { ...@@ -513,6 +514,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "mqRebalanceInterval", tsMqRebalanceInterval, 1, 10000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1; if (cfgAddInt32(pCfg, "ttlUnit", tsTtlUnit, 1, 86400 * 365, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "ttlPushInterval", tsTtlPushInterval, 1, 100000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "ttlChangeOnWrite", tsTtlChangeOnWrite, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1; if (cfgAddInt32(pCfg, "uptimeInterval", tsUptimeInterval, 1, 100000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1; if (cfgAddInt32(pCfg, "queryRsmaTolerance", tsQueryRsmaTolerance, 0, 900000, 0) != 0) return -1;
...@@ -873,6 +875,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) { ...@@ -873,6 +875,7 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval; tsEnableTelem = cfgGetItem(pCfg, "telemetryReporting")->bval;
tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval; tsEnableCrashReport = cfgGetItem(pCfg, "crashReporting")->bval;
tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval;
tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32; tsTelemInterval = cfgGetItem(pCfg, "telemetryInterval")->i32;
tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN); tstrncpy(tsTelemServer, cfgGetItem(pCfg, "telemetryServer")->str, TSDB_FQDN_LEN);
tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32; tsTelemPort = (uint16_t)cfgGetItem(pCfg, "telemetryPort")->i32;
...@@ -978,6 +981,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) { ...@@ -978,6 +981,8 @@ int32_t taosApplyLocalCfg(SConfig *pCfg, char *name) {
taosSetCoreDump(enableCore); taosSetCoreDump(enableCore);
} else if (strcasecmp("enableQueryHb", name) == 0) { } else if (strcasecmp("enableQueryHb", name) == 0) {
tsEnableQueryHb = cfgGetItem(pCfg, "enableQueryHb")->bval; tsEnableQueryHb = cfgGetItem(pCfg, "enableQueryHb")->bval;
} else if (strcasecmp("ttlChangeOnWrite", name) == 0) {
tsTtlChangeOnWrite = cfgGetItem(pCfg, "ttlChangeOnWrite")->bval;
} }
break; break;
} }
......
...@@ -30,6 +30,9 @@ ...@@ -30,6 +30,9 @@
#include "tlog.h" #include "tlog.h"
static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq);
static int32_t tDecodeSBatchDeleteReqCommon(SDecoder *pDecoder, SBatchDeleteReq *pReq);
int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) { int32_t tInitSubmitMsgIter(const SSubmitReq *pMsg, SSubmitMsgIter *pIter) {
if (pMsg == NULL) { if (pMsg == NULL) {
terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP; terrno = TSDB_CODE_TDB_SUBMIT_MSG_MSSED_UP;
...@@ -1725,7 +1728,7 @@ int32_t tDeserializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq ...@@ -1725,7 +1728,7 @@ int32_t tDeserializeSDropDnodeReq(void *buf, int32_t bufLen, SDropDnodeReq *pReq
} else { } else {
pReq->unsafe = false; pReq->unsafe = false;
} }
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -3161,7 +3164,7 @@ int32_t tSerializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableReq ...@@ -3161,7 +3164,7 @@ int32_t tSerializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableReq
tEncoderInit(&encoder, buf, bufLen); tEncoderInit(&encoder, buf, bufLen);
if (tStartEncode(&encoder) < 0) return -1; if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->timestamp) < 0) return -1; if (tEncodeI32(&encoder, pReq->timestampSec) < 0) return -1;
tEndEncode(&encoder); tEndEncode(&encoder);
int32_t tlen = encoder.pos; int32_t tlen = encoder.pos;
...@@ -3174,7 +3177,7 @@ int32_t tDeserializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableR ...@@ -3174,7 +3177,7 @@ int32_t tDeserializeSVDropTtlTableReq(void *buf, int32_t bufLen, SVDropTtlTableR
tDecoderInit(&decoder, buf, bufLen); tDecoderInit(&decoder, buf, bufLen);
if (tStartDecode(&decoder) < 0) return -1; if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->timestamp) < 0) return -1; if (tDecodeI32(&decoder, &pReq->timestampSec) < 0) return -1;
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
...@@ -4671,7 +4674,7 @@ int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnode ...@@ -4671,7 +4674,7 @@ int32_t tDeserializeSAlterVnodeReplicaReq(void *buf, int32_t bufLen, SAlterVnode
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1; if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
} }
} }
tEndDecode(&decoder); tEndDecode(&decoder);
tDecoderClear(&decoder); tDecoderClear(&decoder);
return 0; return 0;
...@@ -6409,7 +6412,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) { ...@@ -6409,7 +6412,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
if (tEncodeI32v(pCoder, pReq->flags) < 0) return -1; if (tEncodeI32v(pCoder, pReq->flags) < 0) return -1;
if (tEncodeCStr(pCoder, pReq->name) < 0) return -1; if (tEncodeCStr(pCoder, pReq->name) < 0) return -1;
if (tEncodeI64(pCoder, pReq->uid) < 0) return -1; if (tEncodeI64(pCoder, pReq->uid) < 0) return -1;
if (tEncodeI64(pCoder, pReq->ctime) < 0) return -1; if (tEncodeI64(pCoder, pReq->btime) < 0) return -1;
if (tEncodeI32(pCoder, pReq->ttl) < 0) return -1; if (tEncodeI32(pCoder, pReq->ttl) < 0) return -1;
if (tEncodeI8(pCoder, pReq->type) < 0) return -1; if (tEncodeI8(pCoder, pReq->type) < 0) return -1;
if (tEncodeI32(pCoder, pReq->commentLen) < 0) return -1; if (tEncodeI32(pCoder, pReq->commentLen) < 0) return -1;
...@@ -6444,7 +6447,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) { ...@@ -6444,7 +6447,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1; if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1;
if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1; if (tDecodeCStr(pCoder, &pReq->name) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->uid) < 0) return -1; if (tDecodeI64(pCoder, &pReq->uid) < 0) return -1;
if (tDecodeI64(pCoder, &pReq->ctime) < 0) return -1; if (tDecodeI64(pCoder, &pReq->btime) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->ttl) < 0) return -1; if (tDecodeI32(pCoder, &pReq->ttl) < 0) return -1;
if (tDecodeI8(pCoder, &pReq->type) < 0) return -1; if (tDecodeI8(pCoder, &pReq->type) < 0) return -1;
if (tDecodeI32(pCoder, &pReq->commentLen) < 0) return -1; if (tDecodeI32(pCoder, &pReq->commentLen) < 0) return -1;
...@@ -6909,14 +6912,13 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) { ...@@ -6909,14 +6912,13 @@ int32_t tEncodeSVAlterTbReq(SEncoder *pEncoder, const SVAlterTbReq *pReq) {
default: default:
break; break;
} }
if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1;
tEndEncode(pEncoder); tEndEncode(pEncoder);
return 0; return 0;
} }
int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { static int32_t tDecodeSVAlterTbReqCommon(SDecoder *pDecoder, SVAlterTbReq *pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1; if (tDecodeCStr(pDecoder, &pReq->tbName) < 0) return -1;
if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1; if (tDecodeI8(pDecoder, &pReq->action) < 0) return -1;
if (tDecodeI32(pDecoder, &pReq->colId) < 0) return -1; if (tDecodeI32(pDecoder, &pReq->colId) < 0) return -1;
...@@ -6960,6 +6962,28 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) { ...@@ -6960,6 +6962,28 @@ int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
default: default:
break; break;
} }
return 0;
}
int32_t tDecodeSVAlterTbReq(SDecoder *pDecoder, SVAlterTbReq *pReq) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeSVAlterTbReqCommon(pDecoder, pReq) < 0) return -1;
pReq->ctimeMs = 0;
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
}
tEndDecode(pDecoder);
return 0;
}
int32_t tDecodeSVAlterTbReqSetCtime(SDecoder* pDecoder, SVAlterTbReq* pReq, int64_t ctimeMs) {
if (tStartDecode(pDecoder) < 0) return -1;
if (tDecodeSVAlterTbReqCommon(pDecoder, pReq) < 0) return -1;
*(int64_t *)(pDecoder->data + pDecoder->pos) = ctimeMs;
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
tEndDecode(pDecoder); tEndDecode(pDecoder);
return 0; return 0;
...@@ -7238,6 +7262,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) { ...@@ -7238,6 +7262,7 @@ int32_t tEncodeDeleteRes(SEncoder *pCoder, const SDeleteRes *pRes) {
if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1; if (tEncodeCStr(pCoder, pRes->tableFName) < 0) return -1;
if (tEncodeCStr(pCoder, pRes->tsColName) < 0) return -1; if (tEncodeCStr(pCoder, pRes->tsColName) < 0) return -1;
if (tEncodeI64(pCoder, pRes->ctimeMs) < 0) return -1;
return 0; return 0;
} }
...@@ -7257,6 +7282,11 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) { ...@@ -7257,6 +7282,11 @@ int32_t tDecodeDeleteRes(SDecoder *pCoder, SDeleteRes *pRes) {
if (tDecodeCStrTo(pCoder, pRes->tableFName) < 0) return -1; if (tDecodeCStrTo(pCoder, pRes->tableFName) < 0) return -1;
if (tDecodeCStrTo(pCoder, pRes->tsColName) < 0) return -1; if (tDecodeCStrTo(pCoder, pRes->tsColName) < 0) return -1;
pRes->ctimeMs = 0;
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI64(pCoder, &pRes->ctimeMs) < 0) return -1;
}
return 0; return 0;
} }
...@@ -7480,10 +7510,11 @@ int32_t tEncodeSBatchDeleteReq(SEncoder *pEncoder, const SBatchDeleteReq *pReq) ...@@ -7480,10 +7510,11 @@ int32_t tEncodeSBatchDeleteReq(SEncoder *pEncoder, const SBatchDeleteReq *pReq)
SSingleDeleteReq *pOneReq = taosArrayGet(pReq->deleteReqs, i); SSingleDeleteReq *pOneReq = taosArrayGet(pReq->deleteReqs, i);
if (tEncodeSSingleDeleteReq(pEncoder, pOneReq) < 0) return -1; if (tEncodeSSingleDeleteReq(pEncoder, pOneReq) < 0) return -1;
} }
if (tEncodeI64(pEncoder, pReq->ctimeMs) < 0) return -1;
return 0; return 0;
} }
int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) { static int32_t tDecodeSBatchDeleteReqCommon(SDecoder *pDecoder, SBatchDeleteReq *pReq) {
if (tDecodeI64(pDecoder, &pReq->suid) < 0) return -1; if (tDecodeI64(pDecoder, &pReq->suid) < 0) return -1;
int32_t sz; int32_t sz;
if (tDecodeI32(pDecoder, &sz) < 0) return -1; if (tDecodeI32(pDecoder, &sz) < 0) return -1;
...@@ -7497,6 +7528,24 @@ int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) { ...@@ -7497,6 +7528,24 @@ int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) {
return 0; return 0;
} }
int32_t tDecodeSBatchDeleteReq(SDecoder *pDecoder, SBatchDeleteReq *pReq) {
if (tDecodeSBatchDeleteReqCommon(pDecoder, pReq)) return -1;
pReq->ctimeMs = 0;
if (!tDecodeIsEnd(pDecoder)) {
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
}
return 0;
}
int32_t tDecodeSBatchDeleteReqSetCtime(SDecoder *pDecoder, SBatchDeleteReq *pReq, int64_t ctimeMs) {
if (tDecodeSBatchDeleteReqCommon(pDecoder, pReq)) return -1;
*(int64_t *)(pDecoder->data + pDecoder->pos) = ctimeMs;
if (tDecodeI64(pDecoder, &pReq->ctimeMs) < 0) return -1;
return 0;
}
static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData) { static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubmitTbData) {
if (tStartEncode(pCoder) < 0) return -1; if (tStartEncode(pCoder) < 0) return -1;
...@@ -7531,6 +7580,7 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm ...@@ -7531,6 +7580,7 @@ static int32_t tEncodeSSubmitTbData(SEncoder *pCoder, const SSubmitTbData *pSubm
pCoder->pos += rows[iRow]->len; pCoder->pos += rows[iRow]->len;
} }
} }
if (tEncodeI64(pCoder, pSubmitTbData->ctimeMs) < 0) return -1;
tEndEncode(pCoder); tEndEncode(pCoder);
return 0; return 0;
...@@ -7611,6 +7661,14 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa ...@@ -7611,6 +7661,14 @@ static int32_t tDecodeSSubmitTbData(SDecoder *pCoder, SSubmitTbData *pSubmitTbDa
} }
} }
pSubmitTbData->ctimeMs = 0;
if (!tDecodeIsEnd(pCoder)) {
if (tDecodeI64(pCoder, &pSubmitTbData->ctimeMs) < 0) {
code = TSDB_CODE_INVALID_MSG;
goto _exit;
}
}
tEndDecode(pCoder); tEndDecode(pCoder);
_exit: _exit:
......
...@@ -46,6 +46,7 @@ typedef struct { ...@@ -46,6 +46,7 @@ typedef struct {
int32_t vgId; int32_t vgId;
int32_t vgVersion; int32_t vgVersion;
int8_t dropped; int8_t dropped;
int32_t toVgId;
char path[PATH_MAX + 20]; char path[PATH_MAX + 20];
} SWrapperCfg; } SWrapperCfg;
...@@ -55,6 +56,7 @@ typedef struct { ...@@ -55,6 +56,7 @@ typedef struct {
int32_t refCount; int32_t refCount;
int8_t dropped; int8_t dropped;
int8_t disable; int8_t disable;
int32_t toVgId;
char *path; char *path;
SVnode *pImpl; SVnode *pImpl;
SMultiWorker pWriteW; SMultiWorker pWriteW;
...@@ -70,6 +72,7 @@ typedef struct { ...@@ -70,6 +72,7 @@ typedef struct {
int32_t vnodeNum; int32_t vnodeNum;
int32_t opened; int32_t opened;
int32_t failed; int32_t failed;
bool updateVnodesList;
int32_t threadIndex; int32_t threadIndex;
TdThread thread; TdThread thread;
SVnodeMgmt *pMgmt; SVnodeMgmt *pMgmt;
......
...@@ -71,6 +71,8 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg ** ...@@ -71,6 +71,8 @@ static int32_t vmDecodeVnodeList(SJson *pJson, SVnodeMgmt *pMgmt, SWrapperCfg **
if (code < 0) goto _OVER; if (code < 0) goto _OVER;
tjsonGetInt32ValueFromDouble(vnode, "vgVersion", pCfg->vgVersion, code); tjsonGetInt32ValueFromDouble(vnode, "vgVersion", pCfg->vgVersion, code);
if (code < 0) goto _OVER; if (code < 0) goto _OVER;
tjsonGetInt32ValueFromDouble(vnode, "toVgId", pCfg->toVgId, code);
if (code < 0) goto _OVER;
snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCfg->vgId); snprintf(pCfg->path, sizeof(pCfg->path), "%s%svnode%d", pMgmt->path, TD_DIRSEP, pCfg->vgId);
} }
...@@ -165,6 +167,7 @@ static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t num ...@@ -165,6 +167,7 @@ static int32_t vmEncodeVnodeList(SJson *pJson, SVnodeObj **ppVnodes, int32_t num
if (tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId) < 0) return -1; if (tjsonAddDoubleToObject(vnode, "vgId", pVnode->vgId) < 0) return -1;
if (tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped) < 0) return -1; if (tjsonAddDoubleToObject(vnode, "dropped", pVnode->dropped) < 0) return -1;
if (tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion) < 0) return -1; if (tjsonAddDoubleToObject(vnode, "vgVersion", pVnode->vgVersion) < 0) return -1;
if (pVnode->toVgId && tjsonAddDoubleToObject(vnode, "toVgId", pVnode->toVgId) < 0) return -1;
if (tjsonAddItemToArray(vnodes, vnode) < 0) return -1; if (tjsonAddItemToArray(vnodes, vnode) < 0) return -1;
} }
...@@ -179,7 +182,7 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) { ...@@ -179,7 +182,7 @@ int32_t vmWriteVnodeListToFile(SVnodeMgmt *pMgmt) {
SVnodeObj **ppVnodes = NULL; SVnodeObj **ppVnodes = NULL;
char file[PATH_MAX] = {0}; char file[PATH_MAX] = {0};
char realfile[PATH_MAX] = {0}; char realfile[PATH_MAX] = {0};
snprintf(file, sizeof(file), "%s%svnodes.json.bak", pMgmt->path, TD_DIRSEP); snprintf(file, sizeof(file), "%s%svnodes_tmp.json", pMgmt->path, TD_DIRSEP);
snprintf(realfile, sizeof(realfile), "%s%svnodes.json", pMgmt->path, TD_DIRSEP); snprintf(realfile, sizeof(realfile), "%s%svnodes.json", pMgmt->path, TD_DIRSEP);
int32_t numOfVnodes = 0; int32_t numOfVnodes = 0;
...@@ -226,4 +229,4 @@ _OVER: ...@@ -226,4 +229,4 @@ _OVER:
dError("failed to write vnodes file:%s since %s, vnodes:%d", realfile, terrstr(), numOfVnodes); dError("failed to write vnodes file:%s since %s, vnodes:%d", realfile, terrstr(), numOfVnodes);
} }
return code; return code;
} }
\ No newline at end of file
...@@ -481,10 +481,18 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -481,10 +481,18 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
int32_t srcVgId = req.srcVgId; int32_t srcVgId = req.srcVgId;
int32_t dstVgId = req.dstVgId; int32_t dstVgId = req.dstVgId;
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, dstVgId);
if (pVnode != NULL) {
dError("vgId:%d, vnode already exist", dstVgId);
vmReleaseVnode(pMgmt, pVnode);
terrno = TSDB_CODE_VND_ALREADY_EXIST;
return -1;
}
dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd, dInfo("vgId:%d, start to alter vnode hashrange:[%u, %u], dstVgId:%d", req.srcVgId, req.hashBegin, req.hashEnd,
req.dstVgId); req.dstVgId);
pVnode = vmAcquireVnode(pMgmt, srcVgId);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, srcVgId);
if (pVnode == NULL) { if (pVnode == NULL) {
dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr()); dError("vgId:%d, failed to alter hashrange since %s", srcVgId, terrstr());
terrno = TSDB_CODE_VND_NOT_EXIST; terrno = TSDB_CODE_VND_NOT_EXIST;
...@@ -498,6 +506,13 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -498,6 +506,13 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}; };
tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path)); tstrncpy(wrapperCfg.path, pVnode->path, sizeof(wrapperCfg.path));
// prepare alter
pVnode->toVgId = dstVgId;
if (vmWriteVnodeListToFile(pMgmt) != 0) {
dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
return -1;
}
dInfo("vgId:%d, close vnode", srcVgId); dInfo("vgId:%d, close vnode", srcVgId);
vmCloseVnode(pMgmt, pVnode, true); vmCloseVnode(pMgmt, pVnode, true);
...@@ -529,6 +544,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) { ...@@ -529,6 +544,7 @@ int32_t vmProcessAlterHashRangeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1; return -1;
} }
// complete alter
if (vmWriteVnodeListToFile(pMgmt) != 0) { if (vmWriteVnodeListToFile(pMgmt) != 0) {
dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr()); dError("vgId:%d, failed to write vnode list since %s", dstVgId, terrstr());
return -1; return -1;
......
...@@ -158,6 +158,28 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal) ...@@ -158,6 +158,28 @@ void vmCloseVnode(SVnodeMgmt *pMgmt, SVnodeObj *pVnode, bool commitAndRemoveWal)
taosMemoryFree(pVnode); taosMemoryFree(pVnode);
} }
static int32_t vmRestoreVgroupId(SWrapperCfg *pCfg, STfs *pTfs) {
int32_t srcVgId = pCfg->vgId;
int32_t dstVgId = pCfg->toVgId;
if (dstVgId == 0) return 0;
char srcPath[TSDB_FILENAME_LEN];
char dstPath[TSDB_FILENAME_LEN];
snprintf(srcPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, srcVgId);
snprintf(dstPath, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, dstVgId);
int32_t vgId = vnodeRestoreVgroupId(srcPath, dstPath, srcVgId, dstVgId, pTfs);
if (vgId <= 0) {
dError("vgId:%d, failed to restore vgroup id. srcPath: %s", pCfg->vgId, srcPath);
return -1;
}
pCfg->vgId = vgId;
pCfg->toVgId = 0;
return 0;
}
static void *vmOpenVnodeInThread(void *param) { static void *vmOpenVnodeInThread(void *param) {
SVnodeThread *pThread = param; SVnodeThread *pThread = param;
SVnodeMgmt *pMgmt = pThread->pMgmt; SVnodeMgmt *pMgmt = pThread->pMgmt;
...@@ -174,17 +196,33 @@ static void *vmOpenVnodeInThread(void *param) { ...@@ -174,17 +196,33 @@ static void *vmOpenVnodeInThread(void *param) {
pMgmt->state.openVnodes, pMgmt->state.totalVnodes); pMgmt->state.openVnodes, pMgmt->state.totalVnodes);
tmsgReportStartup("vnode-open", stepDesc); tmsgReportStartup("vnode-open", stepDesc);
if (pCfg->toVgId) {
if (vmRestoreVgroupId(pCfg, pMgmt->pTfs) != 0) {
dError("vgId:%d, failed to restore vgroup id by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++;
continue;
}
pThread->updateVnodesList = true;
}
snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId); snprintf(path, TSDB_FILENAME_LEN, "vnode%svnode%d", TD_DIRSEP, pCfg->vgId);
SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb); SVnode *pImpl = vnodeOpen(path, pMgmt->pTfs, pMgmt->msgCb);
if (pImpl == NULL) { if (pImpl == NULL) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex); dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++; pThread->failed++;
} else { continue;
vmOpenVnode(pMgmt, pCfg, pImpl);
dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->opened++;
atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
} }
if (vmOpenVnode(pMgmt, pCfg, pImpl) != 0) {
dError("vgId:%d, failed to open vnode by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->failed++;
continue;
}
dInfo("vgId:%d, is opened by thread:%d", pCfg->vgId, pThread->threadIndex);
pThread->opened++;
atomic_add_fetch_32(&pMgmt->state.openVnodes, 1);
} }
dInfo("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened, dInfo("thread:%d, numOfVnodes:%d, opened:%d failed:%d", pThread->threadIndex, pThread->vnodeNum, pThread->opened,
...@@ -242,6 +280,8 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { ...@@ -242,6 +280,8 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
taosThreadAttrDestroy(&thAttr); taosThreadAttrDestroy(&thAttr);
} }
bool updateVnodesList = false;
for (int32_t t = 0; t < threadNum; ++t) { for (int32_t t = 0; t < threadNum; ++t) {
SVnodeThread *pThread = &threads[t]; SVnodeThread *pThread = &threads[t];
if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) { if (pThread->vnodeNum > 0 && taosCheckPthreadValid(pThread->thread)) {
...@@ -249,6 +289,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { ...@@ -249,6 +289,7 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
taosThreadClear(&pThread->thread); taosThreadClear(&pThread->thread);
} }
taosMemoryFree(pThread->pCfgs); taosMemoryFree(pThread->pCfgs);
if (pThread->updateVnodesList) updateVnodesList = true;
} }
taosMemoryFree(threads); taosMemoryFree(threads);
taosMemoryFree(pCfgs); taosMemoryFree(pCfgs);
...@@ -256,10 +297,15 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) { ...@@ -256,10 +297,15 @@ static int32_t vmOpenVnodes(SVnodeMgmt *pMgmt) {
if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) { if (pMgmt->state.openVnodes != pMgmt->state.totalVnodes) {
dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes); dError("there are total vnodes:%d, opened:%d", pMgmt->state.totalVnodes, pMgmt->state.openVnodes);
return -1; return -1;
} else {
dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
return 0;
} }
if (updateVnodesList && vmWriteVnodeListToFile(pMgmt) != 0) {
dError("failed to write vnode list since %s", terrstr());
return -1;
}
dInfo("successfully opened %d vnodes", pMgmt->state.totalVnodes);
return 0;
} }
static void *vmCloseVnodeInThread(void *param) { static void *vmCloseVnodeInThread(void *param) {
......
...@@ -888,7 +888,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) { ...@@ -888,7 +888,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SVgObj *pVgroup = NULL; SVgObj *pVgroup = NULL;
void *pIter = NULL; void *pIter = NULL;
SVDropTtlTableReq ttlReq = {.timestamp = taosGetTimestampSec()}; SVDropTtlTableReq ttlReq = {.timestampSec = taosGetTimestampSec()};
int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq); int32_t reqLen = tSerializeSVDropTtlTableReq(NULL, 0, &ttlReq);
int32_t contLen = reqLen + sizeof(SMsgHead); int32_t contLen = reqLen + sizeof(SMsgHead);
...@@ -914,7 +914,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) { ...@@ -914,7 +914,7 @@ static int32_t mndProcessTtlTimer(SRpcMsg *pReq) {
if (code != 0) { if (code != 0) {
mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code); mError("vgId:%d, failed to send drop ttl table request to vnode since 0x%x", pVgroup->vgId, code);
} else { } else {
mInfo("vgId:%d, send drop ttl table request to vnode, time:%d", pVgroup->vgId, ttlReq.timestamp); mInfo("vgId:%d, send drop ttl table request to vnode, time:%" PRId32, pVgroup->vgId, ttlReq.timestampSec);
} }
sdbRelease(pSdb, pVgroup); sdbRelease(pSdb, pVgroup);
} }
...@@ -3161,7 +3161,6 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB ...@@ -3161,7 +3161,6 @@ static int32_t mndRetrieveStbCol(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pB
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
SStbObj *pStb = NULL; SStbObj *pStb = NULL;
int32_t numOfRows = 0; int32_t numOfRows = 0;
if (!pShow->sysDbRsp) { if (!pShow->sysDbRsp) {
numOfRows = buildSysDbColsInfo(pBlock, pShow->db, pShow->filterTb); numOfRows = buildSysDbColsInfo(pBlock, pShow->db, pShow->filterTb);
......
...@@ -1217,6 +1217,7 @@ static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, i ...@@ -1217,6 +1217,7 @@ static int32_t mndAddAlterVnodeHashRangeAction(SMnode *pMnode, STrans *pTrans, i
action.pCont = pReq; action.pCont = pReq;
action.contLen = contLen; action.contLen = contLen;
action.msgType = TDMT_VND_ALTER_HASHRANGE; action.msgType = TDMT_VND_ALTER_HASHRANGE;
action.acceptableCode = TSDB_CODE_VND_ALREADY_EXIST;
if (mndTransAppendRedoAction(pTrans, &action) != 0) { if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq); taosMemoryFree(pReq);
......
...@@ -27,6 +27,7 @@ target_sources( ...@@ -27,6 +27,7 @@ target_sources(
"src/meta/metaEntry.c" "src/meta/metaEntry.c"
"src/meta/metaSnapshot.c" "src/meta/metaSnapshot.c"
"src/meta/metaCache.c" "src/meta/metaCache.c"
"src/meta/metaTtl.c"
# sma # sma
"src/sma/smaEnv.c" "src/sma/smaEnv.c"
...@@ -80,6 +81,7 @@ IF (TD_VNODE_PLUGINS) ...@@ -80,6 +81,7 @@ IF (TD_VNODE_PLUGINS)
) )
ENDIF () ENDIF ()
IF (NOT ${TD_LINUX})
target_include_directories( target_include_directories(
vnode vnode
PUBLIC "inc" PUBLIC "inc"
...@@ -87,7 +89,25 @@ target_include_directories( ...@@ -87,7 +89,25 @@ target_include_directories(
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar" PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include" PUBLIC "${TD_SOURCE_DIR}/contrib/rocksdb/include"
) )
ELSE()
target_include_directories(
vnode
PUBLIC "inc"
PUBLIC "src/inc"
PUBLIC "${TD_SOURCE_DIR}/include/libs/scalar"
)
ENDIF (NOT ${TD_LINUX})
IF (TD_LINUX) IF (TD_LINUX)
target_include_directories(
vnode
PUBLIC "${TD_SOURCE_DIR}/deps/${TD_DEPS_DIR}/rocksdb_static"
)
target_link_directories(
vnode
PUBLIC "${TD_SOURCE_DIR}/deps/${TD_DEPS_DIR}/rocksdb_static"
)
target_link_libraries( target_link_libraries(
vnode vnode
PUBLIC os PUBLIC os
......
...@@ -54,6 +54,7 @@ void vnodeCleanup(); ...@@ -54,6 +54,7 @@ void vnodeCleanup();
int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs); int32_t vnodeCreate(const char *path, SVnodeCfg *pCfg, STfs *pTfs);
int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs); int32_t vnodeAlterReplica(const char *path, SAlterVnodeReplicaReq *pReq, STfs *pTfs);
int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs); int32_t vnodeAlterHashRange(const char *srcPath, const char *dstPath, SAlterVnodeHashRangeReq *pReq, STfs *pTfs);
int32_t vnodeRestoreVgroupId(const char *srcPath, const char *dstPath, int32_t srcVgId, int32_t dstVgId, STfs *pTfs);
void vnodeDestroy(const char *path, STfs *pTfs); void vnodeDestroy(const char *path, STfs *pTfs);
SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb); SVnode *vnodeOpen(const char *path, STfs *pTfs, SMsgCb msgCb);
void vnodePreClose(SVnode *pVnode); void vnodePreClose(SVnode *pVnode);
...@@ -114,6 +115,7 @@ int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName); ...@@ -114,6 +115,7 @@ int metaGetTableNameByUid(void *meta, uint64_t uid, char *tbName);
int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName); int metaGetTableSzNameByUid(void *meta, uint64_t uid, char *tbName);
int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid); int metaGetTableUidByName(void *pVnode, char *tbName, uint64_t *uid);
int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType); int metaGetTableTypeByName(void *meta, char *tbName, ETableType *tbType);
int metaGetTableTtlByUid(void *meta, uint64_t uid, int64_t *ttlDays);
bool metaIsTableExist(void* pVnode, tb_uid_t uid); bool metaIsTableExist(void* pVnode, tb_uid_t uid);
int32_t metaGetCachedTableUidList(void *pVnode, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList, int32_t metaGetCachedTableUidList(void *pVnode, tb_uid_t suid, const uint8_t *key, int32_t keyLen, SArray *pList,
bool *acquired); bool *acquired);
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#define _TD_VNODE_META_H_ #define _TD_VNODE_META_H_
#include "index.h" #include "index.h"
#include "metaTtl.h"
#include "vnodeInt.h" #include "vnodeInt.h"
#ifdef __cplusplus #ifdef __cplusplus
...@@ -89,10 +90,10 @@ struct SMeta { ...@@ -89,10 +90,10 @@ struct SMeta {
// ivt idx and idx // ivt idx and idx
void* pTagIvtIdx; void* pTagIvtIdx;
TTB* pTagIdx; TTB* pTagIdx;
TTB* pTtlIdx; STtlManger* pTtlMgr;
TTB* pCtimeIdx; // table created time idx TTB* pBtimeIdx; // table created time idx
TTB* pNcolIdx; // ncol of table idx, normal table only TTB* pNcolIdx; // ncol of table idx, normal table only
TTB* pSmaIdx; TTB* pSmaIdx;
...@@ -138,20 +139,15 @@ typedef struct { ...@@ -138,20 +139,15 @@ typedef struct {
} STagIdxKey; } STagIdxKey;
#pragma pack(pop) #pragma pack(pop)
typedef struct {
int64_t dtime;
tb_uid_t uid;
} STtlIdxKey;
typedef struct { typedef struct {
tb_uid_t uid; tb_uid_t uid;
int64_t smaUid; int64_t smaUid;
} SSmaIdxKey; } SSmaIdxKey;
typedef struct { typedef struct {
int64_t ctime; int64_t btime;
tb_uid_t uid; tb_uid_t uid;
} SCtimeIdxKey; } SBtimeIdxKey;
typedef struct { typedef struct {
int64_t ncol; int64_t ncol;
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_VNODE_TTL_H_
#define _TD_VNODE_TTL_H_
#include "taosdef.h"
#include "thash.h"
#include "tdb.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef enum DirtyEntryType {
ENTRY_TYPE_DEL = 1,
ENTRY_TYPE_UPSERT = 2,
} DirtyEntryType;
typedef struct STtlManger {
TdThreadRwlock lock;
TTB* pOldTtlIdx; // btree<{deleteTime, tuid}, NULL>
SHashObj* pTtlCache; // key: tuid, value: {ttl, ctime}
SHashObj* pDirtyUids; // dirty tuid
TTB* pTtlIdx; // btree<{deleteTime, tuid}, ttl>
} STtlManger;
typedef struct {
int64_t ttlDays;
int64_t changeTimeMs;
} STtlCacheEntry;
typedef struct {
DirtyEntryType type;
} STtlDirtyEntry;
typedef struct {
int64_t deleteTimeSec;
tb_uid_t uid;
} STtlIdxKey;
typedef struct {
int64_t deleteTimeMs;
tb_uid_t uid;
} STtlIdxKeyV1;
typedef struct {
int64_t ttlDays;
} STtlIdxValue;
typedef struct {
tb_uid_t uid;
int64_t changeTimeMs;
} STtlUpdCtimeCtx;
typedef struct {
tb_uid_t uid;
int64_t changeTimeMs;
int64_t ttlDays;
} STtlUpdTtlCtx;
typedef struct {
tb_uid_t uid;
TXN* pTxn;
} STtlDelTtlCtx;
int ttlMgrOpen(STtlManger** ppTtlMgr, TDB* pEnv, int8_t rollback);
int ttlMgrClose(STtlManger* pTtlMgr);
int ttlMgrBegin(STtlManger* pTtlMgr, void* pMeta);
int ttlMgrConvert(TTB* pOldTtlIdx, TTB* pNewTtlIdx, void* pMeta);
int ttlMgrFlush(STtlManger* pTtlMgr, TXN* pTxn);
int ttlMgrInsertTtl(STtlManger* pTtlMgr, const STtlUpdTtlCtx* pUpdCtx);
int ttlMgrDeleteTtl(STtlManger* pTtlMgr, const STtlDelTtlCtx* pDelCtx);
int ttlMgrUpdateChangeTime(STtlManger* pTtlMgr, const STtlUpdCtimeCtx* pUpdCtimeCtx);
int ttlMgrFindExpired(STtlManger* pTtlMgr, int64_t timePointMs, SArray* pTbUids);
#ifdef __cplusplus
}
#endif
#endif /*_TD_VNODE_TTL_H_*/
...@@ -149,8 +149,9 @@ int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq ...@@ -149,8 +149,9 @@ int metaDropSTable(SMeta* pMeta, int64_t verison, SVDropStbReq* pReq
int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp); int metaCreateTable(SMeta* pMeta, int64_t version, SVCreateTbReq* pReq, STableMetaRsp** pMetaRsp);
int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid); int metaDropTable(SMeta* pMeta, int64_t version, SVDropTbReq* pReq, SArray* tbUids, int64_t* tbUid);
int32_t metaTrimTables(SMeta* pMeta); int32_t metaTrimTables(SMeta* pMeta);
int metaTtlDropTable(SMeta* pMeta, int64_t ttl, SArray* tbUids); int metaTtlDropTable(SMeta* pMeta, int64_t timePointMs, SArray* tbUids);
int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp); int metaAlterTable(SMeta* pMeta, int64_t version, SVAlterTbReq* pReq, STableMetaRsp* pMetaRsp);
int metaUpdateChangeTime(SMeta* pMeta, tb_uid_t uid, int64_t changeTimeMs);
SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); SSchemaWrapper* metaGetTableSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock); STSchema* metaGetTbTSchema(SMeta* pMeta, tb_uid_t uid, int32_t sver, int lock);
int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema); int32_t metaGetTbTSchemaEx(SMeta* pMeta, tb_uid_t suid, tb_uid_t uid, int32_t sver, STSchema** ppTSchema);
...@@ -176,7 +177,6 @@ SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid); ...@@ -176,7 +177,6 @@ SArray* metaGetSmaIdsByTable(SMeta* pMeta, tb_uid_t uid);
SArray* metaGetSmaTbUids(SMeta* pMeta); SArray* metaGetSmaTbUids(SMeta* pMeta);
void* metaGetIdx(SMeta* pMeta); void* metaGetIdx(SMeta* pMeta);
void* metaGetIvtIdx(SMeta* pMeta); void* metaGetIvtIdx(SMeta* pMeta);
int metaTtlSmaller(SMeta* pMeta, uint64_t time, SArray* uidList);
void metaReaderInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags); void metaReaderInit(SMetaReader* pReader, SMeta* pMeta, int32_t flags);
......
...@@ -40,6 +40,12 @@ int metaBegin(SMeta *pMeta, int8_t heap) { ...@@ -40,6 +40,12 @@ int metaBegin(SMeta *pMeta, int8_t heap) {
return -1; return -1;
} }
if (ttlMgrBegin(pMeta->pTtlMgr, pMeta) < 0) {
return -1;
}
tdbCommit(pMeta->pEnv, pMeta->txn);
return 0; return 0;
} }
...@@ -50,6 +56,7 @@ int metaFinishCommit(SMeta *pMeta, TXN *txn) { return tdbPostCommit(pMeta->pEnv ...@@ -50,6 +56,7 @@ int metaFinishCommit(SMeta *pMeta, TXN *txn) { return tdbPostCommit(pMeta->pEnv
int metaPrepareAsyncCommit(SMeta *pMeta) { int metaPrepareAsyncCommit(SMeta *pMeta) {
// return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn); // return tdbPrepareAsyncCommit(pMeta->pEnv, pMeta->txn);
int code = 0; int code = 0;
code = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
code = tdbCommit(pMeta->pEnv, pMeta->txn); code = tdbCommit(pMeta->pEnv, pMeta->txn);
return code; return code;
......
...@@ -31,7 +31,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { ...@@ -31,7 +31,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tEncodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1; if (tEncodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1;
} }
} else if (pME->type == TSDB_CHILD_TABLE) { } else if (pME->type == TSDB_CHILD_TABLE) {
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1; if (tEncodeI64(pCoder, pME->ctbEntry.btime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1; if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1;
if (tEncodeI32v(pCoder, pME->ctbEntry.commentLen) < 0) return -1; if (tEncodeI32v(pCoder, pME->ctbEntry.commentLen) < 0) return -1;
if (pME->ctbEntry.commentLen > 0) { if (pME->ctbEntry.commentLen > 0) {
...@@ -40,7 +40,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) { ...@@ -40,7 +40,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1; if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1;
if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1; if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1;
} else if (pME->type == TSDB_NORMAL_TABLE) { } else if (pME->type == TSDB_NORMAL_TABLE) {
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1; if (tEncodeI64(pCoder, pME->ntbEntry.btime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1; if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
if (tEncodeI32v(pCoder, pME->ntbEntry.commentLen) < 0) return -1; if (tEncodeI32v(pCoder, pME->ntbEntry.commentLen) < 0) return -1;
if (pME->ntbEntry.commentLen > 0) { if (pME->ntbEntry.commentLen > 0) {
...@@ -76,7 +76,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { ...@@ -76,7 +76,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1; if (tDecodeSRSmaParam(pCoder, &pME->stbEntry.rsmaParam) < 0) return -1;
} }
} else if (pME->type == TSDB_CHILD_TABLE) { } else if (pME->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.btime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ctbEntry.commentLen) < 0) return -1; if (tDecodeI32v(pCoder, &pME->ctbEntry.commentLen) < 0) return -1;
if (pME->ctbEntry.commentLen > 0) { if (pME->ctbEntry.commentLen > 0) {
...@@ -85,7 +85,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) { ...@@ -85,7 +85,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1; if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO) if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO)
} else if (pME->type == TSDB_NORMAL_TABLE) { } else if (pME->type == TSDB_NORMAL_TABLE) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1; if (tDecodeI64(pCoder, &pME->ntbEntry.btime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1; if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
if (tDecodeI32v(pCoder, &pME->ntbEntry.commentLen) < 0) return -1; if (tDecodeI32v(pCoder, &pME->ntbEntry.commentLen) < 0) return -1;
if (pME->ntbEntry.commentLen > 0) { if (pME->ntbEntry.commentLen > 0) {
......
...@@ -19,12 +19,11 @@ static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen ...@@ -19,12 +19,11 @@ static int tbDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen
static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int skmDbKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ctbIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ctbIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int uidIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int smaIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int taskIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ctimeIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int btimeIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2); static int ncolIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2);
static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); } static int32_t metaInitLock(SMeta *pMeta) { return taosThreadRwlockInit(&pMeta->lock, NULL); }
...@@ -128,8 +127,8 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { ...@@ -128,8 +127,8 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
goto _err; goto _err;
} }
// open pTtlIdx // open pTtlMgr ("ttlv1.idx")
ret = tdbTbOpen("ttl.idx", sizeof(STtlIdxKey), 0, ttlIdxKeyCmpr, pMeta->pEnv, &pMeta->pTtlIdx, 0); ret = ttlMgrOpen(&pMeta->pTtlMgr, pMeta->pEnv, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta ttl index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
...@@ -143,7 +142,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) { ...@@ -143,7 +142,7 @@ int metaOpen(SVnode *pVnode, SMeta **ppMeta, int8_t rollback) {
} }
// idx table create time // idx table create time
ret = tdbTbOpen("ctime.idx", sizeof(SCtimeIdxKey), 0, ctimeIdxCmpr, pMeta->pEnv, &pMeta->pCtimeIdx, 0); ret = tdbTbOpen("ctime.idx", sizeof(SBtimeIdxKey), 0, btimeIdxCmpr, pMeta->pEnv, &pMeta->pBtimeIdx, 0);
if (ret < 0) { if (ret < 0) {
metaError("vgId:%d, failed to open meta ctime index since %s", TD_VID(pVnode), tstrerror(terrno)); metaError("vgId:%d, failed to open meta ctime index since %s", TD_VID(pVnode), tstrerror(terrno));
goto _err; goto _err;
...@@ -184,9 +183,9 @@ _err: ...@@ -184,9 +183,9 @@ _err:
if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx); if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
if (pMeta->pCtimeIdx) tdbTbClose(pMeta->pCtimeIdx); if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
...@@ -209,9 +208,9 @@ int metaClose(SMeta **ppMeta) { ...@@ -209,9 +208,9 @@ int metaClose(SMeta **ppMeta) {
if (pMeta->pIdx) metaCloseIdx(pMeta); if (pMeta->pIdx) metaCloseIdx(pMeta);
if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb); if (pMeta->pStreamDb) tdbTbClose(pMeta->pStreamDb);
if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx); if (pMeta->pNcolIdx) tdbTbClose(pMeta->pNcolIdx);
if (pMeta->pCtimeIdx) tdbTbClose(pMeta->pCtimeIdx); if (pMeta->pBtimeIdx) tdbTbClose(pMeta->pBtimeIdx);
if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx); if (pMeta->pSmaIdx) tdbTbClose(pMeta->pSmaIdx);
if (pMeta->pTtlIdx) tdbTbClose(pMeta->pTtlIdx); if (pMeta->pTtlMgr) ttlMgrClose(pMeta->pTtlMgr);
if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx); if (pMeta->pTagIvtIdx) indexClose(pMeta->pTagIvtIdx);
if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx); if (pMeta->pTagIdx) tdbTbClose(pMeta->pTagIdx);
if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx); if (pMeta->pCtbIdx) tdbTbClose(pMeta->pCtbIdx);
...@@ -399,37 +398,18 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL ...@@ -399,37 +398,18 @@ static int tagIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kL
return 0; return 0;
} }
static int ttlIdxKeyCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) { static int btimeIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
STtlIdxKey *pTtlIdxKey1 = (STtlIdxKey *)pKey1; SBtimeIdxKey *pBtimeIdxKey1 = (SBtimeIdxKey *)pKey1;
STtlIdxKey *pTtlIdxKey2 = (STtlIdxKey *)pKey2; SBtimeIdxKey *pBtimeIdxKey2 = (SBtimeIdxKey *)pKey2;
if (pBtimeIdxKey1->btime > pBtimeIdxKey2->btime) {
if (pTtlIdxKey1->dtime > pTtlIdxKey2->dtime) {
return 1;
} else if (pTtlIdxKey1->dtime < pTtlIdxKey2->dtime) {
return -1;
}
if (pTtlIdxKey1->uid > pTtlIdxKey2->uid) {
return 1;
} else if (pTtlIdxKey1->uid < pTtlIdxKey2->uid) {
return -1;
}
return 0;
}
static int ctimeIdxCmpr(const void *pKey1, int kLen1, const void *pKey2, int kLen2) {
SCtimeIdxKey *pCtimeIdxKey1 = (SCtimeIdxKey *)pKey1;
SCtimeIdxKey *pCtimeIdxKey2 = (SCtimeIdxKey *)pKey2;
if (pCtimeIdxKey1->ctime > pCtimeIdxKey2->ctime) {
return 1; return 1;
} else if (pCtimeIdxKey1->ctime < pCtimeIdxKey2->ctime) { } else if (pBtimeIdxKey1->btime < pBtimeIdxKey2->btime) {
return -1; return -1;
} }
if (pCtimeIdxKey1->uid > pCtimeIdxKey2->uid) { if (pBtimeIdxKey1->uid > pBtimeIdxKey2->uid) {
return 1; return 1;
} else if (pCtimeIdxKey1->uid < pCtimeIdxKey2->uid) { } else if (pBtimeIdxKey1->uid < pBtimeIdxKey2->uid) {
return -1; return -1;
} }
......
...@@ -212,6 +212,29 @@ int metaReadNext(SMetaReader *pReader) { ...@@ -212,6 +212,29 @@ int metaReadNext(SMetaReader *pReader) {
return 0; return 0;
} }
int metaGetTableTtlByUid(void *meta, uint64_t uid, int64_t *ttlDays) {
int code = -1;
SMetaReader mr = {0};
metaReaderInit(&mr, (SMeta *)meta, 0);
code = metaReaderGetTableEntryByUid(&mr, uid);
if (code < 0) {
goto _exit;
}
if (mr.me.type == TSDB_CHILD_TABLE) {
*ttlDays = mr.me.ctbEntry.ttlDays;
} else if (mr.me.type == TSDB_NORMAL_TABLE) {
*ttlDays = mr.me.ntbEntry.ttlDays;
} else {
goto _exit;
}
code = 0;
_exit:
metaReaderClear(&mr);
return code;
}
#if 1 // =================================================== #if 1 // ===================================================
SMTbCursor *metaOpenTbCursor(void *pVnode) { SMTbCursor *metaOpenTbCursor(void *pVnode) {
SMTbCursor *pTbCur = NULL; SMTbCursor *pTbCur = NULL;
...@@ -387,37 +410,6 @@ _err: ...@@ -387,37 +410,6 @@ _err:
return NULL; return NULL;
} }
int metaTtlSmaller(SMeta *pMeta, uint64_t ttl, SArray *uidList) {
TBC *pCur;
int ret = tdbTbcOpen(pMeta->pTtlIdx, &pCur, NULL);
if (ret < 0) {
return ret;
}
STtlIdxKey ttlKey = {0};
ttlKey.dtime = ttl;
ttlKey.uid = INT64_MAX;
int c = 0;
tdbTbcMoveTo(pCur, &ttlKey, sizeof(ttlKey), &c);
if (c < 0) {
tdbTbcMoveToPrev(pCur);
}
void *pKey = NULL;
int kLen = 0;
while (1) {
ret = tdbTbcPrev(pCur, &pKey, &kLen, NULL, NULL);
if (ret < 0) {
break;
}
ttlKey = *(STtlIdxKey *)pKey;
taosArrayPush(uidList, &ttlKey.uid);
}
tdbFree(pKey);
tdbTbcClose(pCur);
return 0;
}
struct SMCtbCursor { struct SMCtbCursor {
SMeta *pMeta; SMeta *pMeta;
TBC *pCur; TBC *pCur;
...@@ -1018,17 +1010,17 @@ int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) { ...@@ -1018,17 +1010,17 @@ int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
pCursor->type = param->type; pCursor->type = param->type;
metaRLock(pMeta); metaRLock(pMeta);
ret = tdbTbcOpen(pMeta->pCtimeIdx, &pCursor->pCur, NULL); ret = tdbTbcOpen(pMeta->pBtimeIdx, &pCursor->pCur, NULL);
if (ret != 0) { if (ret != 0) {
goto END; goto END;
} }
int64_t uidLimit = param->reverse ? INT64_MAX : 0; int64_t uidLimit = param->reverse ? INT64_MAX : 0;
SCtimeIdxKey ctimeKey = {.ctime = *(int64_t *)(param->val), .uid = uidLimit}; SBtimeIdxKey btimeKey = {.btime = *(int64_t *)(param->val), .uid = uidLimit};
SCtimeIdxKey *pCtimeKey = &ctimeKey; SBtimeIdxKey *pBtimeKey = &btimeKey;
int cmp = 0; int cmp = 0;
if (tdbTbcMoveTo(pCursor->pCur, &ctimeKey, sizeof(ctimeKey), &cmp) < 0) { if (tdbTbcMoveTo(pCursor->pCur, &btimeKey, sizeof(btimeKey), &cmp) < 0) {
goto END; goto END;
} }
...@@ -1042,10 +1034,10 @@ int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) { ...@@ -1042,10 +1034,10 @@ int32_t metaFilterCreateTime(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, NULL, NULL); valid = tdbTbcGet(pCursor->pCur, (const void **)&entryKey, &nEntryKey, NULL, NULL);
if (valid < 0) break; if (valid < 0) break;
SCtimeIdxKey *p = entryKey; SBtimeIdxKey *p = entryKey;
if (count > TRY_ERROR_LIMIT) break; if (count > TRY_ERROR_LIMIT) break;
int32_t cmp = (*param->filterFunc)((void *)&p->ctime, (void *)&pCtimeKey->ctime, param->type); int32_t cmp = (*param->filterFunc)((void *)&p->btime, (void *)&pBtimeKey->btime, param->type);
if (cmp == 0) if (cmp == 0)
taosArrayPush(pUids, &p->uid); taosArrayPush(pUids, &p->uid);
else { else {
...@@ -1149,7 +1141,7 @@ int32_t metaFilterTtl(void *pVnode, SMetaFltParam *arg, SArray *pUids) { ...@@ -1149,7 +1141,7 @@ int32_t metaFilterTtl(void *pVnode, SMetaFltParam *arg, SArray *pUids) {
pCursor->type = param->type; pCursor->type = param->type;
metaRLock(pMeta); metaRLock(pMeta);
ret = tdbTbcOpen(pMeta->pTtlIdx, &pCursor->pCur, NULL); //ret = tdbTbcOpen(pMeta->pTtlIdx, &pCursor->pCur, NULL);
END: END:
if (pCursor->pMeta) metaULock(pCursor->pMeta); if (pCursor->pMeta) metaULock(pCursor->pMeta);
......
...@@ -20,7 +20,7 @@ static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, con ...@@ -20,7 +20,7 @@ static int metaDelJsonVarFromIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry, con
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateUidIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME);
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME); static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateSuidIdx(SMeta *pMeta, const SMetaEntry *pME);
...@@ -28,8 +28,8 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry); ...@@ -28,8 +28,8 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry);
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type); static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type);
static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey); static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey);
// opt ins_tables query // opt ins_tables query
static int metaUpdateCtimeIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaDeleteCtimeIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME); static int metaDeleteNcolIdx(SMeta *pMeta, const SMetaEntry *pME);
...@@ -734,7 +734,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs ...@@ -734,7 +734,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
me.uid = pReq->uid; me.uid = pReq->uid;
me.name = pReq->name; me.name = pReq->name;
if (me.type == TSDB_CHILD_TABLE) { if (me.type == TSDB_CHILD_TABLE) {
me.ctbEntry.ctime = pReq->ctime; me.ctbEntry.btime = pReq->btime;
me.ctbEntry.ttlDays = pReq->ttl; me.ctbEntry.ttlDays = pReq->ttl;
me.ctbEntry.commentLen = pReq->commentLen; me.ctbEntry.commentLen = pReq->commentLen;
me.ctbEntry.comment = pReq->comment; me.ctbEntry.comment = pReq->comment;
...@@ -770,7 +770,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs ...@@ -770,7 +770,7 @@ int metaCreateTable(SMeta *pMeta, int64_t ver, SVCreateTbReq *pReq, STableMetaRs
metaTbGroupCacheClear(pMeta, me.ctbEntry.suid); metaTbGroupCacheClear(pMeta, me.ctbEntry.suid);
metaULock(pMeta); metaULock(pMeta);
} else { } else {
me.ntbEntry.ctime = pReq->ctime; me.ntbEntry.btime = pReq->btime;
me.ntbEntry.ttlDays = pReq->ttl; me.ntbEntry.ttlDays = pReq->ttl;
me.ntbEntry.commentLen = pReq->commentLen; me.ntbEntry.commentLen = pReq->commentLen;
me.ntbEntry.comment = pReq->comment; me.ntbEntry.comment = pReq->comment;
...@@ -923,50 +923,40 @@ end: ...@@ -923,50 +923,40 @@ end:
return code; return code;
} }
int metaTtlDropTable(SMeta *pMeta, int64_t ttl, SArray *tbUids) { int metaTtlDropTable(SMeta *pMeta, int64_t timePointMs, SArray *tbUids) {
int ret = metaTtlSmaller(pMeta, ttl, tbUids); int ret = ttlMgrFlush(pMeta->pTtlMgr, pMeta->txn);
if (ret != 0) { if (ret != 0) {
metaError("ttl failed to flush, ret:%d", ret);
return ret;
}
ret = ttlMgrFindExpired(pMeta->pTtlMgr, timePointMs, tbUids);
if (ret != 0) {
metaError("ttl failed to find expired table, ret:%d", ret);
return ret; return ret;
} }
if (TARRAY_SIZE(tbUids) == 0) { if (TARRAY_SIZE(tbUids) == 0) {
return 0; return 0;
} }
metaInfo("ttl find expired table count: %zu" , TARRAY_SIZE(tbUids));
metaDropTables(pMeta, tbUids); metaDropTables(pMeta, tbUids);
return 0; return 0;
} }
static void metaBuildTtlIdxKey(STtlIdxKey *ttlKey, const SMetaEntry *pME) { static int metaBuildBtimeIdxKey(SBtimeIdxKey *btimeKey, const SMetaEntry *pME) {
int64_t ttlDays = 0; int64_t btime;
int64_t ctime = 0;
if (pME->type == TSDB_CHILD_TABLE) { if (pME->type == TSDB_CHILD_TABLE) {
ctime = pME->ctbEntry.ctime; btime = pME->ctbEntry.btime;
ttlDays = pME->ctbEntry.ttlDays;
} else if (pME->type == TSDB_NORMAL_TABLE) { } else if (pME->type == TSDB_NORMAL_TABLE) {
ctime = pME->ntbEntry.ctime; btime = pME->ntbEntry.btime;
ttlDays = pME->ntbEntry.ttlDays;
} else {
metaError("meta/table: invalide table type: %" PRId8 " build ttl idx key failed.", pME->type);
return;
}
if (ttlDays <= 0) return;
ttlKey->dtime = ctime / 1000 + ttlDays * tsTtlUnit;
ttlKey->uid = pME->uid;
}
static int metaBuildCtimeIdxKey(SCtimeIdxKey *ctimeKey, const SMetaEntry *pME) {
int64_t ctime;
if (pME->type == TSDB_CHILD_TABLE) {
ctime = pME->ctbEntry.ctime;
} else if (pME->type == TSDB_NORMAL_TABLE) {
ctime = pME->ntbEntry.ctime;
} else { } else {
return -1; return -1;
} }
ctimeKey->ctime = ctime; btimeKey->btime = btime;
ctimeKey->uid = pME->uid; btimeKey->uid = pME->uid;
return 0; return 0;
} }
...@@ -980,11 +970,9 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) { ...@@ -980,11 +970,9 @@ static int metaBuildNColIdxKey(SNcolIdxKey *ncolKey, const SMetaEntry *pME) {
return 0; return 0;
} }
static int metaDeleteTtlIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaDeleteTtl(SMeta *pMeta, const SMetaEntry *pME) {
STtlIdxKey ttlKey = {0}; STtlDelTtlCtx ctx = {.uid = pME->uid, .pTxn = pMeta->txn};
metaBuildTtlIdxKey(&ttlKey, pME); return ttlMgrDeleteTtl(pMeta->pTtlMgr, &ctx);
if (ttlKey.dtime == 0) return 0;
return tdbTbDelete(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), pMeta->txn);
} }
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
...@@ -1066,10 +1054,10 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { ...@@ -1066,10 +1054,10 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, pMeta->txn); tdbTbDelete(pMeta->pNameIdx, e.name, strlen(e.name) + 1, pMeta->txn);
tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), pMeta->txn); tdbTbDelete(pMeta->pUidIdx, &uid, sizeof(uid), pMeta->txn);
if (e.type == TSDB_CHILD_TABLE || e.type == TSDB_NORMAL_TABLE) metaDeleteCtimeIdx(pMeta, &e); if (e.type == TSDB_CHILD_TABLE || e.type == TSDB_NORMAL_TABLE) metaDeleteBtimeIdx(pMeta, &e);
if (e.type == TSDB_NORMAL_TABLE) metaDeleteNcolIdx(pMeta, &e); if (e.type == TSDB_NORMAL_TABLE) metaDeleteNcolIdx(pMeta, &e);
if (e.type != TSDB_SUPER_TABLE) metaDeleteTtlIdx(pMeta, &e); if (e.type != TSDB_SUPER_TABLE) metaDeleteTtl(pMeta, &e);
if (e.type == TSDB_CHILD_TABLE) { if (e.type == TSDB_CHILD_TABLE) {
tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), pMeta->txn); tdbTbDelete(pMeta->pCtbIdx, &(SCtbIdxKey){.suid = e.ctbEntry.suid, .uid = uid}, sizeof(SCtbIdxKey), pMeta->txn);
...@@ -1102,23 +1090,23 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) { ...@@ -1102,23 +1090,23 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
return 0; return 0;
} }
// opt ins_tables // opt ins_tables
int metaUpdateCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) { int metaUpdateBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
SCtimeIdxKey ctimeKey = {0}; SBtimeIdxKey btimeKey = {0};
if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) { if (metaBuildBtimeIdxKey(&btimeKey, pME) < 0) {
return 0; return 0;
} }
metaTrace("vgId:%d, start to save version:%" PRId64 " uid:%" PRId64 " ctime:%" PRId64, TD_VID(pMeta->pVnode), metaTrace("vgId:%d, start to save version:%" PRId64 " uid:%" PRId64 " btime:%" PRId64, TD_VID(pMeta->pVnode),
pME->version, pME->uid, ctimeKey.ctime); pME->version, pME->uid, btimeKey.btime);
return tdbTbUpsert(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), NULL, 0, pMeta->txn); return tdbTbUpsert(pMeta->pBtimeIdx, &btimeKey, sizeof(btimeKey), NULL, 0, pMeta->txn);
} }
int metaDeleteCtimeIdx(SMeta *pMeta, const SMetaEntry *pME) { int metaDeleteBtimeIdx(SMeta *pMeta, const SMetaEntry *pME) {
SCtimeIdxKey ctimeKey = {0}; SBtimeIdxKey btimeKey = {0};
if (metaBuildCtimeIdxKey(&ctimeKey, pME) < 0) { if (metaBuildBtimeIdxKey(&btimeKey, pME) < 0) {
return 0; return 0;
} }
return tdbTbDelete(pMeta->pCtimeIdx, &ctimeKey, sizeof(ctimeKey), pMeta->txn); return tdbTbDelete(pMeta->pBtimeIdx, &btimeKey, sizeof(btimeKey), pMeta->txn);
} }
int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME) { int metaUpdateNcolIdx(SMeta *pMeta, const SMetaEntry *pME) {
SNcolIdxKey ncolKey = {0}; SNcolIdxKey ncolKey = {0};
...@@ -1328,6 +1316,8 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl ...@@ -1328,6 +1316,8 @@ static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAl
metaULock(pMeta); metaULock(pMeta);
metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs);
metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp); metaUpdateMetaRsp(uid, pAlterTbReq->tbName, pSchema, pMetaRsp);
if (entry.pBuf) taosMemoryFree(entry.pBuf); if (entry.pBuf) taosMemoryFree(entry.pBuf);
...@@ -1515,6 +1505,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA ...@@ -1515,6 +1505,8 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaULock(pMeta); metaULock(pMeta);
metaUpdateChangeTime(pMeta, ctbEntry.uid, pAlterTbReq->ctimeMs);
tDecoderClear(&dc1); tDecoderClear(&dc1);
tDecoderClear(&dc2); tDecoderClear(&dc2);
taosMemoryFree((void *)ctbEntry.ctbEntry.pTags); taosMemoryFree((void *)ctbEntry.ctbEntry.pTags);
...@@ -1603,9 +1595,9 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p ...@@ -1603,9 +1595,9 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
// build SMetaEntry // build SMetaEntry
if (entry.type == TSDB_CHILD_TABLE) { if (entry.type == TSDB_CHILD_TABLE) {
if (pAlterTbReq->updateTTL) { if (pAlterTbReq->updateTTL) {
metaDeleteTtlIdx(pMeta, &entry); metaDeleteTtl(pMeta, &entry);
entry.ctbEntry.ttlDays = pAlterTbReq->newTTL; entry.ctbEntry.ttlDays = pAlterTbReq->newTTL;
metaUpdateTtlIdx(pMeta, &entry); metaUpdateTtl(pMeta, &entry);
} }
if (pAlterTbReq->newCommentLen >= 0) { if (pAlterTbReq->newCommentLen >= 0) {
entry.ctbEntry.commentLen = pAlterTbReq->newCommentLen; entry.ctbEntry.commentLen = pAlterTbReq->newCommentLen;
...@@ -1613,9 +1605,9 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p ...@@ -1613,9 +1605,9 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
} }
} else { } else {
if (pAlterTbReq->updateTTL) { if (pAlterTbReq->updateTTL) {
metaDeleteTtlIdx(pMeta, &entry); metaDeleteTtl(pMeta, &entry);
entry.ntbEntry.ttlDays = pAlterTbReq->newTTL; entry.ntbEntry.ttlDays = pAlterTbReq->newTTL;
metaUpdateTtlIdx(pMeta, &entry); metaUpdateTtl(pMeta, &entry);
} }
if (pAlterTbReq->newCommentLen >= 0) { if (pAlterTbReq->newCommentLen >= 0) {
entry.ntbEntry.commentLen = pAlterTbReq->newCommentLen; entry.ntbEntry.commentLen = pAlterTbReq->newCommentLen;
...@@ -1628,6 +1620,8 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p ...@@ -1628,6 +1620,8 @@ static int metaUpdateTableOptions(SMeta *pMeta, int64_t version, SVAlterTbReq *p
metaUpdateUidIdx(pMeta, &entry); metaUpdateUidIdx(pMeta, &entry);
metaULock(pMeta); metaULock(pMeta);
metaUpdateChangeTime(pMeta, entry.uid, pAlterTbReq->ctimeMs);
tdbTbcClose(pTbDbc); tdbTbcClose(pTbDbc);
tdbTbcClose(pUidIdxc); tdbTbcClose(pUidIdxc);
tDecoderClear(&dc); tDecoderClear(&dc);
...@@ -1967,11 +1961,28 @@ static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -1967,11 +1961,28 @@ static int metaUpdateNameIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbUpsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), pMeta->txn); return tdbTbUpsert(pMeta->pNameIdx, pME->name, strlen(pME->name) + 1, &pME->uid, sizeof(tb_uid_t), pMeta->txn);
} }
static int metaUpdateTtlIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateTtl(SMeta *pMeta, const SMetaEntry *pME) {
STtlIdxKey ttlKey = {0}; if (pME->type != TSDB_CHILD_TABLE && pME->type != TSDB_NORMAL_TABLE) return 0;
metaBuildTtlIdxKey(&ttlKey, pME);
if (ttlKey.dtime == 0) return 0; STtlUpdTtlCtx ctx = {.uid = pME->uid};
return tdbTbUpsert(pMeta->pTtlIdx, &ttlKey, sizeof(ttlKey), NULL, 0, pMeta->txn);
if (pME->type == TSDB_CHILD_TABLE) {
ctx.ttlDays = pME->ctbEntry.ttlDays;
ctx.changeTimeMs = pME->ctbEntry.btime;
} else {
ctx.ttlDays = pME->ntbEntry.ttlDays;
ctx.changeTimeMs = pME->ntbEntry.btime;
}
return ttlMgrInsertTtl(pMeta->pTtlMgr, &ctx);
}
int metaUpdateChangeTime(SMeta *pMeta, tb_uid_t uid, int64_t changeTimeMs) {
if (!tsTtlChangeOnWrite) return 0;
STtlUpdCtimeCtx ctx = {.uid = uid, .changeTimeMs = changeTimeMs};
return ttlMgrUpdateChangeTime(pMeta->pTtlMgr, &ctx);
} }
static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) { static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
...@@ -2182,7 +2193,7 @@ int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -2182,7 +2193,7 @@ int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
} }
} }
code = metaUpdateCtimeIdx(pMeta, pME); code = metaUpdateBtimeIdx(pMeta, pME);
VND_CHECK_CODE(code, line, _err); VND_CHECK_CODE(code, line, _err);
if (pME->type == TSDB_NORMAL_TABLE) { if (pME->type == TSDB_NORMAL_TABLE) {
...@@ -2191,7 +2202,7 @@ int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) { ...@@ -2191,7 +2202,7 @@ int metaHandleEntry(SMeta *pMeta, const SMetaEntry *pME) {
} }
if (pME->type != TSDB_SUPER_TABLE) { if (pME->type != TSDB_SUPER_TABLE) {
code = metaUpdateTtlIdx(pMeta, pME); code = metaUpdateTtl(pMeta, pME);
VND_CHECK_CODE(code, line, _err); VND_CHECK_CODE(code, line, _err);
} }
......
此差异已折叠。
...@@ -29,7 +29,7 @@ int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) { ...@@ -29,7 +29,7 @@ int32_t tdProcessTSmaInsert(SSma *pSma, int64_t indexUid, const char *msg) {
int32_t code = TSDB_CODE_SUCCESS; int32_t code = TSDB_CODE_SUCCESS;
if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) { if ((code = tdProcessTSmaInsertImpl(pSma, indexUid, msg)) < 0) {
smaWarn("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno)); smaError("vgId:%d, insert tsma data failed since %s", SMA_VID(pSma), tstrerror(terrno));
} }
return code; return code;
...@@ -346,6 +346,43 @@ _end: ...@@ -346,6 +346,43 @@ _end:
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
static int32_t tsmaProcessDelReq(SSma *pSma, int64_t indexUid, SBatchDeleteReq *pDelReq) {
int32_t code = 0;
int32_t lino = 0;
if (taosArrayGetSize(pDelReq->deleteReqs) > 0) {
int32_t len = 0;
tEncodeSize(tEncodeSBatchDeleteReq, pDelReq, len, code);
TSDB_CHECK_CODE(code, lino, _exit);
void *pBuf = rpcMallocCont(len + sizeof(SMsgHead));
if (!pBuf) {
code = terrno;
TSDB_CHECK_CODE(code, lino, _exit);
}
SEncoder encoder;
tEncoderInit(&encoder, POINTER_SHIFT(pBuf, sizeof(SMsgHead)), len);
tEncodeSBatchDeleteReq(&encoder, pDelReq);
tEncoderClear(&encoder);
((SMsgHead *)pBuf)->vgId = TD_VID(pSma->pVnode);
SRpcMsg delMsg = {.msgType = TDMT_VND_BATCH_DEL, .pCont = pBuf, .contLen = len + sizeof(SMsgHead)};
code = tmsgPutToQueue(&pSma->pVnode->msgCb, WRITE_QUEUE, &delMsg);
TSDB_CHECK_CODE(code, lino, _exit);
}
_exit:
taosArrayDestroy(pDelReq->deleteReqs);
if (code) {
smaError("vgId:%d, failed at line %d to process delete req for smaIndex %" PRIi64 " since %s", SMA_VID(pSma), lino,
indexUid, tstrerror(code));
}
return code;
}
/** /**
* @brief Insert/Update Time-range-wise SMA data. * @brief Insert/Update Time-range-wise SMA data.
* *
...@@ -355,7 +392,6 @@ _end: ...@@ -355,7 +392,6 @@ _end:
*/ */
static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) { static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char *msg) {
const SArray *pDataBlocks = (const SArray *)msg; const SArray *pDataBlocks = (const SArray *)msg;
// TODO: destroy SSDataBlocks(msg)
if (!pDataBlocks) { if (!pDataBlocks) {
terrno = TSDB_CODE_TSMA_INVALID_PTR; terrno = TSDB_CODE_TSMA_INVALID_PTR;
smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma)); smaWarn("vgId:%d, insert tsma data failed since pDataBlocks is NULL", SMA_VID(pSma));
...@@ -419,8 +455,10 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char ...@@ -419,8 +455,10 @@ static int32_t tdProcessTSmaInsertImpl(SSma *pSma, int64_t indexUid, const char
goto _err; goto _err;
} }
// TODO deleteReq if ((terrno = tsmaProcessDelReq(pSma, indexUid, &deleteReq)) != 0) {
taosArrayDestroy(deleteReq.deleteReqs); goto _err;
}
#if 0 #if 0
if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) { if (!strncasecmp("td.tsma.rst.tb", pTsmaStat->pTSma->dstTbName, 14)) {
terrno = TSDB_CODE_APP_ERROR; terrno = TSDB_CODE_APP_ERROR;
......
...@@ -145,7 +145,7 @@ typedef struct SExplainCtx { ...@@ -145,7 +145,7 @@ typedef struct SExplainCtx {
SHashObj *groupHash; // Hash<SExplainGroup> SHashObj *groupHash; // Hash<SExplainGroup>
} SExplainCtx; } SExplainCtx;
#define EXPLAIN_ORDER_STRING(_order) ((ORDER_ASC == _order) ? "asc" : "desc") #define EXPLAIN_ORDER_STRING(_order) ((ORDER_ASC == _order) ? "asc" : ORDER_DESC == _order ? "desc" : "unknown")
#define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join") #define EXPLAIN_JOIN_STRING(_type) ((JOIN_TYPE_INNER == _type) ? "Inner join" : "Join")
#define INVERAL_TIME_FROM_PRECISION_TO_UNIT(_t, _u, _p) (((_u) == 'n' || (_u) == 'y') ? (_t) : (convertTimeFromPrecisionToUnit(_t, _p, _u))) #define INVERAL_TIME_FROM_PRECISION_TO_UNIT(_t, _u, _p) (((_u) == 'n' || (_u) == 'y') ? (_t) : (convertTimeFromPrecisionToUnit(_t, _p, _u)))
......
...@@ -499,6 +499,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -499,6 +499,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pPrjNode->pProjections->length); EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pPrjNode->pProjections->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPrjNode->node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pPrjNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pPrjNode->node.inputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
...@@ -544,6 +547,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -544,6 +547,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pJoinNode->pTargets->length); EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, pJoinNode->pTargets->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pJoinNode->node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pJoinNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pJoinNode->node.inputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
...@@ -597,6 +603,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -597,6 +603,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_GROUPS_FORMAT, pAggNode->pGroupKeys->length); EXPLAIN_ROW_APPEND(EXPLAIN_GROUPS_FORMAT, pAggNode->pGroupKeys->length);
} }
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pAggNode->node.inputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
...@@ -716,6 +725,11 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -716,6 +725,11 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
case QUERY_NODE_PHYSICAL_PLAN_SORT: { case QUERY_NODE_PHYSICAL_PLAN_SORT: {
SSortPhysiNode *pSortNode = (SSortPhysiNode *)pNode; SSortPhysiNode *pSortNode = (SSortPhysiNode *)pNode;
EXPLAIN_ROW_NEW(level, EXPLAIN_SORT_FORMAT); EXPLAIN_ROW_NEW(level, EXPLAIN_SORT_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pSortNode->node.inputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT, EXPLAIN_ORDER_STRING(pSortNode->node.outputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_LEFT_PARENTHESIS_FORMAT);
if (pResNode->pExecInfo) { if (pResNode->pExecInfo) {
QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen)); QRY_ERR_RET(qExplainBufAppendExecInfo(pResNode->pExecInfo, tbuf, &tlen));
...@@ -796,9 +810,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -796,9 +810,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pIntNode->window.inputTsOrder)); EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pIntNode->window.node.inputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT, EXPLAIN_ORDER_STRING(pIntNode->window.node.outputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT, EXPLAIN_ORDER_STRING(pIntNode->window.outputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
...@@ -847,6 +862,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -847,6 +862,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pIntNode->window.pFuncs->length); EXPLAIN_ROW_APPEND(EXPLAIN_FUNCTIONS_FORMAT, pIntNode->window.pFuncs->length);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pIntNode->window.node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pIntNode->window.node.inputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT, EXPLAIN_ORDER_STRING(pIntNode->window.node.outputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
...@@ -895,6 +914,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -895,6 +914,9 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_MODE_FORMAT, nodesGetFillModeString(pFillNode->mode)); EXPLAIN_ROW_APPEND(EXPLAIN_MODE_FORMAT, nodesGetFillModeString(pFillNode->mode));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pFillNode->node.pOutputDataBlockDesc->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pFillNode->node.pOutputDataBlockDesc->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pFillNode->node.inputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
...@@ -1080,6 +1102,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i ...@@ -1080,6 +1102,10 @@ int32_t qExplainResNodeToRowsImpl(SExplainResNode *pResNode, SExplainCtx *ctx, i
EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pDescNode->pSlots)); EXPLAIN_ROW_APPEND(EXPLAIN_COLUMNS_FORMAT, nodesGetOutputNumFromSlotList(pDescNode->pSlots));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDescNode->totalRowSize); EXPLAIN_ROW_APPEND(EXPLAIN_WIDTH_FORMAT, pDescNode->totalRowSize);
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_INPUT_ORDER_FORMAT, EXPLAIN_ORDER_STRING(pMergeNode->node.inputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_BLANK_FORMAT);
EXPLAIN_ROW_APPEND(EXPLAIN_OUTPUT_ORDER_TYPE_FORMAT, EXPLAIN_ORDER_STRING(pMergeNode->node.outputTsOrder));
EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT); EXPLAIN_ROW_APPEND(EXPLAIN_RIGHT_PARENTHESIS_FORMAT);
EXPLAIN_ROW_END(); EXPLAIN_ROW_END();
QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level)); QRY_ERR_RET(qExplainResAppendRow(ctx, tbuf, tlen, level));
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册