提交 c2f70230 编写于 作者: L Liu Jicong

Merge branch '3.0' into feature/tq

......@@ -35,7 +35,7 @@ endif(${BUILD_TEST})
add_subdirectory(source)
add_subdirectory(tools)
add_subdirectory(tests)
add_subdirectory(example)
add_subdirectory(examples/c)
# docs
add_subdirectory(docs)
......
......@@ -46,7 +46,7 @@ ENDIF ()
IF (TD_WINDOWS)
MESSAGE("${Yellow} set compiler flag for Windows! ${ColourReset}")
SET(COMMON_FLAGS "/w /D_WIN32 /Zi")
SET(COMMON_FLAGS "/w /D_WIN32 /DWIN32 /Zi")
SET(CMAKE_EXE_LINKER_FLAGS "${CMAKE_EXE_LINKER_FLAGS} /MANIFEST:NO")
# IF (MSVC AND (MSVC_VERSION GREATER_EQUAL 1900))
# SET(COMMON_FLAGS "${COMMON_FLAGS} /Wv:18")
......
---
sidebar_label: SQL 函数
title: SQL 函数
sidebar_label: 函数
title: 函数
toc_max_heading_level: 4
---
......@@ -20,7 +20,7 @@ toc_max_heading_level: 4
**返回结果类型**:如果输入值为整数,输出值是 UBIGINT 类型。如果输入值是 FLOAT/DOUBLE 数据类型,输出值是 DOUBLE 数据类型。
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -38,7 +38,7 @@ toc_max_heading_level: 4
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -56,7 +56,7 @@ toc_max_heading_level: 4
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -75,7 +75,7 @@ toc_max_heading_level: 4
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -94,7 +94,7 @@ SELECT CEIL(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:与指定列的原始数据类型一致。例如,如果指定列的原始数据类型为 Float,那么返回的数据类型也为 Float;如果指定列的原始数据类型为 Double,那么返回的数据类型也为 Double。
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列,无论 tag 列的类型是什么类型。
**适用数据类型**数值类型。
**适用于**: 普通表、超级表。
......@@ -115,7 +115,7 @@ SELECT CEIL(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -142,7 +142,7 @@ SELECT FLOOR(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -161,7 +161,7 @@ SELECT FLOOR(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -190,7 +190,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -208,7 +208,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -226,7 +226,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:DOUBLE。如果输入值为 NULL,输出值也为 NULL
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在 tag 列
**适用数据类型**数值类型。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -248,7 +248,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:INT。如果输入值为NULL,输出值为NULL。
**适用数据类型**输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。
**适用数据类型**VARCHAR, NCHAR
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -262,9 +262,9 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**功能说明**:字符串连接函数。
**返回结果类型**:如果所有参数均为BINARY类型,则结果类型为BINARY。如果参数包含NCHAR类型,则结果类型为NCHAR。如果输入值为NULL,输出值为NULL。
**返回结果类型**:如果所有参数均为 VARCHAR 类型,则结果类型为 VARCHAR。如果参数包含NCHAR类型,则结果类型为NCHAR。如果输入值为NULL,输出值为NULL。
**适用数据类型**BINARY, NCHAR。不能应用在 TAG 列。 该函数最小参数个数为2个,最大参数个数为8个。
**适用数据类型**VARCHAR, NCHAR。 该函数最小参数个数为2个,最大参数个数为8个。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -279,9 +279,9 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**功能说明**:带分隔符的字符串连接函数。
**返回结果类型**:如果所有参数均为BINARY类型,则结果类型为BINARY。如果参数包含NCHAR类型,则结果类型为NCHAR。如果输入值为NULL,输出值为NULL。如果separator值不为NULL,其他输入为NULL,输出为空串。
**返回结果类型**:如果所有参数均为VARCHAR类型,则结果类型为VARCHAR。如果参数包含NCHAR类型,则结果类型为NCHAR。如果输入值为NULL,输出值为NULL。如果separator值不为NULL,其他输入为NULL,输出为空串。
**适用数据类型**BINARY, NCHAR。不能应用在 TAG 列。 该函数最小参数个数为3个,最大参数个数为9个。
**适用数据类型**VARCHAR, NCHAR。 该函数最小参数个数为3个,最大参数个数为9个。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -298,7 +298,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:INT。
**适用数据类型**:输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。
**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -315,7 +315,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:同输入类型。如果输入值为NULL,输出值为NULL。
**适用数据类型**:输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。
**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -332,7 +332,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:同输入类型。如果输入值为NULL,输出值为NULL。
**适用数据类型**:输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。
**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -349,7 +349,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:同输入类型。如果输入值为NULL,输出值为NULL。
**适用数据类型**:输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。
**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -366,7 +366,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:同输入类型。如果输入值为NULL,输出值为NULL。
**适用数据类型**:输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。输入参数pos可以为正数,也可以为负数。如果pos是正数,表示开始位置从字符串开头正数计算。如果pos为负数,表示开始位置从字符串结尾倒数计算。如果输入参数len被忽略,返回的子串包含从pos开始的整个字串。
**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。输入参数pos可以为正数,也可以为负数。如果pos是正数,表示开始位置从字符串开头正数计算。如果pos为负数,表示开始位置从字符串结尾倒数计算。如果输入参数len被忽略,返回的子串包含从pos开始的整个字串。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -383,7 +383,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
**返回结果类型**:同输入类型。如果输入值为NULL,输出值为NULL。
**适用数据类型**:输入参数是 BINARY 类型或者 NCHAR 类型的字符串或者列。不能应用在 TAG 列。
**适用数据类型**:输入参数是 VARCHAR 类型或者 NCHAR 类型的字符串或者列。
**嵌套子查询支持**:适用于内层查询和外层查询。
......@@ -400,7 +400,7 @@ SELECT ROUND(field_name) FROM { tb_name | stb_name } [WHERE clause];
SELECT CAST(expression AS type_name) FROM { tb_name | stb_name } [WHERE clause]
```
**功能说明**:数据类型转换函数,输入参数 expression 支持普通列、常量、标量函数及它们之间的四则运算,不支持 tag 列,只适用于 select 子句中。
**功能说明**:数据类型转换函数,输入参数 expression 支持普通列、常量、标量函数及它们之间的四则运算,只适用于 select 子句中。
**返回结果类型**:CAST 中指定的类型(type_name),可以是 BIGINT、BIGINT UNSIGNED、BINARY、VARCHAR、NCHAR和TIMESTAMP。
......@@ -423,7 +423,7 @@ SELECT TO_ISO8601(ts_val | ts_col) FROM { tb_name | stb_name } [WHERE clause];
**功能说明**:将 UNIX 时间戳转换成为 ISO8601 标准的日期时间格式,并附加客户端时区信息。
**返回结果数据类型**BINARY 类型。
**返回结果数据类型**VARCHAR 类型。
**适用数据类型**:UNIX 时间戳常量或是 TIMESTAMP 类型的列
......@@ -462,7 +462,7 @@ SELECT TO_UNIXTIMESTAMP(datetime_string | ts_col) FROM { tb_name | stb_name } [W
**返回结果数据类型**:长整型 INT64。
**应用字段**:字符串常量或是 BINARY/NCHAR 类型的列。
**应用字段**:字符串常量或是 VARCHAR/NCHAR 类型的列。
**适用于**:表、超级表。
......@@ -549,7 +549,7 @@ SELECT TIMEZONE() FROM { tb_name | stb_name } [WHERE clause];
**功能说明**:返回客户端当前时区信息。
**返回结果数据类型**BINARY 类型。
**返回结果数据类型**VARCHAR 类型。
**应用字段**:无
......@@ -668,7 +668,7 @@ SELECT LEASTSQUARES(field_name, start_val, step_val) FROM tb_name [WHERE clause]
SELECT MODE(field_name) FROM tb_name [WHERE clause];
```
**功能说明**:返回出现频率最高的值,若存在多个频率相同的最高值,输出空。不能匹配标签、时间戳输出。
**功能说明**:返回出现频率最高的值,若存在多个频率相同的最高值,输出空。
**返回数据类型**:同应用的字段。
......@@ -808,6 +808,25 @@ SELECT BOTTOM(field_name, K) FROM { tb_name | stb_name } [WHERE clause];
- 系统同时返回该记录关联的时间戳列;
- 限制:BOTTOM 函数不支持 FILL 子句。
### FIRST
```
SELECT FIRST(field_name) FROM { tb_name | stb_name } [WHERE clause];
```
**功能说明**:统计表/超级表中某列的值最先写入的非 NULL 值。
**返回数据类型**:同应用的字段。
**适用数据类型**:所有字段。
**适用于**:表和超级表。
**使用说明**:
- 如果要返回各个列的首个(时间戳最小)非 NULL 值,可以使用 FIRST(\*);
- 如果结果集中的某列全部为 NULL 值,则该列的返回结果也是 NULL;
- 如果结果集中所有列全部为 NULL 值,则不返回结果。
### INTERP
......@@ -919,25 +938,6 @@ SELECT PERCENTILE(field_name, P) FROM { tb_name } [WHERE clause];
**使用说明***P*值取值范围 0≤*P*≤100,为 0 的时候等同于 MIN,为 100 的时候等同于 MAX。
### FIRST
```
SELECT FIRST(field_name) FROM { tb_name | stb_name } [WHERE clause];
```
**功能说明**:统计表/超级表中某列的值最先写入的非 NULL 值。
**返回数据类型**:同应用的字段。
**适用数据类型**:所有字段。
**适用于**:表和超级表。
**使用说明**:
- 如果要返回各个列的首个(时间戳最小)非 NULL 值,可以使用 FIRST(\*);
- 如果结果集中的某列全部为 NULL 值,则该列的返回结果也是 NULL;
- 如果结果集中所有列全部为 NULL 值,则不返回结果。
### TAIL
......@@ -1005,7 +1005,7 @@ SELECT UNIQUE(field_name) FROM {tb_name | stb_name} [WHERE clause];
**返回结果类型**: 输入列如果是整数类型返回值为长整型 (int64_t),浮点数返回值为双精度浮点数(Double)。无符号整数类型返回值为无符号长整型(uint64_t)。 返回结果中同时带有每行记录对应的时间戳。
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上;在超级表查询中使用时,不能应用在标签之上
**适用数据类型**数值类型
**嵌套子查询支持**: 适用于内层查询和外层查询。
......@@ -1076,7 +1076,7 @@ SELECT IRATE(field_name) FROM tb_name WHERE clause;
**返回结果类型**: 返回双精度浮点数类型。
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型上;在超级表查询中使用时,不能应用在标签之上
**适用数据类型**数值类型
**嵌套子查询支持**: 适用于内层查询和外层查询。
......@@ -1124,7 +1124,7 @@ SELECT STATECOUNT(field_name, oper, val) FROM { tb_name | stb_name } [WHERE clau
**返回结果类型**:整形。
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上
**适用数据类型**数值类型
**嵌套子查询支持**:不支持应用在子查询上。
......@@ -1152,7 +1152,7 @@ SELECT stateDuration(field_name, oper, val, unit) FROM { tb_name | stb_name } [W
**返回结果类型**:整形。
**适用数据类型**不能应用在 timestamp、binary、nchar、bool 类型字段上
**适用数据类型**数值类型
**嵌套子查询支持**:不支持应用在子查询上。
......
......@@ -35,8 +35,8 @@ TDengine 支持 `UNION ALL` 和 `UNION` 操作符。UNION ALL 将查询返回的
| --- | :---------------: | -------------------------------------------------------------------- | -------------------- |
| 1 | = | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 相等 |
| 2 | <\>, != | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型,且不可以为表的时间戳主键列 | 不相等 |
| 3 | \>, \< | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 大于,小于 |
| 4 | \>=, \<= | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 大于等于,小于等于 |
| 3 | \>, < | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 大于,小于 |
| 4 | \>=, <= | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 大于等于,小于等于 |
| 5 | IS [NOT] NULL | 所有类型 | 是否为空值 |
| 6 | [NOT] BETWEEN AND | 除 BOOL、BLOB、MEDIUMBLOB 和 JSON 外的所有类型 | 闭区间比较 |
| 7 | IN | 除 BLOB、MEDIUMBLOB 和 JSON 外的所有类型,且不可以为表的时间戳主键列 | 与列表内的任意值相等 |
......
此差异已折叠。
......@@ -35,8 +35,8 @@ TDengine provides 2 set operators: `UNION ALL` and `UNION`. `UNION ALL` combines
| --- | :---------------: | ------------------------------------------------------------------- | ----------------------------------------------- |
| 1 | = | Except for BLOB, MEDIUMBLOB and JSON | Equal |
| 2 | <\>, != | Except for BLOB, MEDIUMBLOB, JSON and primary key of timestamp type | Not equal |
| 3 | \>, \< | Except for BLOB, MEDIUMBLOB and JSON | Greater than, less than |
| 4 | \>=, \<= | Except for BLOB, MEDIUMBLOB and JSON | Greater than or equal to, less than or equal to |
| 3 | \>, < | Except for BLOB, MEDIUMBLOB and JSON | Greater than, less than |
| 4 | \>=, <= | Except for BLOB, MEDIUMBLOB and JSON | Greater than or equal to, less than or equal to |
| 5 | IS [NOT] NULL | Any types | Is NULL or NOT |
| 6 | [NOT] BETWEEN AND | Except for BLOB, MEDIUMBLOB and JSON | In a value range or not |
| 7 | IN | Except for BLOB, MEDIUMBLOB, JSON and primary key of timestamp type | In a list of values or not |
......
add_executable(tmq "")
add_executable(tstream "")
add_executable(demoapi "")
target_sources(tmq
PRIVATE
"src/tmq.c"
)
target_sources(tstream
PRIVATE
"src/tstream.c"
)
target_sources(demoapi
PRIVATE
"src/demoapi.c"
)
target_link_libraries(tmq
taos_static
)
target_link_libraries(tstream
taos_static
)
target_link_libraries(demoapi
taos_static
)
target_include_directories(tmq
PUBLIC "${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_include_directories(tstream
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_include_directories(demoapi
PUBLIC "${TD_SOURCE_DIR}/include/client"
PUBLIC "${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
SET_TARGET_PROPERTIES(tstream PROPERTIES OUTPUT_NAME tstream)
SET_TARGET_PROPERTIES(demoapi PROPERTIES OUTPUT_NAME demoapi)
......@@ -3,20 +3,70 @@ PROJECT(TDengine)
IF (TD_LINUX)
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(demo apitest.c)
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
ADD_EXECUTABLE(sml schemaless.c)
TARGET_LINK_LIBRARIES(sml taos_static trpc tutil pthread )
ADD_EXECUTABLE(subscribe subscribe.c)
TARGET_LINK_LIBRARIES(subscribe taos_static trpc tutil pthread )
ADD_EXECUTABLE(epoll epoll.c)
TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
# ADD_EXECUTABLE(demo apitest.c)
#TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread )
#ADD_EXECUTABLE(sml schemaless.c)
#TARGET_LINK_LIBRARIES(sml taos_static trpc tutil pthread )
#ADD_EXECUTABLE(subscribe subscribe.c)
#TARGET_LINK_LIBRARIES(subscribe taos_static trpc tutil pthread )
#ADD_EXECUTABLE(epoll epoll.c)
#TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
add_executable(tmq "")
add_executable(tstream "")
add_executable(demoapi "")
target_sources(tmq
PRIVATE
"tmq.c"
)
target_sources(tstream
PRIVATE
"tstream.c"
)
target_sources(demoapi
PRIVATE
"demoapi.c"
)
target_link_libraries(tmq
taos_static
)
target_link_libraries(tstream
taos_static
)
target_link_libraries(demoapi
taos_static
)
target_include_directories(tmq
PUBLIC "${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_include_directories(tstream
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
target_include_directories(demoapi
PUBLIC "${TD_SOURCE_DIR}/include/client"
PUBLIC "${TD_SOURCE_DIR}/include/os"
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/inc"
)
SET_TARGET_PROPERTIES(tmq PROPERTIES OUTPUT_NAME tmq)
SET_TARGET_PROPERTIES(tstream PROPERTIES OUTPUT_NAME tstream)
SET_TARGET_PROPERTIES(demoapi PROPERTIES OUTPUT_NAME demoapi)
ENDIF ()
IF (TD_DARWIN)
INCLUDE_DIRECTORIES(. ${TD_SOURCE_DIR}/src/inc ${TD_SOURCE_DIR}/src/client/inc ${TD_SOURCE_DIR}/inc)
AUX_SOURCE_DIRECTORY(. SRC)
ADD_EXECUTABLE(demo demo.c)
TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread lua)
ADD_EXECUTABLE(epoll epoll.c)
TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
#ADD_EXECUTABLE(demo demo.c)
#TARGET_LINK_LIBRARIES(demo taos_static trpc tutil pthread lua)
#ADD_EXECUTABLE(epoll epoll.c)
#TARGET_LINK_LIBRARIES(epoll taos_static trpc tutil pthread lua)
ENDIF ()
......@@ -85,6 +85,14 @@ typedef struct taosField {
int32_t bytes;
} TAOS_FIELD;
typedef struct TAOS_FIELD_E {
char name[65];
int8_t type;
uint8_t precision;
uint8_t scale;
int32_t bytes;
} TAOS_FIELD_E;
#ifdef WINDOWS
#define DLL_EXPORT __declspec(dllexport)
#else
......@@ -134,7 +142,10 @@ DLL_EXPORT TAOS_STMT *taos_stmt_init(TAOS *taos);
DLL_EXPORT int taos_stmt_prepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
DLL_EXPORT int taos_stmt_set_tbname_tags(TAOS_STMT *stmt, const char *name, TAOS_MULTI_BIND *tags);
DLL_EXPORT int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
DLL_EXPORT int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name);
DLL_EXPORT int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields);
DLL_EXPORT int taos_stmt_get_col_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields);
DLL_EXPORT int taos_stmt_is_insert(TAOS_STMT *stmt, int *insert);
DLL_EXPORT int taos_stmt_num_params(TAOS_STMT *stmt, int *nums);
......
......@@ -18,6 +18,7 @@
#include "os.h"
#include "talgo.h"
#include "tarray.h"
#include "tencode.h"
#include "ttypes.h"
#include "tutil.h"
......@@ -29,6 +30,7 @@ extern "C" {
typedef struct SSchema SSchema;
typedef struct STColumn STColumn;
typedef struct STSchema STSchema;
typedef struct SValue SValue;
typedef struct SColVal SColVal;
typedef struct STSRow2 STSRow2;
typedef struct STSRowBuilder STSRowBuilder;
......@@ -39,32 +41,36 @@ typedef struct STag STag;
int32_t tTSchemaCreate(int32_t sver, SSchema *pSchema, int32_t nCols, STSchema **ppTSchema);
void tTSchemaDestroy(STSchema *pTSchema);
// SColVal
#define ColValNONE ((SColVal){.type = COL_VAL_NONE, .nData = 0, .pData = NULL})
#define ColValNULL ((SColVal){.type = COL_VAL_NULL, .nData = 0, .pData = NULL})
#define ColValDATA(nData, pData) ((SColVal){.type = COL_VAL_DATA, .nData = (nData), .pData = (pData)})
// STSRow2
#define COL_VAL_NONE(CID) ((SColVal){.cid = (CID), .isNone = 1})
#define COL_VAL_NULL(CID) ((SColVal){.cid = (CID), .isNull = 1})
#define COL_VAL_VALUE(CID, V) ((SColVal){.cid = (CID), .value = (V)})
int32_t tTSRowClone(const STSRow2 *pRow, STSRow2 **ppRow);
void tTSRowFree(STSRow2 *pRow);
void tTSRowGet(STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
int32_t tTSRowToArray(STSRow2 *pRow, STSchema *pTSchema, SArray **ppArray);
int32_t tPutTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tGetTSRow(uint8_t *p, STSRow2 *pRow);
int32_t tTSRowDup(const STSRow2 *pRow, STSRow2 **ppRow);
void tTSRowFree(STSRow2 *pRow);
int32_t tTSRowGet(const STSRow2 *pRow, STSchema *pTSchema, int32_t iCol, SColVal *pColVal);
// STSRowBuilder
#if 0
int32_t tTSRowBuilderInit(STSRowBuilder *pBuilder, int32_t sver, int32_t nCols, SSchema *pSchema);
void tTSRowBuilderClear(STSRowBuilder *pBuilder);
void tTSRowBuilderReset(STSRowBuilder *pBuilder);
int32_t tTSRowBuilderPut(STSRowBuilder *pBuilder, int32_t cid, uint8_t *pData, uint32_t nData);
int32_t tTSRowBuilderGetRow(STSRowBuilder *pBuilder, const STSRow2 **ppRow);
#endif
// STag
int32_t tTagNew(STagVal *pTagVals, int16_t nTag, STag **ppTag);
int32_t tTagNew(SArray *pArray, int32_t version, int8_t isJson, STag **ppTag);
void tTagFree(STag *pTag);
int32_t tTagSet(STag *pTag, SSchema *pSchema, int32_t nCols, int iCol, uint8_t *pData, uint32_t nData, STag **ppTag);
void tTagGet(STag *pTag, int16_t cid, int8_t type, uint8_t **ppData, uint32_t *nData);
bool tTagGet(const STag *pTag, STagVal *pTagVal);
char* tTagValToData(const STagVal *pTagVal, bool isJson);
int32_t tEncodeTag(SEncoder *pEncoder, const STag *pTag);
int32_t tDecodeTag(SDecoder *pDecoder, STag **ppTag);
int32_t tTagToValArray(const STag *pTag, SArray **ppArray);
void debugPrintSTag(STag *pTag, const char *tag, int32_t ln);
// STRUCT =================
struct STColumn {
......@@ -87,7 +93,9 @@ struct STSchema {
#define TSROW_HAS_NONE ((uint8_t)0x1)
#define TSROW_HAS_NULL ((uint8_t)0x2U)
#define TSROW_HAS_VAL ((uint8_t)0x4U)
#define TSROW_KV_ROW ((uint8_t)0x10U)
#define TSROW_KV_SMALL ((uint8_t)0x10U)
#define TSROW_KV_MID ((uint8_t)0x20U)
#define TSROW_KV_BIG ((uint8_t)0x40U)
struct STSRow2 {
TSKEY ts;
uint8_t flags;
......@@ -110,20 +118,60 @@ struct STSRowBuilder {
STSRow2 row;
};
typedef enum { COL_VAL_NONE = 0, COL_VAL_NULL = 1, COL_VAL_DATA = 2 } EColValT;
struct SValue {
union {
int8_t i8; // TSDB_DATA_TYPE_BOOL||TSDB_DATA_TYPE_TINYINT
uint8_t u8; // TSDB_DATA_TYPE_UTINYINT
int16_t i16; // TSDB_DATA_TYPE_SMALLINT
uint16_t u16; // TSDB_DATA_TYPE_USMALLINT
int32_t i32; // TSDB_DATA_TYPE_INT
uint32_t u32; // TSDB_DATA_TYPE_UINT
int64_t i64; // TSDB_DATA_TYPE_BIGINT
uint64_t u64; // TSDB_DATA_TYPE_UBIGINT
TSKEY ts; // TSDB_DATA_TYPE_TIMESTAMP
float f; // TSDB_DATA_TYPE_FLOAT
double d; // TSDB_DATA_TYPE_DOUBLE
struct {
uint32_t nData;
uint8_t *pData;
};
};
};
struct SColVal {
EColValT type;
uint32_t nData;
uint8_t *pData;
int16_t cid;
int8_t isNone;
int8_t isNull;
SValue value;
};
#pragma pack(push, 1)
struct STagVal {
int16_t cid;
int8_t type;
uint32_t nData;
uint8_t *pData;
union {
int16_t cid;
char *pKey;
};
int8_t type;
union {
int64_t i64;
struct {
uint32_t nData;
uint8_t *pData;
};
};
};
#define TD_TAG_JSON ((int8_t)0x40) // distinguish JSON string and JSON value with the highest bit
#define TD_TAG_LARGE ((int8_t)0x20)
struct STag {
int8_t flags;
int16_t len;
int16_t nTag;
int32_t ver;
int8_t idx[];
};
#pragma pack(pop)
#if 1 //================================================================================================================================================
// Imported since 3.0 and use bitmap to demonstrate None/Null/Norm, while use Null/Norm below 3.0 without of bitmap.
#define TD_SUPPORT_BITMAP
......@@ -366,109 +414,6 @@ SDataCols *tdFreeDataCols(SDataCols *pCols);
int32_t tdMergeDataCols(SDataCols *target, SDataCols *source, int32_t rowsToMerge, int32_t *pOffset, bool update,
TDRowVerT maxVer);
// ----------------- K-V data row structure
/* |<-------------------------------------- len -------------------------------------------->|
* |<----- header ----->|<--------------------------- body -------------------------------->|
* +----------+----------+---------------------------------+---------------------------------+
* | uint16_t | int16_t | | |
* +----------+----------+---------------------------------+---------------------------------+
* | len | ncols | cols index | data part |
* +----------+----------+---------------------------------+---------------------------------+
*/
typedef void *SKVRow;
typedef struct {
int16_t colId;
uint16_t offset;
} SColIdx;
#define TD_KV_ROW_HEAD_SIZE (sizeof(uint16_t) + sizeof(int16_t))
#define kvRowLen(r) (*(uint16_t *)(r))
#define kvRowNCols(r) (*(int16_t *)POINTER_SHIFT(r, sizeof(uint16_t)))
#define kvRowSetLen(r, len) kvRowLen(r) = (len)
#define kvRowSetNCols(r, n) kvRowNCols(r) = (n)
#define kvRowColIdx(r) (SColIdx *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE)
#define kvRowValues(r) POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(SColIdx) * kvRowNCols(r))
#define kvRowCpy(dst, r) memcpy((dst), (r), kvRowLen(r))
#define kvRowColVal(r, colIdx) POINTER_SHIFT(kvRowValues(r), (colIdx)->offset)
#define kvRowColIdxAt(r, i) (kvRowColIdx(r) + (i))
#define kvRowFree(r) taosMemoryFreeClear(r)
#define kvRowEnd(r) POINTER_SHIFT(r, kvRowLen(r))
#define kvRowValLen(r) (kvRowLen(r) - TD_KV_ROW_HEAD_SIZE - sizeof(SColIdx) * kvRowNCols(r))
#define kvRowTKey(r) (*(TKEY *)(kvRowValues(r)))
#define kvRowKey(r) tdGetKey(kvRowTKey(r))
#define kvRowKeys(r) POINTER_SHIFT(r, *(uint16_t *)POINTER_SHIFT(r, TD_KV_ROW_HEAD_SIZE + sizeof(int16_t)))
#define kvRowDeleted(r) TKEY_IS_DELETED(kvRowTKey(r))
SKVRow tdKVRowDup(SKVRow row);
int32_t tdSetKVRowDataOfCol(SKVRow *orow, int16_t colId, int8_t type, void *value);
int32_t tdEncodeKVRow(void **buf, SKVRow row);
void *tdDecodeKVRow(void *buf, SKVRow *row);
void tdSortKVRowByColIdx(SKVRow row);
static FORCE_INLINE int32_t comparTagId(const void *key1, const void *key2) {
if (*(int16_t *)key1 > ((SColIdx *)key2)->colId) {
return 1;
} else if (*(int16_t *)key1 < ((SColIdx *)key2)->colId) {
return -1;
} else {
return 0;
}
}
static FORCE_INLINE void *tdGetKVRowValOfCol(const SKVRow row, int16_t colId) {
void *ret = taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ);
if (ret == NULL) return NULL;
return kvRowColVal(row, (SColIdx *)ret);
}
static FORCE_INLINE void *tdGetKVRowIdxOfCol(SKVRow row, int16_t colId) {
return taosbsearch(&colId, kvRowColIdx(row), kvRowNCols(row), sizeof(SColIdx), comparTagId, TD_EQ);
}
// ----------------- K-V data row builder
typedef struct {
int16_t tCols;
int16_t nCols;
SColIdx *pColIdx;
uint16_t alloc;
uint16_t size;
void *buf;
} SKVRowBuilder;
int32_t tdInitKVRowBuilder(SKVRowBuilder *pBuilder);
void tdDestroyKVRowBuilder(SKVRowBuilder *pBuilder);
void tdResetKVRowBuilder(SKVRowBuilder *pBuilder);
SKVRow tdGetKVRowFromBuilder(SKVRowBuilder *pBuilder);
static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t colId, const void *value, int32_t tlen) {
if (pBuilder->nCols >= pBuilder->tCols) {
pBuilder->tCols *= 2;
SColIdx *pColIdx = (SColIdx *)taosMemoryRealloc((void *)(pBuilder->pColIdx), sizeof(SColIdx) * pBuilder->tCols);
if (pColIdx == NULL) return -1;
pBuilder->pColIdx = pColIdx;
}
pBuilder->pColIdx[pBuilder->nCols].colId = colId;
pBuilder->pColIdx[pBuilder->nCols].offset = pBuilder->size;
pBuilder->nCols++;
if (tlen > pBuilder->alloc - pBuilder->size) {
while (tlen > pBuilder->alloc - pBuilder->size) {
pBuilder->alloc *= 2;
}
void *buf = taosMemoryRealloc(pBuilder->buf, pBuilder->alloc);
if (buf == NULL) return -1;
pBuilder->buf = buf;
}
memcpy(POINTER_SHIFT(pBuilder->buf, pBuilder->size), value, tlen);
pBuilder->size += tlen;
return 0;
}
#endif
#ifdef __cplusplus
......@@ -476,3 +421,4 @@ static FORCE_INLINE int32_t tdAddColToKVRow(SKVRowBuilder *pBuilder, col_id_t co
#endif
#endif /*_TD_COMMON_DATA_FORMAT_H_*/
......@@ -287,7 +287,7 @@ typedef struct SSchema {
char name[TSDB_COL_NAME_LEN];
} SSchema;
#define COL_IS_SET(FLG) ((FLG) & (COL_SET_VAL | COL_SET_NULL) != 0)
#define COL_IS_SET(FLG) (((FLG) & (COL_SET_VAL | COL_SET_NULL)) != 0)
#define COL_CLR_SET(FLG) ((FLG) &= (~(COL_SET_VAL | COL_SET_NULL)))
#define IS_BSMA_ON(s) (((s)->flags & 0x01) == COL_SMA_ON)
......@@ -1001,7 +1001,6 @@ typedef struct {
typedef struct {
int32_t vgId;
int32_t dnodeId;
char db[TSDB_DB_FNAME_LEN];
int64_t dbUid;
int32_t vgVersion;
......@@ -1024,16 +1023,14 @@ typedef struct {
int8_t compression;
int8_t strict;
int8_t cacheLastRow;
int8_t isTsma;
int8_t standby;
int8_t replica;
int8_t selfIndex;
SReplica replicas[TSDB_MAX_REPLICA];
int32_t numOfRetensions;
SArray* pRetensions; // SRetention
// for tsma
int8_t isTsma;
void* pTsma;
void* pTsma;
} SCreateVnodeReq;
int32_t tSerializeSCreateVnodeReq(void* buf, int32_t bufLen, SCreateVnodeReq* pReq);
......@@ -1071,8 +1068,8 @@ typedef struct {
int8_t walLevel;
int8_t strict;
int8_t cacheLastRow;
int8_t replica;
int8_t selfIndex;
int8_t replica;
SReplica replicas[TSDB_MAX_REPLICA];
} SAlterVnodeReq;
......@@ -1777,6 +1774,15 @@ typedef struct SVCreateTbReq {
int tEncodeSVCreateTbReq(SEncoder* pCoder, const SVCreateTbReq* pReq);
int tDecodeSVCreateTbReq(SDecoder* pCoder, SVCreateTbReq* pReq);
static FORCE_INLINE void tdDestroySVCreateTbReq(SVCreateTbReq* req) {
taosMemoryFreeClear(req->name);
if (req->type == TSDB_CHILD_TABLE) {
taosMemoryFreeClear(req->ctb.pTag);
} else if (req->type == TSDB_NORMAL_TABLE) {
taosMemoryFreeClear(req->ntb.schemaRow.pSchema);
}
}
typedef struct {
int32_t nReqs;
union {
......@@ -2295,6 +2301,7 @@ typedef struct {
int8_t intervalUnit; // MACRO: TIME_UNIT_XXX
int8_t slidingUnit; // MACRO: TIME_UNIT_XXX
int8_t timezoneInt; // sma data expired if timezone changes.
int32_t dstVgId;
char indexName[TSDB_INDEX_NAME_LEN];
int32_t exprLen;
int32_t tagsFilterLen;
......@@ -2652,6 +2659,23 @@ typedef struct {
int32_t tEncodeSVSubmitReq(SEncoder* pCoder, const SVSubmitReq* pReq);
int32_t tDecodeSVSubmitReq(SDecoder* pCoder, SVSubmitReq* pReq);
// TDMT_VND_DELETE
typedef struct {
TSKEY sKey;
TSKEY eKey;
// super table
char* stbName;
// child/normal
char* tbName;
} SVDeleteReq;
typedef struct {
int32_t code;
// TODO
} SVDeleteRsp;
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -221,9 +221,11 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_APPLY_MSG, "vnode-sync-apply-msg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_CONFIG_CHANGE, "vnode-sync-config-change", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_SYNC_VNODE, "vnode-sync-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_VNODE, "vnode-alter-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT_VNODE, "vnode-compact-vnode", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_CONFIG, "vnode-alter-config", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_REPLICA, "vnode-alter-replica", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_COMPACT, "vnode-compact", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DELETE, "vnode-delete-data", SVDeleteReq, SVDeleteRsp)
// Requests handled by QNODE
TD_NEW_MSG_SEG(TDMT_QND_MSG)
......
......@@ -77,8 +77,8 @@ int32_t qStmtParseQuerySql(SParseContext* pCxt, SQuery* pQuery);
int32_t qBindStmtColsValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen);
int32_t qBindStmtSingleColValue(void* pBlock, TAOS_MULTI_BIND* bind, char* msgBuf, int32_t msgBufLen, int32_t colIdx,
int32_t rowNum);
int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD** fields);
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD** fields);
int32_t qBuildStmtColFields(void* pDataBlock, int32_t* fieldNum, TAOS_FIELD_E** fields);
int32_t qBuildStmtTagFields(void* pBlock, void* boundTags, int32_t* fieldNum, TAOS_FIELD_E** fields);
int32_t qBindStmtTagsValue(void* pBlock, void* boundTags, int64_t suid, char* tName, TAOS_MULTI_BIND* bind,
char* msgBuf, int32_t msgBufLen);
void destroyBoundColumnInfo(void* pBoundInfo);
......
......@@ -33,8 +33,19 @@ extern "C" {
#ifdef WINDOWS
#define TD_TMP_DIR_PATH "C:\\Windows\\Temp\\"
#define TD_CFG_DIR_PATH "C:\\TDengine\\cfg\\"
#define TD_DATA_DIR_PATH "C:\\TDengine\\data\\"
#define TD_LOG_DIR_PATH "C:\\TDengine\\log\\"
#elif defined(_TD_DARWIN_64)
#define TD_TMP_DIR_PATH "/tmp/taosd/"
#define TD_CFG_DIR_PATH "/usr/local/etc/taos/"
#define TD_DATA_DIR_PATH "/usr/local/var/lib/taos/"
#define TD_LOG_DIR_PATH "/usr/local/var/log/taos/"
#else
#define TD_TMP_DIR_PATH "/tmp/"
#define TD_CFG_DIR_PATH "/etc/taos/"
#define TD_DATA_DIR_PATH "/var/lib/taos/"
#define TD_LOG_DIR_PATH "/var/log/taos/"
#endif
typedef struct TdDir *TdDirPtr;
......
......@@ -70,6 +70,7 @@ int32_t* taosGetErrno();
#define TSDB_CODE_NEED_RETRY TAOS_DEF_ERROR_CODE(0, 0x0028)
#define TSDB_CODE_OUT_OF_RPC_MEMORY_QUEUE TAOS_DEF_ERROR_CODE(0, 0x0029)
#define TSDB_CODE_INVALID_TIMESTAMP TAOS_DEF_ERROR_CODE(0, 0x0030)
#define TSDB_CODE_MSG_DECODE_ERROR TAOS_DEF_ERROR_CODE(0, 0x0031)
#define TSDB_CODE_REF_NO_MEMORY TAOS_DEF_ERROR_CODE(0, 0x0040)
#define TSDB_CODE_REF_FULL TAOS_DEF_ERROR_CODE(0, 0x0041)
......
......@@ -534,6 +534,26 @@ static FORCE_INLINE int32_t tPutI64(uint8_t* p, int64_t v) {
return sizeof(int64_t);
}
static FORCE_INLINE int32_t tPutFloat(uint8_t* p, float f) {
union {
uint32_t ui;
float f;
} v;
v.f = f;
return tPutU32(p, v.ui);
}
static FORCE_INLINE int32_t tPutDouble(uint8_t* p, double d) {
union {
uint64_t ui;
double d;
} v;
v.d = d;
return tPutU64(p, v.ui);
}
static FORCE_INLINE int32_t tPutU16v(uint8_t* p, uint16_t v) { tPutV(p, v); }
static FORCE_INLINE int32_t tPutI16v(uint8_t* p, int16_t v) { return tPutU16v(p, ZIGZAGE(int16_t, v)); }
......@@ -623,6 +643,34 @@ static FORCE_INLINE int32_t tGetI64v(uint8_t* p, int64_t* v) {
return n;
}
static FORCE_INLINE int32_t tGetFloat(uint8_t* p, float* f) {
int32_t n = 0;
union {
uint32_t ui;
float f;
} v;
n = tGetU32(p, &v.ui);
*f = v.f;
return n;
}
static FORCE_INLINE int32_t tGetDouble(uint8_t* p, double* d) {
int32_t n = 0;
union {
uint64_t ui;
double d;
} v;
n = tGetU64(p, &v.ui);
*d = v.d;
return n;
}
// =====================
static FORCE_INLINE int32_t tPutBinary(uint8_t* p, uint8_t* pData, uint32_t nData) {
int n = 0;
......@@ -646,6 +694,11 @@ static FORCE_INLINE int32_t tGetBinary(uint8_t* p, uint8_t** ppData, uint32_t* n
return n;
}
static FORCE_INLINE int32_t tPutCStr(uint8_t* p, char* pData) {
return tPutBinary(p, (uint8_t*)pData, strlen(pData) + 1);
}
static FORCE_INLINE int32_t tGetCStr(uint8_t* p, char** ppData) { return tGetBinary(p, (uint8_t**)ppData, NULL); }
#ifdef __cplusplus
}
#endif
......
......@@ -116,8 +116,11 @@ int stmtAffectedRowsOnce(TAOS_STMT *stmt);
int stmtPrepare(TAOS_STMT *stmt, const char *sql, unsigned long length);
int stmtSetTbName(TAOS_STMT *stmt, const char *tbName);
int stmtSetTbTags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags);
int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields);
int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields);
int stmtIsInsert(TAOS_STMT *stmt, int *insert);
int stmtGetParamNum(TAOS_STMT *stmt, int *nums);
int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes);
int stmtAddBatch(TAOS_STMT *stmt);
TAOS_RES *stmtUseResult(TAOS_STMT *stmt);
int stmtBindBatch(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind, int32_t colIdx);
......
......@@ -1033,27 +1033,20 @@ static char* parseTagDatatoJson(void* p) {
goto end;
}
int16_t nCols = kvRowNCols(p);
SArray* pTagVals = NULL;
if (tTagToValArray((const STag*)p, &pTagVals) != 0) {
goto end;
}
int16_t nCols = taosArrayGetSize(pTagVals);
char tagJsonKey[256] = {0};
for (int j = 0; j < nCols; ++j) {
SColIdx* pColIdx = kvRowColIdxAt(p, j);
char* val = (char*)(kvRowColVal(p, pColIdx));
if (j == 0) {
if (*val == TSDB_DATA_TYPE_NULL) {
string = taosMemoryCalloc(1, 8);
sprintf(string, "%s", TSDB_DATA_NULL_STR_L);
goto end;
}
continue;
}
STagVal* pTagVal = (STagVal*)taosArrayGet(pTagVals, j);
// json key encode by binary
memset(tagJsonKey, 0, sizeof(tagJsonKey));
memcpy(tagJsonKey, varDataVal(val), varDataLen(val));
memcpy(tagJsonKey, pTagVal->pKey, strlen(pTagVal->pKey));
// json value
val += varDataTLen(val);
char* realData = POINTER_SHIFT(val, CHAR_BYTES);
char type = *val;
char type = pTagVal->type;
if (type == TSDB_DATA_TYPE_NULL) {
cJSON* value = cJSON_CreateNull();
if (value == NULL) {
......@@ -1062,11 +1055,11 @@ static char* parseTagDatatoJson(void* p) {
cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_NCHAR) {
cJSON* value = NULL;
if (varDataLen(realData) > 0) {
char* tagJsonValue = taosMemoryCalloc(varDataLen(realData), 1);
int32_t length = taosUcs4ToMbs((TdUcs4*)varDataVal(realData), varDataLen(realData), tagJsonValue);
if (pTagVal->nData > 0) {
char* tagJsonValue = taosMemoryCalloc(pTagVal->nData, 1);
int32_t length = taosUcs4ToMbs((TdUcs4*)pTagVal->pData, pTagVal->nData, tagJsonValue);
if (length < 0) {
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, val);
tscError("charset:%s to %s. val:%s convert json value failed.", DEFAULT_UNICODE_ENCODEC, tsCharset, pTagVal->pData);
taosMemoryFree(tagJsonValue);
goto end;
}
......@@ -1075,7 +1068,7 @@ static char* parseTagDatatoJson(void* p) {
if (value == NULL) {
goto end;
}
} else if (varDataLen(realData) == 0) {
} else if (pTagVal->nData == 0) {
value = cJSON_CreateString("");
} else {
ASSERT(0);
......@@ -1083,22 +1076,14 @@ static char* parseTagDatatoJson(void* p) {
cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_DOUBLE) {
double jsonVd = *(double*)(realData);
double jsonVd = *(double*)(&pTagVal->i64);
cJSON* value = cJSON_CreateNumber(jsonVd);
if (value == NULL) {
goto end;
}
cJSON_AddItemToObject(json, tagJsonKey, value);
// }else if(type == TSDB_DATA_TYPE_BIGINT){
// int64_t jsonVd = *(int64_t*)(realData);
// cJSON* value = cJSON_CreateNumber((double)jsonVd);
// if (value == NULL)
// {
// goto end;
// }
// cJSON_AddItemToObject(json, tagJsonKey, value);
} else if (type == TSDB_DATA_TYPE_BOOL) {
char jsonVd = *(char*)(realData);
char jsonVd = *(char*)(&pTagVal->i64);
cJSON* value = cJSON_CreateBool(jsonVd);
if (value == NULL) {
goto end;
......@@ -1163,7 +1148,7 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
if (jsonInnerType == TSDB_DATA_TYPE_NULL) {
sprintf(varDataVal(dst), "%s", TSDB_DATA_NULL_STR_L);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_JSON) {
} else if (jsonInnerType == TD_TAG_JSON) {
char* jsonString = parseTagDatatoJson(jsonInnerData);
STR_TO_VARSTR(dst, jsonString);
taosMemoryFree(jsonString);
......@@ -1182,10 +1167,6 @@ static int32_t doConvertUCS4(SReqResultInfo* pResultInfo, int32_t numOfRows, int
double jsonVd = *(double*)(jsonInnerData);
sprintf(varDataVal(dst), "%.9lf", jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_BIGINT) {
int64_t jsonVd = *(int64_t*)(jsonInnerData);
sprintf(varDataVal(dst), "%" PRId64, jsonVd);
varDataSetLen(dst, strlen(varDataVal(dst)));
} else if (jsonInnerType == TSDB_DATA_TYPE_BOOL) {
sprintf(varDataVal(dst), "%s", (*((char*)jsonInnerData) == 1) ? "true" : "false");
varDataSetLen(dst, strlen(varDataVal(dst)));
......
......@@ -666,8 +666,39 @@ int taos_stmt_set_tbname(TAOS_STMT *stmt, const char *name) {
return stmtSetTbName(stmt, name);
}
int taos_stmt_set_tags(TAOS_STMT *stmt, TAOS_MULTI_BIND *tags) {
if (stmt == NULL || tags == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtSetTbTags(stmt, tags);
}
int taos_stmt_set_sub_tbname(TAOS_STMT *stmt, const char *name) { return taos_stmt_set_tbname(stmt, name); }
int taos_stmt_get_tag_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields) {
if (stmt == NULL || NULL == fieldNum) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtGetTagFields(stmt, fieldNum, fields);
}
int taos_stmt_get_col_fields(TAOS_STMT *stmt, int* fieldNum, TAOS_FIELD_E** fields) {
if (stmt == NULL || NULL == fieldNum) {
tscError("NULL parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtGetColFields(stmt, fieldNum, fields);
}
int taos_stmt_bind_param(TAOS_STMT *stmt, TAOS_MULTI_BIND *bind) {
if (stmt == NULL || bind == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
......@@ -772,6 +803,16 @@ int taos_stmt_num_params(TAOS_STMT *stmt, int *nums) {
return stmtGetParamNum(stmt, nums);
}
int taos_stmt_get_param(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
if (stmt == NULL || type == NULL || NULL == bytes || idx < 0) {
tscError("invalid parameter for %s", __FUNCTION__);
terrno = TSDB_CODE_INVALID_PARA;
return terrno;
}
return stmtGetParam(stmt, idx, type, bytes);
}
TAOS_RES *taos_stmt_use_result(TAOS_STMT *stmt) {
if (stmt == NULL) {
tscError("NULL parameter for %s", __FUNCTION__);
......
......@@ -17,7 +17,7 @@ int32_t stmtSwitchStatus(STscStmt* pStmt, STMT_STATUS newStatus) {
}
break;
case STMT_SETTAGS:
if (STMT_STATUS_NE(SETTBNAME)) {
if (STMT_STATUS_NE(SETTBNAME) && STMT_STATUS_NE(FETCH_FIELDS)) {
code = TSDB_CODE_TSC_STMT_API_ERROR;
}
break;
......@@ -540,6 +540,8 @@ int stmtSetTbName(TAOS_STMT* stmt, const char* tbName) {
if (pStmt->bInfo.needParse) {
strncpy(pStmt->bInfo.tbName, tbName, sizeof(pStmt->bInfo.tbName) - 1);
pStmt->bInfo.tbName[sizeof(pStmt->bInfo.tbName) - 1] = 0;
STMT_ERR_RET(stmtParseSql(pStmt));
}
return TSDB_CODE_SUCCESS;
......@@ -550,10 +552,6 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_SETTAGS));
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
if (pStmt->bInfo.inExecCache) {
return TSDB_CODE_SUCCESS;
}
......@@ -571,7 +569,7 @@ int stmtSetTbTags(TAOS_STMT* stmt, TAOS_MULTI_BIND* tags) {
return TSDB_CODE_SUCCESS;
}
int32_t stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fields) {
int stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query tag fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
......@@ -589,7 +587,7 @@ int32_t stmtFetchTagFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fiel
return TSDB_CODE_SUCCESS;
}
int32_t stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD** fields) {
int stmtFetchColFields(STscStmt* pStmt, int32_t* fieldNum, TAOS_FIELD_E** fields) {
if (STMT_TYPE_QUERY == pStmt->sql.type) {
tscError("invalid operation to get query column fileds");
STMT_ERR_RET(TSDB_CODE_TSC_STMT_API_ERROR);
......@@ -852,6 +850,71 @@ int stmtIsInsert(TAOS_STMT* stmt, int* insert) {
return TSDB_CODE_SUCCESS;
}
int stmtGetTagFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
STscStmt* pStmt = (STscStmt*)stmt;
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
}
if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STMT_ERR_RET(stmtFetchTagFields(stmt, nums, fields));
return TSDB_CODE_SUCCESS;
}
int stmtGetColFields(TAOS_STMT* stmt, int* nums, TAOS_FIELD_E** fields) {
STscStmt* pStmt = (STscStmt*)stmt;
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
}
if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
STMT_ERR_RET(stmtFetchColFields(stmt, nums, fields));
return TSDB_CODE_SUCCESS;
}
int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
STscStmt* pStmt = (STscStmt*)stmt;
......@@ -884,6 +947,50 @@ int stmtGetParamNum(TAOS_STMT* stmt, int* nums) {
return TSDB_CODE_SUCCESS;
}
int stmtGetParam(TAOS_STMT *stmt, int idx, int *type, int *bytes) {
STscStmt* pStmt = (STscStmt*)stmt;
if (STMT_TYPE_QUERY == pStmt->sql.type) {
STMT_RET(TSDB_CODE_TSC_STMT_API_ERROR);
}
STMT_ERR_RET(stmtSwitchStatus(pStmt, STMT_FETCH_FIELDS));
if (pStmt->bInfo.needParse && pStmt->sql.runTimes && pStmt->sql.type > 0 &&
STMT_TYPE_MULTI_INSERT != pStmt->sql.type) {
pStmt->bInfo.needParse = false;
}
if (pStmt->exec.pRequest && STMT_TYPE_QUERY == pStmt->sql.type && pStmt->sql.runTimes) {
taos_free_result(pStmt->exec.pRequest);
pStmt->exec.pRequest = NULL;
}
if (NULL == pStmt->exec.pRequest) {
STMT_ERR_RET(buildRequest(pStmt->taos, pStmt->sql.sqlStr, pStmt->sql.sqlLen, &pStmt->exec.pRequest));
}
if (pStmt->bInfo.needParse) {
STMT_ERR_RET(stmtParseSql(pStmt));
}
int32_t nums = 0;
TAOS_FIELD_E *pField = NULL;
STMT_ERR_RET(stmtFetchColFields(stmt, &nums, &pField));
if (idx >= nums) {
tscError("idx %d is too big", idx);
taosMemoryFree(pField);
STMT_ERR_RET(TSDB_CODE_INVALID_PARA);
}
*type = pField[idx].type;
*bytes = pField[idx].bytes;
taosMemoryFree(pField);
return TSDB_CODE_SUCCESS;
}
TAOS_RES* stmtUseResult(TAOS_STMT* stmt) {
STscStmt* pStmt = (STscStmt*)stmt;
......
......@@ -116,22 +116,23 @@ int32_t colDataAppend(SColumnInfoData* pColumnInfoData, uint32_t currentRow, con
int32_t type = pColumnInfoData->info.type;
if (IS_VAR_DATA_TYPE(type)) {
int32_t dataLen = varDataTLen(pData);
int32_t dataLen = 0;
if (type == TSDB_DATA_TYPE_JSON) {
if (*pData == TSDB_DATA_TYPE_NULL) {
dataLen = 0;
dataLen = CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_NCHAR) {
dataLen = varDataTLen(pData + CHAR_BYTES);
dataLen = varDataTLen(pData + CHAR_BYTES) + CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_DOUBLE) {
dataLen = DOUBLE_BYTES;
dataLen = DOUBLE_BYTES + CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_BOOL) {
dataLen = CHAR_BYTES;
} else if (*pData == TSDB_DATA_TYPE_JSON) {
dataLen = kvRowLen(pData + CHAR_BYTES);
dataLen = CHAR_BYTES + CHAR_BYTES;
} else if (*pData == TD_TAG_JSON) { // json string
dataLen = ((STag*)(pData))->len;
} else {
ASSERT(0);
}
dataLen += CHAR_BYTES;
}else {
dataLen = varDataTLen(pData);
}
SVarColAttr* pAttr = &pColumnInfoData->varmeta;
......@@ -1634,6 +1635,11 @@ int32_t buildSubmitReqFromDataBlock(SSubmitReq** pReq, const SArray* pDataBlocks
SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, bool createTb, int64_t suid,
const char* stbFullName, int32_t vgId) {
SSubmitReq* ret = NULL;
SArray* tagArray = taosArrayInit(1, sizeof(STagVal));
if(!tagArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
return NULL;
}
// cal size
int32_t cap = sizeof(SSubmitReq);
......@@ -1655,18 +1661,33 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
STagVal tagVal = {.cid = 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.pData = (uint8_t*)&pDataBlock->info.groupId,
.nData = sizeof(uint64_t)};
STag* pTag = NULL;
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
tTagNew(tagArray, 1, false, &pTag);
if (!pTag) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
return NULL;
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
createTbReq.ctb.pTag = (uint8_t*)pTag;
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
taosMemoryFree(cname);
tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
return NULL;
}
}
cap += sizeof(SSubmitBlk) + schemaLen + rows * maxLen;
......@@ -1709,22 +1730,42 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
createTbReq.type = TSDB_CHILD_TABLE;
createTbReq.ctb.suid = suid;
SKVRowBuilder kvRowBuilder = {0};
if (tdInitKVRowBuilder(&kvRowBuilder) < 0) {
ASSERT(0);
STagVal tagVal = {.cid = 1,
.type = TSDB_DATA_TYPE_UBIGINT,
.pData = (uint8_t*)&pDataBlock->info.groupId,
.nData = sizeof(uint64_t)};
taosArrayClear(tagArray);
taosArrayPush(tagArray, &tagVal);
STag* pTag = NULL;
tTagNew(tagArray, 1, false, &pTag);
if (!pTag) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
tdAddColToKVRow(&kvRowBuilder, 1, &pDataBlock->info.groupId, sizeof(uint64_t));
createTbReq.ctb.pTag = tdGetKVRowFromBuilder(&kvRowBuilder);
tdDestroyKVRowBuilder(&kvRowBuilder);
createTbReq.ctb.pTag = (uint8_t*)pTag;
int32_t code;
tEncodeSize(tEncodeSVCreateTbReq, &createTbReq, schemaLen, code);
if (code < 0) return NULL;
if (code < 0) {
tdDestroySVCreateTbReq(&createTbReq);
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
SEncoder encoder = {0};
tEncoderInit(&encoder, blockData, schemaLen);
if (tEncodeSVCreateTbReq(&encoder, &createTbReq) < 0) return NULL;
code = tEncodeSVCreateTbReq(&encoder, &createTbReq);
tEncoderClear(&encoder);
tdDestroySVCreateTbReq(&createTbReq);
if (code < 0) {
taosArrayDestroy(tagArray);
taosMemoryFreeClear(ret);
return NULL;
}
}
blkHead->schemaLen = htonl(schemaLen);
......@@ -1759,5 +1800,6 @@ SSubmitReq* tdBlockToSubmit(const SArray* pBlocks, const STSchema* pTSchema, boo
}
ret->length = htonl(ret->length);
taosArrayDestroy(tagArray);
return ret;
}
此差异已折叠。
......@@ -2933,7 +2933,6 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tStartEncode(&encoder) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgId) < 0) return -1;
if (tEncodeI32(&encoder, pReq->dnodeId) < 0) return -1;
if (tEncodeCStr(&encoder, pReq->db) < 0) return -1;
if (tEncodeI64(&encoder, pReq->dbUid) < 0) return -1;
if (tEncodeI32(&encoder, pReq->vgVersion) < 0) return -1;
......@@ -2956,6 +2955,7 @@ int32_t tSerializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *pR
if (tEncodeI8(&encoder, pReq->compression) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
if (tEncodeI8(&encoder, pReq->standby) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
......@@ -2990,7 +2990,6 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tStartDecode(&decoder) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgId) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->dnodeId) < 0) return -1;
if (tDecodeCStrTo(&decoder, pReq->db) < 0) return -1;
if (tDecodeI64(&decoder, &pReq->dbUid) < 0) return -1;
if (tDecodeI32(&decoder, &pReq->vgVersion) < 0) return -1;
......@@ -3013,6 +3012,7 @@ int32_t tDeserializeSCreateVnodeReq(void *buf, int32_t bufLen, SCreateVnodeReq *
if (tDecodeI8(&decoder, &pReq->compression) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->standby) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
......@@ -3133,8 +3133,8 @@ int32_t tSerializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pReq
if (tEncodeI8(&encoder, pReq->walLevel) < 0) return -1;
if (tEncodeI8(&encoder, pReq->strict) < 0) return -1;
if (tEncodeI8(&encoder, pReq->cacheLastRow) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
if (tEncodeI8(&encoder, pReq->selfIndex) < 0) return -1;
if (tEncodeI8(&encoder, pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i];
if (tEncodeSReplica(&encoder, pReplica) < 0) return -1;
......@@ -3164,8 +3164,8 @@ int32_t tDeserializeSAlterVnodeReq(void *buf, int32_t bufLen, SAlterVnodeReq *pR
if (tDecodeI8(&decoder, &pReq->walLevel) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->strict) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->cacheLastRow) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->selfIndex) < 0) return -1;
if (tDecodeI8(&decoder, &pReq->replica) < 0) return -1;
for (int32_t i = 0; i < TSDB_MAX_REPLICA; ++i) {
SReplica *pReplica = &pReq->replicas[i];
if (tDecodeSReplica(&decoder, pReplica) < 0) return -1;
......@@ -3653,6 +3653,7 @@ int32_t tEncodeTSma(SEncoder *pCoder, const STSma *pSma) {
if (tEncodeI8(pCoder, pSma->intervalUnit) < 0) return -1;
if (tEncodeI8(pCoder, pSma->slidingUnit) < 0) return -1;
if (tEncodeI8(pCoder, pSma->timezoneInt) < 0) return -1;
if (tEncodeI32(pCoder, pSma->dstVgId) < 0) return -1;
if (tEncodeCStr(pCoder, pSma->indexName) < 0) return -1;
if (tEncodeI32(pCoder, pSma->exprLen) < 0) return -1;
if (tEncodeI32(pCoder, pSma->tagsFilterLen) < 0) return -1;
......@@ -3675,6 +3676,7 @@ int32_t tDecodeTSma(SDecoder *pCoder, STSma *pSma) {
if (tDecodeI8(pCoder, &pSma->version) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->intervalUnit) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->slidingUnit) < 0) return -1;
if (tDecodeI32(pCoder, &pSma->dstVgId) < 0) return -1;
if (tDecodeI8(pCoder, &pSma->timezoneInt) < 0) return -1;
if (tDecodeCStrTo(pCoder, pSma->indexName) < 0) return -1;
if (tDecodeI32(pCoder, &pSma->exprLen) < 0) return -1;
......@@ -3908,7 +3910,7 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
if (pReq->type == TSDB_CHILD_TABLE) {
if (tEncodeI64(pCoder, pReq->ctb.suid) < 0) return -1;
if (tEncodeBinary(pCoder, pReq->ctb.pTag, kvRowLen(pReq->ctb.pTag)) < 0) return -1;
if (tEncodeTag(pCoder, (const STag *)pReq->ctb.pTag) < 0) return -1;
} else if (pReq->type == TSDB_NORMAL_TABLE) {
if (tEncodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
} else {
......@@ -3920,8 +3922,6 @@ int tEncodeSVCreateTbReq(SEncoder *pCoder, const SVCreateTbReq *pReq) {
}
int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
uint32_t len;
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI32v(pCoder, &pReq->flags) < 0) return -1;
......@@ -3933,7 +3933,7 @@ int tDecodeSVCreateTbReq(SDecoder *pCoder, SVCreateTbReq *pReq) {
if (pReq->type == TSDB_CHILD_TABLE) {
if (tDecodeI64(pCoder, &pReq->ctb.suid) < 0) return -1;
if (tDecodeBinary(pCoder, &pReq->ctb.pTag, &len) < 0) return -1;
if (tDecodeTag(pCoder, (STag **)&pReq->ctb.pTag) < 0) return -1;
} else if (pReq->type == TSDB_NORMAL_TABLE) {
if (tDecodeSSchemaWrapper(pCoder, &pReq->ntb.schemaRow) < 0) return -1;
} else {
......
......@@ -219,9 +219,9 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_DROP_TASK, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_QUERY_HEARTBEAT, mmPutNodeMsgToQueryQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_RSP, mmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_TIMEOUT, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_SYNC_PING, mmPutNodeMsgToSyncQueue, 1) == NULL) goto _OVER;
......
......@@ -150,20 +150,26 @@ static void vmGenerateVnodeCfg(SCreateVnodeReq *pCreate, SVnodeCfg *pCfg) {
pCfg->tsdbCfg.minRows = pCreate->minRows;
pCfg->tsdbCfg.maxRows = pCreate->maxRows;
for (size_t i = 0; i < taosArrayGetSize(pCreate->pRetensions); ++i) {
memcpy(&pCfg->tsdbCfg.retentions[i], taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
SRetention *pRetention = &pCfg->tsdbCfg.retentions[i];
memcpy(pRetention, taosArrayGet(pCreate->pRetensions, i), sizeof(SRetention));
if (i == 0) {
if ((pRetention->freq > 0 && pRetention->keep > 0)) pCfg->isRsma = 1;
}
}
pCfg->walCfg.vgId = pCreate->vgId;
pCfg->hashBegin = pCreate->hashBegin;
pCfg->hashEnd = pCreate->hashEnd;
pCfg->hashMethod = pCreate->hashMethod;
pCfg->standby = pCfg->standby;
pCfg->syncCfg.myIndex = pCreate->selfIndex;
pCfg->syncCfg.replicaNum = pCreate->replica;
memset(&pCfg->syncCfg.nodeInfo, 0, sizeof(pCfg->syncCfg.nodeInfo));
for (int i = 0; i < pCreate->replica; ++i) {
pCfg->syncCfg.nodeInfo[i].nodePort = pCreate->replicas[i].port;
snprintf(pCfg->syncCfg.nodeInfo[i].nodeFqdn, sizeof(pCfg->syncCfg.nodeInfo[i].nodeFqdn), "%s",
pCreate->replicas[i].fqdn);
SNodeInfo *pNode = &pCfg->syncCfg.nodeInfo[i];
pNode->nodePort = pCreate->replicas[i].port;
tstrncpy(pNode->nodeFqdn, pCreate->replicas[i].fqdn, sizeof(pNode->nodeFqdn));
}
}
......@@ -176,6 +182,8 @@ static void vmGenerateWrapperCfg(SVnodeMgmt *pMgmt, SCreateVnodeReq *pCreate, SW
int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
SCreateVnodeReq createReq = {0};
SVnodeCfg vnodeCfg = {0};
SWrapperCfg wrapperCfg = {0};
int32_t code = -1;
char path[TSDB_FILENAME_LEN] = {0};
......@@ -184,12 +192,9 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
return -1;
}
dDebug("vgId:%d, create vnode req is received, tsma:%d", createReq.vgId, createReq.isTsma);
SVnodeCfg vnodeCfg = {0};
dDebug("vgId:%d, create vnode req is received, tsma:%d standby:%d", createReq.vgId, createReq.isTsma,
createReq.standby);
vmGenerateVnodeCfg(&createReq, &vnodeCfg);
SWrapperCfg wrapperCfg = {0};
vmGenerateWrapperCfg(pMgmt, &createReq, &wrapperCfg);
SVnodeObj *pVnode = vmAcquireVnode(pMgmt, createReq.vgId);
......@@ -218,9 +223,20 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
code = vmOpenVnode(pMgmt, &wrapperCfg, pImpl);
if (code != 0) {
dError("vgId:%d, failed to open vnode since %s", createReq.vgId, terrstr());
code = terrno;
goto _OVER;
}
if (createReq.isTsma) {
SMsgHead *smaMsg = createReq.pTsma;
uint32_t contLen = (uint32_t)(htonl(smaMsg->contLen) - sizeof(SMsgHead));
if (vnodeProcessCreateTSma(pImpl, POINTER_SHIFT(smaMsg, sizeof(SMsgHead)), contLen) < 0) {
dError("vgId:%d, failed to create tsma since %s", createReq.vgId, terrstr());
code = terrno;
goto _OVER;
};
}
code = vnodeStart(pImpl);
if (code != 0) {
dError("vgId:%d, failed to start sync since %s", createReq.vgId, terrstr());
......@@ -228,7 +244,10 @@ int32_t vmProcessCreateVnodeReq(SVnodeMgmt *pMgmt, SRpcMsg *pMsg) {
}
code = vmWriteVnodeListToFile(pMgmt);
if (code != 0) goto _OVER;
if (code != 0) {
code = terrno;
goto _OVER;
}
_OVER:
if (code != 0) {
......@@ -314,8 +333,9 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_DISPATCH, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TASK_RECOVER, vmPutNodeMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT_VNODE, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_REPLICA, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_ALTER_CONFIG, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_COMPACT, vmPutNodeMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_CREATE_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_DND_DROP_VNODE, vmPutNodeMsgToMgmtQueue, 0) == NULL) goto _OVER;
......
......@@ -61,7 +61,7 @@ int32_t mndTransAppendRedoAction(STrans *pTrans, STransAction *pAction);
int32_t mndTransAppendUndoAction(STrans *pTrans, STransAction *pAction);
void mndTransSetRpcRsp(STrans *pTrans, void *pCont, int32_t contLen);
void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *param, int32_t paramLen);
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb);
void mndTransSetDbName(STrans *pTrans, const char *dbname);
void mndTransSetSerial(STrans *pTrans);
int32_t mndTransPrepare(SMnode *pMnode, STrans *pTrans);
......
......@@ -36,7 +36,7 @@ SArray *mndBuildDnodesArray(SMnode *pMnode);
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray);
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen, bool standby);
void *mndBuildDropVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen);
......
......@@ -261,7 +261,7 @@ void mndReleaseDb(SMnode *pMnode, SDbObj *pDb) {
sdbRelease(pSdb, pDb);
}
static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid) {
static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SVnodeGid *pVgid, bool standby) {
STransAction action = {0};
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgid->dnodeId);
......@@ -270,7 +270,7 @@ static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *p
mndReleaseDnode(pMnode, pDnode);
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, standby);
if (pReq == NULL) return -1;
action.pCont = pReq;
......@@ -286,7 +286,7 @@ static int32_t mndAddCreateVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *p
return 0;
}
static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup) {
static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, tmsg_t msgType) {
STransAction action = {0};
action.epSet = mndGetVgroupEpset(pMnode, pVgroup);
......@@ -296,7 +296,7 @@ static int32_t mndAddAlterVnodeAction(SMnode *pMnode, STrans *pTrans, SDbObj *pD
action.pCont = pReq;
action.contLen = contLen;
action.msgType = TDMT_VND_ALTER_VNODE;
action.msgType = msgType;
if (mndTransAppendRedoAction(pTrans, &action) != 0) {
taosMemoryFree(pReq);
......@@ -388,7 +388,7 @@ static int32_t mndCheckDbCfg(SMnode *pMnode, SDbCfg *pCfg) {
}
terrno = 0;
return TSDB_CODE_SUCCESS;
return terrno;
}
static void mndSetDefaultDbCfg(SDbCfg *pCfg) {
......@@ -467,7 +467,7 @@ static int32_t mndSetCreateDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj
for (int32_t vn = 0; vn < pVgroup->replica; ++vn) {
SVnodeGid *pVgid = pVgroup->vnodeGid + vn;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid) != 0) {
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, pVgroup, pVgid, false) != 0) {
return -1;
}
}
......@@ -550,7 +550,7 @@ static int32_t mndCreateDb(SMnode *pMnode, SRpcMsg *pReq, SCreateDbReq *pCreate,
mDebug("trans:%d, used to create db:%s", pTrans->id, pCreate->db);
mndTransSetDbInfo(pTrans, &dbObj);
mndTransSetDbName(pTrans, dbObj.name);
if (mndSetCreateDbRedoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbUndoLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
if (mndSetCreateDbCommitLogs(pMnode, pTrans, &dbObj, pVgroups) != 0) goto _OVER;
......@@ -688,29 +688,37 @@ static int32_t mndSetDbCfgFromAlterDbReq(SDbObj *pDb, SAlterDbReq *pAlter) {
static int32_t mndSetAlterDbRedoLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
SSdbRaw *pRedoRaw = mndDbActionEncode(pOld);
if (pRedoRaw == NULL) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) return -1;
if (sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY) != 0) return -1;
if (mndTransAppendRedolog(pTrans, pRedoRaw) != 0) {
sdbFreeRaw(pRedoRaw);
return -1;
}
sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY);
return 0;
}
static int32_t mndSetAlterDbCommitLogs(SMnode *pMnode, STrans *pTrans, SDbObj *pOld, SDbObj *pNew) {
SSdbRaw *pCommitRaw = mndDbActionEncode(pNew);
if (pCommitRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) return -1;
if (sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY) != 0) return -1;
if (mndTransAppendCommitlog(pTrans, pCommitRaw) != 0) {
sdbFreeRaw(pCommitRaw);
return -1;
}
sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY);
return 0;
}
static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SVgObj *pVgroup, SArray *pArray) {
if (pVgroup->replica <= 0 || pVgroup->replica == pDb->cfg.replications) {
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup) != 0) {
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, pVgroup, TDMT_VND_ALTER_CONFIG) != 0) {
return -1;
}
} else {
SVgObj newVgroup = {0};
memcpy(&newVgroup, pVgroup, sizeof(SVgObj));
mndTransSetSerial(pTrans);
if (newVgroup.replica < pDb->cfg.replications) {
mInfo("db:%s, vgId:%d, will add 2 vnodes, vn:0 dnode:%d", pVgroup->dbName, pVgroup->vgId,
pVgroup->vnodeGid[0].dnodeId);
......@@ -720,9 +728,9 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
return -1;
}
newVgroup.replica = pDb->cfg.replications;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1]) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2]) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[1], true) != 0) return -1;
if (mndAddCreateVnodeAction(pMnode, pTrans, pDb, &newVgroup, &newVgroup.vnodeGid[2], true) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
} else {
mInfo("db:%s, vgId:%d, will remove 2 vnodes", pVgroup->dbName, pVgroup->vgId);
......@@ -733,15 +741,18 @@ static int32_t mndBuildAlterVgroupAction(SMnode *pMnode, STrans *pTrans, SDbObj
return -1;
}
newVgroup.replica = pDb->cfg.replications;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup) != 0) return -1;
if (mndAddAlterVnodeAction(pMnode, pTrans, pDb, &newVgroup, TDMT_VND_ALTER_REPLICA) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del1, true) != 0) return -1;
if (mndAddDropVnodeAction(pMnode, pTrans, pDb, &newVgroup, &del2, true) != 0) return -1;
}
SSdbRaw *pVgRaw = mndVgroupActionEncode(&newVgroup);
if (pVgRaw == NULL) return -1;
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) return -1;
if (sdbSetRawStatus(pVgRaw, SDB_STATUS_READY) != 0) return -1;
if (mndTransAppendCommitlog(pTrans, pVgRaw) != 0) {
sdbFreeRaw(pVgRaw);
return -1;
}
sdbSetRawStatus(pVgRaw, SDB_STATUS_READY);
}
return 0;
......@@ -774,18 +785,16 @@ static int32_t mndSetAlterDbRedoActions(SMnode *pMnode, STrans *pTrans, SDbObj *
}
static int32_t mndAlterDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pOld, SDbObj *pNew) {
int32_t code = -1;
STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_RETRY, TRN_CONFLICT_DB, pReq);
if (pTrans == NULL) goto _OVER;
if (pTrans == NULL) return -1;
mDebug("trans:%d, used to alter db:%s", pTrans->id, pOld->name);
mndTransSetDbInfo(pTrans, pOld);
int32_t code = -1;
mndTransSetDbName(pTrans, pOld->name);
if (mndSetAlterDbRedoLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbCommitLogs(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndSetAlterDbRedoActions(pMnode, pTrans, pOld, pNew) != 0) goto _OVER;
if (mndTransPrepare(pMnode, pTrans) != 0) goto _OVER;
code = 0;
_OVER:
......@@ -1040,7 +1049,7 @@ static int32_t mndDropDb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb) {
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop db:%s", pTrans->id, pDb->name);
mndTransSetDbInfo(pTrans, pDb);
mndTransSetDbName(pTrans, pDb->name);
if (mndSetDropDbRedoLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
if (mndSetDropDbCommitLogs(pMnode, pTrans, pDb) != 0) goto _OVER;
......
......@@ -426,7 +426,7 @@ static int32_t mndSetCreateSmaVgroupRedoActions(SMnode *pMnode, STrans *pTrans,
pVgroup->pTsma = pSmaReq;
int32_t contLen = 0;
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen);
void *pReq = mndBuildCreateVnodeReq(pMnode, pDnode, pDb, pVgroup, &contLen, false);
taosMemoryFreeClear(pSmaReq);
if (pReq == NULL) return -1;
......@@ -512,7 +512,7 @@ static int32_t mndCreateSma(SMnode *pMnode, SRpcMsg *pReq, SMCreateSmaReq *pCrea
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to create sma:%s", pTrans->id, pCreate->name);
mndTransSetDbInfo(pTrans, pDb);
mndTransSetDbName(pTrans, pDb->name);
mndTransSetSerial(pTrans);
if (mndSetCreateSmaRedoLogs(pMnode, pTrans, &smaObj) != 0) goto _OVER;
......@@ -757,7 +757,7 @@ static int32_t mndDropSma(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SSmaObj *p
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop sma:%s", pTrans->id, pSma->name);
mndTransSetDbInfo(pTrans, pDb);
mndTransSetDbName(pTrans, pDb->name);
if (mndSetDropSmaRedoLogs(pMnode, pTrans, pSma) != 0) goto _OVER;
if (mndSetDropSmaVgroupRedoLogs(pMnode, pTrans, pVgroup) != 0) goto _OVER;
......
......@@ -754,7 +754,7 @@ _OVER:
}
int32_t mndAddStbToTrans(SMnode *pMnode, STrans *pTrans, SDbObj *pDb, SStbObj *pStb) {
mndTransSetDbInfo(pTrans, pDb);
mndTransSetDbName(pTrans, pDb->name);
if (mndSetCreateStbRedoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbUndoLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
if (mndSetCreateStbCommitLogs(pMnode, pTrans, pDb, pStb) != 0) return -1;
......@@ -1261,7 +1261,7 @@ static int32_t mndAlterStb(SMnode *pMnode, SRpcMsg *pReq, const SMAlterStbReq *p
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to alter stb:%s", pTrans->id, pAlter->name);
mndTransSetDbInfo(pTrans, pDb);
mndTransSetDbName(pTrans, pDb->name);
if (mndSetAlterStbRedoLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
if (mndSetAlterStbCommitLogs(pMnode, pTrans, pDb, &stbObj) != 0) goto _OVER;
......@@ -1407,7 +1407,7 @@ static int32_t mndDropStb(SMnode *pMnode, SRpcMsg *pReq, SDbObj *pDb, SStbObj *p
if (pTrans == NULL) goto _OVER;
mDebug("trans:%d, used to drop stb:%s", pTrans->id, pStb->name);
mndTransSetDbInfo(pTrans, pDb);
mndTransSetDbName(pTrans, pDb->name);
if (mndSetDropStbRedoLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
if (mndSetDropStbCommitLogs(pMnode, pTrans, pStb) != 0) goto _OVER;
......
......@@ -619,8 +619,8 @@ void mndTransSetCb(STrans *pTrans, ETrnFunc startFunc, ETrnFunc stopFunc, void *
pTrans->paramLen = paramLen;
}
void mndTransSetDbInfo(STrans *pTrans, SDbObj *pDb) {
memcpy(pTrans->dbname, pDb->name, TSDB_DB_FNAME_LEN);
void mndTransSetDbName(STrans *pTrans, const char *dbname) {
memcpy(pTrans->dbname, dbname, TSDB_DB_FNAME_LEN);
}
void mndTransSetSerial(STrans *pTrans) { pTrans->exec = TRN_EXEC_SERIAL; }
......
......@@ -51,9 +51,10 @@ int32_t mndInitVgroup(SMnode *pMnode) {
};
mndSetMsgHandle(pMnode, TDMT_DND_CREATE_VNODE_RSP, mndProcessCreateVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_VNODE_RSP, mndProcessAlterVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_REPLICA_RSP, mndProcessAlterVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_ALTER_CONFIG_RSP, mndProcessAlterVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_DND_DROP_VNODE_RSP, mndProcessDropVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_VNODE_RSP, mndProcessCompactVnodeRsp);
mndSetMsgHandle(pMnode, TDMT_VND_COMPACT_RSP, mndProcessCompactVnodeRsp);
mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndRetrieveVgroups);
mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_VGROUP, mndCancelGetNextVgroup);
......@@ -188,10 +189,10 @@ void mndReleaseVgroup(SMnode *pMnode, SVgObj *pVgroup) {
sdbRelease(pSdb, pVgroup);
}
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen) {
void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVgObj *pVgroup, int32_t *pContLen,
bool standby) {
SCreateVnodeReq createReq = {0};
createReq.vgId = pVgroup->vgId;
createReq.dnodeId = pDnode->id;
memcpy(createReq.db, pDb->name, TSDB_DB_FNAME_LEN);
createReq.dbUid = pDb->uid;
createReq.vgVersion = pVgroup->version;
......@@ -218,6 +219,7 @@ void *mndBuildCreateVnodeReq(SMnode *pMnode, SDnodeObj *pDnode, SDbObj *pDb, SVg
createReq.hashMethod = pDb->cfg.hashMethod;
createReq.numOfRetensions = pDb->cfg.numOfRetensions;
createReq.pRetensions = pDb->cfg.pRetensions;
createReq.standby = standby;
createReq.isTsma = pVgroup->isTsma;
createReq.pTsma = pVgroup->pTsma;
......@@ -276,7 +278,6 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
alterReq.strict = pDb->cfg.strict;
alterReq.cacheLastRow = pDb->cfg.cacheLastRow;
alterReq.replica = pVgroup->replica;
alterReq.selfIndex = -1;
for (int32_t v = 0; v < pVgroup->replica; ++v) {
SReplica *pReplica = &alterReq.replicas[v];
......@@ -292,13 +293,6 @@ void *mndBuildAlterVnodeReq(SMnode *pMnode, SDbObj *pDb, SVgObj *pVgroup, int32_
mndReleaseDnode(pMnode, pVgidDnode);
}
#if 0
if (alterReq.selfIndex == -1) {
terrno = TSDB_CODE_MND_APP_ERROR;
return NULL;
}
#endif
int32_t contLen = tSerializeSAlterVnodeReq(NULL, 0, &alterReq);
if (contLen < 0) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
......@@ -510,7 +504,7 @@ _OVER:
taosArrayDestroy(pArray);
return code;
}
//--->
int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
......@@ -538,7 +532,7 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
SVnodeGid *pVgid = &pVgroup->vnodeGid[maxPos];
pVgid->dnodeId = pDnode->id;
pVgid->role = TAOS_SYNC_STATE_FOLLOWER;
pVgid->role = TAOS_SYNC_STATE_ERROR;
pDnode->numOfVnodes++;
mInfo("db:%s, vgId:%d, vnode_index:%d dnode:%d is added", pVgroup->dbName, pVgroup->vgId, maxPos, pVgid->dnodeId);
......@@ -549,16 +543,15 @@ int32_t mndAddVnodeToVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray) {
terrno = TSDB_CODE_MND_NO_ENOUGH_DNODES;
return -1;
}
//--->
int32_t mndRemoveVnodeFromVgroup(SMnode *pMnode, SVgObj *pVgroup, SArray *pArray, SVnodeGid *del1, SVnodeGid *del2) {
int32_t removedNum = 0;
taosArraySort(pArray, (__compar_fn_t)mndCompareDnodeVnodes);
for (int32_t i = 0; i < taosArrayGetSize(pArray); ++i) {
SDnodeObj *pDnode = taosArrayGet(pArray, i);
mDebug("dnode:%d, equivalent vnodes:%d", pDnode->id, pDnode->numOfVnodes);
}
int32_t removedNum = 0;
for (int32_t d = taosArrayGetSize(pArray) - 1; d >= 0; --d) {
SDnodeObj *pDnode = taosArrayGet(pArray, d);
......@@ -664,6 +657,7 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
int32_t numOfRows = 0;
SVgObj *pVgroup = NULL;
int32_t cols = 0;
int64_t curMs = taosGetTimestampMs();
SDbObj *pDb = NULL;
if (strlen(pShow->db) > 0) {
......@@ -703,12 +697,15 @@ static int32_t mndRetrieveVgroups(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *p
if (i < pVgroup->replica) {
colDataAppend(pColInfo, numOfRows, (const char *)&pVgroup->vnodeGid[i].dnodeId, false);
bool online = false;
SDnodeObj *pDnode = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
if (pDnode != NULL) {
online = mndIsDnodeOnline(pMnode, pDnode, curMs);
mndReleaseDnode(pMnode, pDnode);
}
char buf1[20] = {0};
SDnodeObj *pDnodeObj = mndAcquireDnode(pMnode, pVgroup->vnodeGid[i].dnodeId);
ASSERT(pDnodeObj != NULL);
bool isOffLine = !mndIsDnodeOnline(pMnode, pDnodeObj, taosGetTimestampMs());
const char *role = isOffLine ? "OFFLINE" : syncStr(pVgroup->vnodeGid[i].role);
const char *role = online ? syncStr(pVgroup->vnodeGid[i].role) : "OFFLINE";
STR_WITH_MAXSIZE_TO_VARSTR(buf1, role, pShow->pMeta->pSchemas[cols].bytes);
pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
......
......@@ -128,7 +128,7 @@ class MndTestTrans2 : public ::testing::Test {
mndTransSetCb(pTrans, TRANS_START_FUNC_TEST, TRANS_STOP_FUNC_TEST, param, strlen(param) + 1);
if (pDb != NULL) {
mndTransSetDbInfo(pTrans, pDb);
mndTransSetDbName(pTrans, pDb->name);
}
int32_t code = mndTransPrepare(pMnode, pTrans);
......@@ -201,7 +201,7 @@ class MndTestTrans2 : public ::testing::Test {
}
if (pDb != NULL) {
mndTransSetDbInfo(pTrans, pDb);
mndTransSetDbName(pTrans, pDb->name);
}
int32_t code = mndTransPrepare(pMnode, pTrans);
......
......@@ -35,7 +35,6 @@ target_sources(
"src/sma/smaTimeRange.c"
# tsdb
# "src/tsdb/tsdbTDBImpl.c"
"src/tsdb/tsdbCommit.c"
"src/tsdb/tsdbCommit2.c"
"src/tsdb/tsdbFile.c"
......@@ -45,7 +44,6 @@ target_sources(
"src/tsdb/tsdbMemTable2.c"
"src/tsdb/tsdbRead.c"
"src/tsdb/tsdbReadImpl.c"
# "src/tsdb/tsdbSma.c"
"src/tsdb/tsdbWrite.c"
"src/tsdb/tsdbSnapshot.c"
......
......@@ -68,6 +68,7 @@ void vnodeGetInfo(SVnode *pVnode, const char **dbname, int32_t *vgId);
int32_t vnodeSnapshotReaderOpen(SVnode *pVnode, SVSnapshotReader **ppReader, int64_t sver, int64_t ever);
int32_t vnodeSnapshotReaderClose(SVSnapshotReader *pReader);
int32_t vnodeSnapshotRead(SVSnapshotReader *pReader, const void **ppData, uint32_t *nData);
int32_t vnodeProcessCreateTSma(SVnode *pVnode, void *pCont, uint32_t contLen);
// meta
typedef struct SMeta SMeta; // todo: remove
......@@ -78,7 +79,7 @@ void metaReaderInit(SMetaReader *pReader, SMeta *pMeta, int32_t flags);
void metaReaderClear(SMetaReader *pReader);
int32_t metaGetTableEntryByUid(SMetaReader *pReader, tb_uid_t uid);
int32_t metaReadNext(SMetaReader *pReader);
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid);
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *tagVal);
typedef struct SMetaFltParam {
tb_uid_t suid;
......@@ -172,7 +173,9 @@ struct SVnodeCfg {
bool isHeap;
bool isWeak;
int8_t isTsma;
int8_t isRsma;
int8_t hashMethod;
int8_t standby;
STsdbCfg tsdbCfg;
SWalCfg walCfg;
SSyncCfg syncCfg;
......
......@@ -117,7 +117,7 @@ typedef struct {
} SSmaIdxKey;
// metaTable ==================
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void* pTagData, int8_t type, tb_uid_t uid,
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void* pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,
STagIdxKey** ppTagIdxKey, int32_t* nTagIdxKey);
#ifndef META_REFACT
......
......@@ -32,14 +32,27 @@ extern "C" {
#define tsdbTrace(...) do { if (tsdbDebugFlag & DEBUG_TRACE) { taosPrintLog("TSDB ", DEBUG_TRACE, tsdbDebugFlag, __VA_ARGS__); }} while(0)
// clang-format on
typedef struct TSDBROW TSDBROW;
typedef struct TSDBKEY TSDBKEY;
typedef struct SDelOp SDelOp;
static int tsdbKeyCmprFn(const void *p1, const void *p2);
// tsdbMemTable2.c ==============================================================================================
typedef struct SMemTable SMemTable;
int32_t tsdbMemTableCreate2(STsdb *pTsdb, SMemTable **ppMemTable);
void tsdbMemTableDestroy2(SMemTable *pMemTable);
// tsdbMemTable ================
typedef struct STsdbRow STsdbRow;
typedef struct STbData STbData;
typedef struct STsdbMemTable STsdbMemTable;
typedef struct SMergeInfo SMergeInfo;
typedef struct STable STable;
int tsdbMemTableCreate(STsdb *pTsdb, STsdbMemTable **ppMemTable);
void tsdbMemTableDestroy(STsdb *pTsdb, STsdbMemTable *pMemTable);
void tsdbMemTableDestroy(STsdbMemTable *pMemTable);
int tsdbLoadDataFromCache(STsdb *pTsdb, STable *pTable, SSkipListIterator *pIter, TSKEY maxKey, int maxRowsToRead,
SDataCols *pCols, TKEY *filterKeys, int nFilterKeys, bool keepDup, SMergeInfo *pMergeInfo);
......@@ -845,6 +858,42 @@ static FORCE_INLINE int tsdbUnLockFS(STsdbFS *pFs) {
return 0;
}
struct TSDBROW {
int64_t version;
STSRow2 tsRow;
};
struct TSDBKEY {
int64_t version;
TSKEY ts;
};
struct SDelOp {
int64_t version;
TSKEY sKey; // included
TSKEY eKey; // included
SDelOp *pNext;
};
static FORCE_INLINE int tsdbKeyCmprFn(const void *p1, const void *p2) {
TSDBKEY *pKey1 = (TSDBKEY *)p1;
TSDBKEY *pKey2 = (TSDBKEY *)p2;
if (pKey1->ts < pKey2->ts) {
return -1;
} else if (pKey1->ts > pKey2->ts) {
return 1;
}
if (pKey1->version < pKey2->version) {
return -1;
} else if (pKey1->version > pKey2->version) {
return 1;
}
return 0;
}
#endif
#ifdef __cplusplus
......
......@@ -81,9 +81,10 @@ int32_t vnodeSyncCommit(SVnode* pVnode);
int32_t vnodeAsyncCommit(SVnode* pVnode);
// vnodeSync.c
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
int32_t vnodeSyncOpen(SVnode* pVnode, char* path);
void vnodeSyncStart(SVnode* pVnode);
void vnodeSyncClose(SVnode* pVnode);
void vnodeSyncAlter(SVnode* pVnode, SRpcMsg* pMsg);
#ifdef __cplusplus
}
......
......@@ -241,6 +241,8 @@ struct SVnode {
#define VND_RSMA1(vnd) ((vnd)->pSma->pRSmaTsdb1)
#define VND_RSMA2(vnd) ((vnd)->pSma->pRSmaTsdb2)
#define VND_RETENTIONS(vnd) (&(vnd)->config.tsdbCfg.retentions)
#define VND_IS_RSMA(v) ((v)->config.isRsma == 1)
#define VND_IS_TSMA(v) ((v)->config.isTsma == 1)
struct STbUidStore {
tb_uid_t suid;
......@@ -273,11 +275,6 @@ struct SSma {
#define SMA_RSMA_TSDB1(s) ((s)->pRSmaTsdb1)
#define SMA_RSMA_TSDB2(s) ((s)->pRSmaTsdb2)
static FORCE_INLINE bool vnodeIsRollup(SVnode* pVnode) {
SRetention* pRetention = &(pVnode->config.tsdbCfg.retentions[0]);
return (pRetention->freq > 0 && pRetention->keep > 0);
}
// sma
void smaHandleRes(void* pVnode, int64_t smaId, const SArray* data);
......
......@@ -30,7 +30,7 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
if (tEncodeI64(pCoder, pME->ctbEntry.ctime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ctbEntry.ttlDays) < 0) return -1;
if (tEncodeI64(pCoder, pME->ctbEntry.suid) < 0) return -1;
if (tEncodeBinary(pCoder, pME->ctbEntry.pTags, kvRowLen(pME->ctbEntry.pTags)) < 0) return -1;
if (tEncodeTag(pCoder, (const STag *)pME->ctbEntry.pTags) < 0) return -1;
} else if (pME->type == TSDB_NORMAL_TABLE) {
if (tEncodeI64(pCoder, pME->ntbEntry.ctime) < 0) return -1;
if (tEncodeI32(pCoder, pME->ntbEntry.ttlDays) < 0) return -1;
......@@ -47,7 +47,6 @@ int metaEncodeEntry(SEncoder *pCoder, const SMetaEntry *pME) {
}
int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
uint32_t len;
if (tStartDecode(pCoder) < 0) return -1;
if (tDecodeI64(pCoder, &pME->version) < 0) return -1;
......@@ -62,7 +61,7 @@ int metaDecodeEntry(SDecoder *pCoder, SMetaEntry *pME) {
if (tDecodeI64(pCoder, &pME->ctbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ctbEntry.ttlDays) < 0) return -1;
if (tDecodeI64(pCoder, &pME->ctbEntry.suid) < 0) return -1;
if (tDecodeBinary(pCoder, &pME->ctbEntry.pTags, &len) < 0) return -1; // (TODO)
if (tDecodeTag(pCoder, (STag **)&pME->ctbEntry.pTags) < 0) return -1; // (TODO)
} else if (pME->type == TSDB_NORMAL_TABLE) {
if (tDecodeI64(pCoder, &pME->ntbEntry.ctime) < 0) return -1;
if (tDecodeI32(pCoder, &pME->ntbEntry.ttlDays) < 0) return -1;
......
......@@ -573,10 +573,23 @@ SArray *metaGetSmaTbUids(SMeta *pMeta) {
#endif
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t cid) {
const void *metaGetTableTagVal(SMetaEntry *pEntry, int16_t type, STagVal *val) {
ASSERT(pEntry->type == TSDB_CHILD_TABLE);
return tdGetKVRowValOfCol((const SKVRow)pEntry->ctbEntry.pTags, cid);
STag *tag = (STag *)pEntry->ctbEntry.pTags;
if (type == TSDB_DATA_TYPE_JSON){
if(tag->nTag == 0){
return NULL;
}
return tag;
}
bool find = tTagGet(tag, val);
if(!find){
return NULL;
}
return val;
}
typedef struct {
SMeta * pMeta;
TBC * pCur;
......@@ -609,7 +622,13 @@ int32_t metaFilteTableIds(SMeta *pMeta, SMetaFltParam *param, SArray *pUids) {
STagIdxKey *pKey = NULL;
int32_t nKey = 0;
ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, param->val, pCursor->type,
int32_t nTagData = 0;
if(IS_VAR_DATA_TYPE(param->type)){
nTagData = strlen(param->val);
}else{
nTagData = tDataTypes[param->type].bytes;
}
ret = metaCreateTagIdxKey(pCursor->suid, pCursor->cid, param->val, nTagData, pCursor->type,
param->reverse ? INT64_MAX : INT64_MIN, &pKey, &nKey);
if (ret != 0) {
goto END;
......@@ -651,4 +670,4 @@ END:
taosMemoryFree(pCursor);
return ret;
}
}
\ No newline at end of file
......@@ -31,9 +31,9 @@ int metaCreateSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
int vLen = 0;
const void *pKey = NULL;
const void *pVal = NULL;
void * pBuf = NULL;
void *pBuf = NULL;
int32_t szBuf = 0;
void * p = NULL;
void *p = NULL;
SMetaReader mr = {0};
// validate req
......@@ -87,7 +87,7 @@ int metaDropSTable(SMeta *pMeta, int64_t verison, SVDropStbReq *pReq) {
}
// drop all child tables
TBC * pCtbIdxc = NULL;
TBC *pCtbIdxc = NULL;
SArray *pArray = taosArrayInit(8, sizeof(tb_uid_t));
tdbTbcOpen(pMeta->pCtbIdx, &pCtbIdxc, &pMeta->txn);
......@@ -142,8 +142,8 @@ _exit:
int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
SMetaEntry oStbEntry = {0};
SMetaEntry nStbEntry = {0};
TBC * pUidIdxc = NULL;
TBC * pTbDbc = NULL;
TBC *pUidIdxc = NULL;
TBC *pTbDbc = NULL;
const void *pData;
int nData;
int64_t oversion;
......@@ -262,7 +262,7 @@ _err:
}
int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUids) {
void * pData = NULL;
void *pData = NULL;
int nData = 0;
int rc = 0;
tb_uid_t uid;
......@@ -288,7 +288,7 @@ int metaDropTable(SMeta *pMeta, int64_t version, SVDropTbReq *pReq, SArray *tbUi
}
static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
void * pData = NULL;
void *pData = NULL;
int nData = 0;
int rc = 0;
int64_t version;
......@@ -324,14 +324,14 @@ static int metaDropTableByUid(SMeta *pMeta, tb_uid_t uid, int *type) {
}
static int metaAlterTableColumn(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
void * pVal = NULL;
void *pVal = NULL;
int nVal = 0;
const void * pData = NULL;
const void *pData = NULL;
int nData = 0;
int ret = 0;
tb_uid_t uid;
int64_t oversion;
SSchema * pColumn = NULL;
SSchema *pColumn = NULL;
SMetaEntry entry = {0};
SSchemaWrapper *pSchema;
int c;
......@@ -479,7 +479,7 @@ _err:
static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pAlterTbReq) {
SMetaEntry ctbEntry = {0};
SMetaEntry stbEntry = {0};
void * pVal = NULL;
void *pVal = NULL;
int nVal = 0;
int ret;
int c;
......@@ -510,7 +510,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
oversion = *(int64_t *)pData;
// search table.db
TBC * pTbDbc = NULL;
TBC *pTbDbc = NULL;
SDecoder dc1 = {0};
SDecoder dc2 = {0};
......@@ -534,7 +534,7 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
metaDecodeEntry(&dc2, &stbEntry);
SSchemaWrapper *pTagSchema = &stbEntry.stbEntry.schemaTag;
SSchema * pColumn = NULL;
SSchema *pColumn = NULL;
int32_t iCol = 0;
for (;;) {
pColumn = NULL;
......@@ -563,29 +563,39 @@ static int metaUpdateTableTagVal(SMeta *pMeta, int64_t version, SVAlterTbReq *pA
}
memcpy((void *)ctbEntry.ctbEntry.pTags, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
} else {
SKVRowBuilder kvrb = {0};
const SKVRow pOldTag = (const SKVRow)ctbEntry.ctbEntry.pTags;
SKVRow pNewTag = NULL;
tdInitKVRowBuilder(&kvrb);
const STag *pOldTag = (const STag *)ctbEntry.ctbEntry.pTags;
STag *pNewTag = NULL;
SArray *pTagArray = taosArrayInit(pTagSchema->nCols, sizeof(STagVal));
if (!pTagArray) {
terrno = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
for (int32_t i = 0; i < pTagSchema->nCols; i++) {
SSchema *pCol = &pTagSchema->pSchema[i];
if (iCol == i) {
tdAddColToKVRow(&kvrb, pCol->colId, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
STagVal val = {0};
val.type = pCol->type;
val.cid = pCol->colId;
if (IS_VAR_DATA_TYPE(pCol->type)) {
val.pData = pAlterTbReq->pTagVal;
val.nData = pAlterTbReq->nTagVal;
} else {
memcpy(&val.i64, pAlterTbReq->pTagVal, pAlterTbReq->nTagVal);
}
taosArrayPush(pTagArray, &val);
} else {
void *p = tdGetKVRowValOfCol(pOldTag, pCol->colId);
if (p) {
if (IS_VAR_DATA_TYPE(pCol->type)) {
tdAddColToKVRow(&kvrb, pCol->colId, p, varDataTLen(p));
} else {
tdAddColToKVRow(&kvrb, pCol->colId, p, pCol->bytes);
}
STagVal val = {0};
if (tTagGet(pOldTag, &val)) {
taosArrayPush(pTagArray, &val);
}
}
}
ctbEntry.ctbEntry.pTags = tdGetKVRowFromBuilder(&kvrb);
tdDestroyKVRowBuilder(&kvrb);
if ((terrno = tTagNew(pTagArray, pTagSchema->version, false, &pNewTag)) < 0) {
taosArrayDestroy(pTagArray);
goto _err;
}
ctbEntry.ctbEntry.pTags = (uint8_t *)pNewTag;
taosArrayDestroy(pTagArray);
}
// save to table.db
......@@ -639,8 +649,8 @@ int metaAlterTable(SMeta *pMeta, int64_t version, SVAlterTbReq *pReq) {
static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
STbDbKey tbDbKey;
void * pKey = NULL;
void * pVal = NULL;
void *pKey = NULL;
void *pVal = NULL;
int kLen = 0;
int vLen = 0;
SEncoder coder = {0};
......@@ -721,17 +731,17 @@ static int metaUpdateCtbIdx(SMeta *pMeta, const SMetaEntry *pME) {
return tdbTbInsert(pMeta->pCtbIdx, &ctbIdxKey, sizeof(ctbIdxKey), NULL, 0, &pMeta->txn);
}
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int8_t type, tb_uid_t uid,
int metaCreateTagIdxKey(tb_uid_t suid, int32_t cid, const void *pTagData, int32_t nTagData, int8_t type, tb_uid_t uid,
STagIdxKey **ppTagIdxKey, int32_t *nTagIdxKey) {
int32_t nTagData = 0;
if (pTagData) {
if (IS_VAR_DATA_TYPE(type)) {
nTagData = varDataTLen(pTagData);
} else {
nTagData = tDataTypes[type].bytes;
}
}
// int32_t nTagData = 0;
// if (pTagData) {
// if (IS_VAR_DATA_TYPE(type)) {
// nTagData = varDataTLen(pTagData);
// } else {
// nTagData = tDataTypes[type].bytes;
// }
// }
*nTagIdxKey = sizeof(STagIdxKey) + nTagData + sizeof(tb_uid_t);
*ppTagIdxKey = (STagIdxKey *)taosMemoryMalloc(*nTagIdxKey);
......@@ -755,14 +765,15 @@ static void metaDestroyTagIdxKey(STagIdxKey *pTagIdxKey) {
}
static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
void * pData = NULL;
void *pData = NULL;
int nData = 0;
STbDbKey tbDbKey = {0};
SMetaEntry stbEntry = {0};
STagIdxKey * pTagIdxKey = NULL;
STagIdxKey *pTagIdxKey = NULL;
int32_t nTagIdxKey;
const SSchema *pTagColumn; // = &stbEntry.stbEntry.schema.pSchema[0];
const void * pTagData = NULL; //
const void *pTagData = NULL; //
int32_t nTagData = 0;
SDecoder dc = {0};
// get super table
......@@ -775,7 +786,21 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
metaDecodeEntry(&dc, &stbEntry);
pTagColumn = &stbEntry.stbEntry.schemaTag.pSchema[0];
pTagData = tdGetKVRowValOfCol((const SKVRow)pCtbEntry->ctbEntry.pTags, pTagColumn->colId);
STagVal tagVal = {.cid = pTagColumn->colId};
if (pTagColumn->type != TSDB_DATA_TYPE_JSON) {
tTagGet((const STag *)pCtbEntry->ctbEntry.pTags, &tagVal);
if (IS_VAR_DATA_TYPE(pTagColumn->type)) {
pTagData = tagVal.pData;
nTagData = (int32_t)tagVal.nData;
} else {
pTagData = &(tagVal.i64);
nTagData = tDataTypes[pTagColumn->type].bytes;
}
} else {
// pTagData = pCtbEntry->ctbEntry.pTags;
// nTagData = ((const STag *)pCtbEntry->ctbEntry.pTags)->len;
}
// update tag index
#ifdef USE_INVERTED_INDEX
......@@ -790,8 +815,8 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
int ret = indexPut((SIndex *)pMeta->pTagIvtIdx, tmGroup, tuid);
indexMultiTermDestroy(tmGroup);
#else
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, pTagColumn->type, pCtbEntry->uid,
&pTagIdxKey, &nTagIdxKey) < 0) {
if (metaCreateTagIdxKey(pCtbEntry->ctbEntry.suid, pTagColumn->colId, pTagData, nTagData, pTagColumn->type,
pCtbEntry->uid, &pTagIdxKey, &nTagIdxKey) < 0) {
return -1;
}
tdbTbInsert(pMeta->pTagIdx, pTagIdxKey, nTagIdxKey, NULL, 0, &pMeta->txn);
......@@ -804,7 +829,7 @@ static int metaUpdateTagIdx(SMeta *pMeta, const SMetaEntry *pCtbEntry) {
static int metaSaveToSkmDb(SMeta *pMeta, const SMetaEntry *pME) {
SEncoder coder = {0};
void * pVal = NULL;
void *pVal = NULL;
int vLen = 0;
int rcode = 0;
SSkmDbKey skmDbKey = {0};
......
......@@ -104,7 +104,7 @@ int32_t smaOpen(SVnode *pVnode) {
taosThreadMutexInit(&pSma->mutex, NULL);
pSma->locked = false;
if (vnodeIsRollup(pVnode)) {
if (VND_IS_RSMA(pVnode)) {
STsdbKeepCfg keepCfg = {0};
for (int i = 0; i < TSDB_RETENTION_MAX; ++i) {
if (i == TSDB_RETENTION_L0) {
......
......@@ -238,7 +238,7 @@ static void tsdbStartCommit(STsdb *pRepo) {
static void tsdbEndCommit(STsdb *pTsdb, int eno) {
tsdbEndFSTxn(pTsdb);
tsdbMemTableDestroy(pTsdb, pTsdb->imem);
tsdbMemTableDestroy(pTsdb->imem);
pTsdb->imem = NULL;
tsdbInfo("vgId:%d commit over, %s", REPO_ID(pTsdb), (eno == TSDB_CODE_SUCCESS) ? "succeed" : "failed");
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
\ No newline at end of file
......@@ -333,7 +333,7 @@ static void setQueryTimewindow(STsdbReadHandle* pTsdbReadHandle, SQueryTableData
}
static STsdb* getTsdbByRetentions(SVnode* pVnode, STsdbReadHandle* pReadHandle, TSKEY winSKey, SRetention* retentions) {
if (vnodeIsRollup(pVnode)) {
if (VND_IS_RSMA(pVnode)) {
int level = 0;
int64_t now = taosGetTimestamp(pVnode->config.tsdbCfg.precision);
......
此差异已折叠。
此差异已折叠。
......@@ -57,6 +57,7 @@ int vnodeEncodeConfig(const void *pObj, SJson *pJson) {
if (tjsonAddIntegerToObject(pJson, "isHeap", pCfg->isHeap) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "isWeak", pCfg->isWeak) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "isTsma", pCfg->isTsma) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "isRsma", pCfg->isRsma) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "precision", pCfg->tsdbCfg.precision) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "update", pCfg->tsdbCfg.update) < 0) return -1;
if (tjsonAddIntegerToObject(pJson, "compression", pCfg->tsdbCfg.compression) < 0) return -1;
......@@ -133,6 +134,8 @@ int vnodeDecodeConfig(const SJson *pJson, void *pObj) {
if(code < 0) return -1;
tjsonGetNumberValue(pJson, "isTsma", pCfg->isTsma, code);
if(code < 0) return -1;
tjsonGetNumberValue(pJson, "isRsma", pCfg->isRsma, code);
if(code < 0) return -1;
tjsonGetNumberValue(pJson, "precision", pCfg->tsdbCfg.precision, code);
if(code < 0) return -1;
tjsonGetNumberValue(pJson, "update", pCfg->tsdbCfg.update, code);
......
......@@ -230,7 +230,7 @@ int vnodeCommit(SVnode *pVnode) {
return -1;
}
if(vnodeIsRollup(pVnode)) {
if (VND_IS_RSMA(pVnode)) {
if (tsdbCommit(VND_RSMA0(pVnode)) < 0) {
ASSERT(0);
return -1;
......@@ -250,7 +250,6 @@ int vnodeCommit(SVnode *pVnode) {
}
}
if (tqCommit(pVnode->pTq) < 0) {
ASSERT(0);
return -1;
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册