提交 62f3adf7 编写于 作者: D dapan1121

Merge branch '3.0' into fix/TD-17819

......@@ -4,9 +4,6 @@
[submodule "src/connector/hivemq-tdengine-extension"]
path = src/connector/hivemq-tdengine-extension
url = git@github.com:taosdata/hivemq-tdengine-extension.git
[submodule "deps/jemalloc"]
path = deps/jemalloc
url = https://github.com/jemalloc/jemalloc
[submodule "deps/TSZ"]
path = deps/TSZ
url = https://github.com/taosdata/TSZ.git
......
......@@ -84,6 +84,12 @@ ELSE ()
ENDIF ()
ENDIF ()
option(
JEMALLOC_ENABLED
"If build with jemalloc"
OFF
)
option(
BUILD_SANITIZER
"If build sanitizer"
......
# jemalloc
ExternalProject_Add(jemalloc
GIT_REPOSITORY https://github.com/jemalloc/jemalloc.git
GIT_TAG 5.3.0
SOURCE_DIR "${TD_CONTRIB_DIR}/jemalloc"
BINARY_DIR ""
CONFIGURE_COMMAND ""
BUILD_COMMAND ""
INSTALL_COMMAND ""
TEST_COMMAND ""
GIT_SHALLOW true
GIT_PROGRESS true
)
......@@ -27,6 +27,10 @@ else ()
cat("${TD_SUPPORT_DIR}/taosadapter_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif()
if(TD_LINUX_64 AND JEMALLOC_ENABLED)
cat("${TD_SUPPORT_DIR}/jemalloc_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif()
# pthread
if(${BUILD_PTHREAD})
cat("${TD_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
......@@ -399,6 +403,18 @@ if(${BUILD_ADDR2LINE})
endif(NOT ${TD_WINDOWS})
endif(${BUILD_ADDR2LINE})
# jemalloc
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
include(ExternalProject)
ExternalProject_Add(jemalloc
PREFIX "jemalloc"
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/
BUILD_COMMAND ${MAKE}
)
INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/build/include)
ENDIF ()
# ================================================================================================
# Build test
......
......@@ -111,7 +111,7 @@ TDengine REST API 详情请参考[官方文档](/reference/rest-api/)。
这条命令很快完成 1 亿条记录的插入。具体时间取决于硬件性能。
taosBenchmark 命令本身带有很多选项,配置表的数目、记录条数等等,您可以设置不同参数进行体验,请执行 `taosBenchmark --help` 详细列出。taosBenchmark 详细使用方法请参照 [taosBenchmark 参考手册](../reference/taosbenchmark)
taosBenchmark 命令本身带有很多选项,配置表的数目、记录条数等等,您可以设置不同参数进行体验,请执行 `taosBenchmark --help` 详细列出。taosBenchmark 详细使用方法请参照 [taosBenchmark 参考手册](../../reference/taosbenchmark)
## 体验查询
......
......@@ -245,7 +245,7 @@ select * from t;
Query OK, 2 row(s) in set (0.003128s)
```
除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../reference/taos-shell/)
除执行 SQL 语句外,系统管理员还可以从 TDengine CLI 进行检查系统运行状态、添加删除用户账号等操作。TDengine CLI 连同应用驱动也可以独立安装在 Linux 或 Windows 机器上运行,更多细节请参考 [这里](../../reference/taos-shell/)
## 使用 taosBenchmark 体验写入速度
......
......@@ -8,17 +8,17 @@ description: "创建、删除数据库,查看、修改数据库参数"
```sql
CREATE DATABASE [IF NOT EXISTS] db_name [database_options]
database_options:
database_option ...
database_option: {
BUFFER value
| CACHEMODEL {'none' | 'last_row' | 'last_value' | 'both'}
| CACHESIZE value
| COMP {0 | 1 | 2}
| DURATION value
| FSYNC value
| WAL_FSYNC_PERIOD value
| MAXROWS value
| MINROWS value
| KEEP value
......@@ -28,7 +28,7 @@ database_option: {
| REPLICA value
| RETENTIONS ingestion_duration:keep_duration ...
| STRICT {'off' | 'on'}
| WAL {1 | 2}
| WAL_LEVEL {1 | 2}
| VGROUPS value
| SINGLE_STABLE {0 | 1}
| WAL_RETENTION_PERIOD value
......@@ -46,7 +46,7 @@ database_option: {
- last_row:表示缓存子表最近一行数据。这将显著改善 LAST_ROW 函数的性能表现。
- last_value:表示缓存子表每一列的最近的非 NULL 值。这将显著改善无特殊影响(WHERE、ORDER BY、GROUP BY、INTERVAL)下的 LAST 函数的性能表现。
- both:表示同时打开缓存最近行和列功能。
- CACHESIZE:表示缓存子表最近数据的内存大小。默认为 1 ,范围是[1, 65536],单位是 MB。
- CACHESIZE:表示每个 vnode 中用于缓存子表最近数据的内存大小。默认为 1 ,范围是[1, 65536],单位是 MB。
- COMP:表示数据库文件压缩标志位,缺省值为 2,取值范围为 [0, 2]。
- 0:表示不压缩。
- 1:表示一阶段压缩。
......
......@@ -218,7 +218,7 @@ GROUP BY 子句中的表达式可以包含表或视图中的任何列,这些
PARTITION BY 子句是 TDengine 特色语法,按 part_list 对数据进行切分,在每个切分的分片中进行计算。
详见 [TDengine 特色查询](./distinguished)
详见 [TDengine 特色查询](../distinguished)
## ORDER BY
......
此差异已折叠。
......@@ -40,7 +40,6 @@ ALTER ALL DNODES dnode_option
dnode_option: {
'resetLog'
| 'resetQueryCache'
| 'balance' value
| 'monitor' value
| 'debugFlag' value
......
......@@ -19,7 +19,7 @@ library_path:包含UDF函数实现的动态链接库的绝对路径,是在
OUTPUTTYPE:标识此函数的返回类型。
BUFSIZE:中间结果的缓冲区大小,单位是字节。不设置则默认为0。最大不可超过512字节。
关于如何开发自定义函数,请参考 [UDF使用说明](../develop/udf)
关于如何开发自定义函数,请参考 [UDF使用说明](../../develop/udf)
## 删除自定义函数
......
---
sidebar_label: taosX
title: 使用 taosX 在集群间复制数据
---
\ No newline at end of file
......@@ -10,7 +10,7 @@ import TabItem from "@theme/TabItem";
## 安装
关于安装,请参考 [使用安装包立即开始](../get-started/package)
关于安装,请参考 [使用安装包立即开始](../../get-started/package)
......
......@@ -5,33 +5,28 @@ title: 容量规划
使用 TDengine 来搭建一个物联网大数据平台,计算资源、存储资源需要根据业务场景进行规划。下面分别讨论系统运行所需要的内存、CPU 以及硬盘空间。
## 内存需求
## 服务端内存需求
每个 Database 可以创建固定数目的 vgroup,默认与 CPU 核数相同,可通过 maxVgroupsPerDb 配置;vgroup 中的每个副本会是一个 vnode;每个 vnode 会占用固定大小的内存(大小与数据库的配置参数 blocks 和 cache 有关);每个 Table 会占用与标签总长度有关的内存;此外,系统会有一些固定的内存开销。因此,每个 DB 需要的系统内存可通过如下公式计算
每个 Database 可以创建固定数目的 vgroup,默认 2 个 vgroup,在创建数据库时可以通过`vgroups <num>`参数来指定,其副本数由参数`replica <num>`指定。vgroup 中的每个副本会是一个 vnode;所以每个数据库占用的内存由以下几个参数决定
```
Database Memory Size = maxVgroupsPerDb * replica * (blocks * cache + 10MB) + numOfTables * (tagSizePerTable + 0.5KB)
```
- vgroups
- replica
- buffer
- pages
- pagesize
- cachesize
示例:假设 maxVgroupPerDB 是缺省值 64,cache 是缺省大小 16M, blocks 是缺省值 6,并且一个 DB 中有 10 万张表,单副本,标签总长度是 256 字节,则这个 DB 总的内存需求为:64 \* 1 \* (16 \* 6 + 10) + 100000 \* (0.25 + 0.5) / 1000 = 6792M
关于这些参数的详细说明请参考 [数据库管理](../../taos-sql/database)
在实际的系统运维中,我们通常会更关心 TDengine 服务进程(taosd)会占用的内存量。
一个数据库所需要的内存大小等于
```
taosd 内存总量 = vnode 内存 + mnode 内存 + 查询内存
vgroups * replica * (buffer + pages * pagesize + cachesize)
```
其中:
1. “vnode 内存”指的是集群中所有的 Database 存储分摊到当前 taosd 节点上所占用的内存资源。可以按上文“Database Memory Size”计算公式估算每个 DB 的内存占用量进行加总,再按集群中总共的 TDengine 节点数做平均(如果设置为多副本,则还需要乘以对应的副本倍数)。
2. “mnode 内存”指的是集群中管理节点所占用的资源。如果一个 taosd 节点上分布有 mnode 管理节点,则内存消耗还需要增加“0.2KB \* 集群中数据表总数”。
3. “查询内存”指的是服务端处理查询请求时所需要占用的内存。单条查询语句至少会占用“0.2KB \* 查询涉及的数据表总数”的内存量。
但要注意的是这些内存并不需要由单一服务器提供,而是由整个集群中所有数据节点共同负担,相当于由这些数据节点所在的服务器共同负担。如果集群中有不止一个数据库,则所需内存要累加。更复杂的场景是如果集群中的数据节点并非在最初就一次性全部建立,而是随着使用中系统负载的增加逐步增加服务器并增加数据节点,则新创建的数据库会导致新旧数据节点上的负载并不均衡,此时简单的理论计算并不能直接使用,要结合各数据节点的负载情况。
注意:以上内存估算方法,主要讲解了系统的“必须内存需求”,而不是“内存总数上限”。在实际运行的生产环境中,由于操作系统缓存、资源管理调度等方面的原因,内存规划应当在估算结果的基础上保留一定冗余,以维持系统状态和系统性能的稳定性。并且,生产环境通常会配置系统资源的监控工具,以便及时发现硬件资源的紧缺情况。
最后,如果内存充裕,可以考虑加大 Blocks 的配置,这样更多数据将保存在内存里,提高写入和查询速度。
### 客户端内存需求
## 客户端内存需求
客户端应用采用 taosc 客户端驱动连接服务端,会有内存需求的开销。
......@@ -56,7 +51,7 @@ CPU 的需求取决于如下两方面:
- **数据插入** TDengine 单核每秒能至少处理一万个插入请求。每个插入请求可以带多条记录,一次插入一条记录与插入 10 条记录,消耗的计算资源差别很小。因此每次插入,条数越大,插入效率越高。如果一个插入请求带 200 条以上记录,单核就能达到每秒插入 100 万条记录的速度。但对前端数据采集的要求越高,因为需要缓存记录,然后一批插入。
- **查询需求** TDengine 提供高效的查询,但是每个场景的查询差异很大,查询频次变化也很大,难以给出客观数字。需要用户针对自己的场景,写一些查询语句,才能确定。
因此仅对数据插入而言,CPU 是可以估算出来的,但查询所耗的计算资源无法估算。在实际运过程中,不建议 CPU 使用率超过 50%,超过后,需要增加新的节点,以获得更多计算资源。
因此仅对数据插入而言,CPU 是可以估算出来的,但查询所耗的计算资源无法估算。在实际运过程中,不建议 CPU 使用率超过 50%,超过后,需要增加新的节点,以获得更多计算资源。
## 存储需求
......@@ -77,5 +72,3 @@ Raw DataSize = numOfTables * rowSizePerTable * rowsPerTable
根据上面的内存、CPU、存储的预估,就可以知道整个系统需要多少核、多少内存、多少存储空间。如果数据副本数不为 1,总需求量需要再乘以副本数。
因为 TDengine 具有很好的水平扩展能力,根据总量,再根据单个物理机或虚拟机的资源,就可以轻松决定需要购置多少台物理机或虚拟机了。
**立即计算 CPU、内存、存储,请参见:[资源估算方法](https://www.taosdata.com/config/config.html)。**
......@@ -4,25 +4,27 @@ title: 容错和灾备
## 容错
TDengine 支持**WAL**(Write Ahead Log)机制,实现数据的容错能力,保证数据的高可用。
TDengine 支持 **WAL**(Write Ahead Log)机制,实现数据的容错能力,保证数据的高可用。
TDengine 接收到应用的请求数据包时,先将请求的原始数据包写入数据库日志文件,等数据成功写入数据库数据文件后,再删除相应的 WAL。这样保证了 TDengine 能够在断电等因素导致的服务重启时从数据库日志文件中恢复数据,避免数据的丢失。
涉及的系统配置参数有两个:
- walLevel:WAL 级别,0:不写 WAL; 1:写 WAL, 但不执行 fsync; 2:写 WAL, 而且执行 fsync
- fsync:当 walLevel 设置为 2 时,执行 fsync 的周期。设置为 0,表示每次写入,立即执行 fsync。
- wal_level:WAL 级别,1:写 WAL,但不执行 fsync。2:写 WAL,而且执行 fsync。默认值为 1
- wal_fsync_period:当 wal_level 设置为 2 时,执行 fsync 的周期。设置为 0,表示每次写入,立即执行 fsync。
如果要 100%的保证数据不丢失,需要将 walLevel 设置为 2,fsync 设置为 0。这时写入速度将会下降。但如果应用侧启动的写数据的线程数达到一定的数量(超过 50),那么写入数据的性能也会很不错,只会比 fsync 设置为 3000 毫秒下降 30%左右。
如果要 100%的保证数据不丢失,需要将 wal_level 设置为 2,wal_fsync_period 设置为 0。这时写入速度将会下降。但如果应用侧启动的写数据的线程数达到一定的数量(超过 50),那么写入数据的性能也会很不错,只会比 wal_fsync_period 设置为 3000 毫秒下降 30%左右。
## 灾备
TDengine 的集群通过多个副本的机制,来提供系统的高可用性,实现灾备能力。
TDengine 的集群通过多个副本的机制,来提供系统的高可用性,同时具备一定的灾备能力。
TDengine 集群是由 mnode 负责管理的,为保证 mnode 的高可靠,可以配置多个 mnode 副本,副本数由系统配置参数 numOfMnodes 决定,为了支持高可靠,需要设置大于 1。为保证元数据的强一致性,mnode 副本之间通过同步方式进行数据复制,保证了元数据的强一致性。
TDengine 集群是由 mnode 负责管理的,为保证 mnode 的高可靠,可以配置 三个 mnode 副本。为保证元数据的强一致性,mnode 副本之间通过同步方式进行数据复制,保证了元数据的强一致性。
TDengine 集群中的时序数据的副本数是与数据库关联的,一个集群里可以有多个数据库,每个数据库可以配置不同的副本数。创建数据库时,通过参数 replica 指定副本数。为了支持高可靠,需要设置副本数大于 1
TDengine 集群中的时序数据的副本数是与数据库关联的,一个集群里可以有多个数据库,每个数据库可以配置不同的副本数。创建数据库时,通过参数 replica 指定副本数。为了支持高可靠,需要设置副本数为 3
TDengine 集群的节点数必须大于等于副本数,否则创建表时将报错。
当 TDengine 集群中的节点部署在不同的物理机上,并设置多个副本数时,就实现了系统的高可靠性,无需再使用其他软件或工具。TDengine 企业版还可以将副本部署在不同机房,从而实现异地容灾。
另外一种灾备方式是通过 `taosX` 将一个 TDengine 集群的数据同步复制到物理上位于不同数据中心的另一个 TDengine 集群。其详细使用方法请参考 [taosX 参考手册](../../reference/taosX)
---
title: 用户管理
---
系统管理员可以在 CLI 界面里添加、删除用户,也可以修改密码。CLI 里 SQL 语法如下:
```sql
CREATE USER <user_name> PASS <'password'>;
```
创建用户,并指定用户名和密码,密码需要用单引号引起来,单引号为英文半角
```sql
DROP USER <user_name>;
```
删除用户,限 root 用户使用
```sql
ALTER USER <user_name> PASS <'password'>;
```
修改用户密码,为避免被转换为小写,密码需要用单引号引用,单引号为英文半角
```sql
ALTER USER <user_name> PRIVILEGE <write|read>;
```
修改用户权限为:write 或 read,不需要添加单引号
说明:系统内共有 super/write/read 三种权限级别,但目前不允许通过 alter 指令把 super 权限赋予用户。
```sql
SHOW USERS;
```
显示所有用户
:::note
SQL 语法中,< >表示需要用户输入的部分,但请不要输入< >本身。
:::
---
title: 系统连接、任务查询管理
---
系统管理员可以从 CLI 查询系统的连接、正在进行的查询、流式计算,并且可以关闭连接、停止正在进行的查询和流式计算。
## 显示数据库的连接
```sql
SHOW CONNECTIONS;
```
其结果中的一列显示 ip:port, 为连接的 IP 地址和端口号。
## 强制关闭数据库连接
```sql
KILL CONNECTION <connection-id>;
```
其中的 connection-id 是 SHOW CONNECTIONS 中显示的第一列的数字。
## 显示数据查询
```sql
SHOW QUERIES;
```
其中第一列显示的以冒号隔开的两个数字为 query-id,为发起该 query 应用连接的 connection-id 和查询次数。
## 强制关闭数据查询
```sql
KILL QUERY <query-id>;
```
其中 query-id 是 SHOW QUERIES 中显示的 connection-id:query-no 字串,如“105:2”,拷贝粘贴即可。
## 显示连续查询
```sql
SHOW STREAMS;
```
其中第一列显示的以冒号隔开的两个数字为 stream-id, 为启动该 stream 应用连接的 connection-id 和发起 stream 的次数。
## 强制关闭连续查询
```sql
KILL STREAM <stream-id>;
```
其中的 stream-id 是 SHOW STREAMS 中显示的 connection-id:stream-no 字串,如 103:2,拷贝粘贴即可。
......@@ -41,10 +41,8 @@ extern int32_t tsCompressMsgSize;
extern int32_t tsCompressColData;
extern int32_t tsMaxNumOfDistinctResults;
extern int32_t tsCompatibleModel;
extern bool tsEnableSlaveQuery;
extern bool tsPrintAuth;
extern int64_t tsTickPerMin[3];
extern int32_t tsCountAlwaysReturnValue;
// multi-process
......@@ -92,8 +90,6 @@ extern uint16_t tsTelemPort;
extern int32_t tsQueryBufferSize; // maximum allowed usage buffer size in MB for each data node during query processing
extern int64_t tsQueryBufferSizeBytes; // maximum allowed usage buffer size in byte for each data node
extern bool tsRetrieveBlockingModel; // retrieve threads will be blocked
extern bool tsKeepOriginalColumnName;
extern bool tsDeadLockKillQuery;
// query client
extern int32_t tsQueryPolicy;
......@@ -102,11 +98,6 @@ extern int32_t tsQuerySmaOptimize;
// client
extern int32_t tsMinSlidingTime;
extern int32_t tsMinIntervalTime;
extern int32_t tsMaxStreamComputDelay;
extern int32_t tsStreamCompStartDelay;
extern int32_t tsRetryStreamCompDelay;
extern float tsStreamComputDelayRatio; // the delayed computing ration of the whole time window
extern int64_t tsMaxRetentWindow;
// build info
extern char version[];
......
......@@ -3051,6 +3051,34 @@ typedef struct SDeleteRes {
int32_t tEncodeDeleteRes(SEncoder* pCoder, const SDeleteRes* pRes);
int32_t tDecodeDeleteRes(SDecoder* pCoder, SDeleteRes* pRes);
typedef struct {
int32_t msgType;
int32_t msgLen;
void* msg;
} SBatchMsg;
typedef struct {
SMsgHead header;
int32_t msgNum;
SBatchMsg msg[];
} SBatchReq;
typedef struct {
int32_t reqType;
int32_t msgLen;
int32_t rspCode;
void* msg;
} SBatchRsp;
static FORCE_INLINE void tFreeSBatchRsp(void *p) {
if (NULL == p) {
return;
}
SBatchRsp* pRsp = (SBatchRsp*)p;
taosMemoryFree(pRsp->msg);
}
#pragma pack(pop)
#ifdef __cplusplus
......
......@@ -136,6 +136,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_MND_DROP_INDEX, "drop-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_INDEX, "get-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_GET_TABLE_INDEX, "get-table-index", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_BATCH_META, "batch-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_TABLE_CFG, "table-cfg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_MND_CREATE_TOPIC, "create-topic", SMCreateTopicReq, SMCreateTopicRsp)
TD_DEF_MSG_TYPE(TDMT_MND_ALTER_TOPIC, "alter-topic", NULL, NULL)
......@@ -180,6 +181,7 @@ enum {
TD_DEF_MSG_TYPE(TDMT_VND_TABLE_META, "vnode-table-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TABLES_META, "vnode-tables-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_TABLE_CFG, "vnode-table-cfg", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_BATCH_META, "vnode-batch-meta", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_CREATE_STB, "vnode-create-stb", SVCreateStbReq, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_ALTER_STB, "vnode-alter-stb", NULL, NULL)
TD_DEF_MSG_TYPE(TDMT_VND_DROP_STB, "vnode-drop-stb", SVDropStbReq, NULL)
......
......@@ -55,7 +55,6 @@ enum {
TASK_INPUT_STATUS__NORMAL = 1,
TASK_INPUT_STATUS__BLOCKED,
TASK_INPUT_STATUS__RECOVER,
TASK_INPUT_STATUS__PROCESSING,
TASK_INPUT_STATUS__STOP,
TASK_INPUT_STATUS__FAILED,
};
......@@ -320,17 +319,6 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask);
void tFreeSStreamTask(SStreamTask* pTask);
static FORCE_INLINE int32_t streamTaskInput(SStreamTask* pTask, SStreamQueueItem* pItem) {
#if 0
while (1) {
int8_t inputStatus =
atomic_val_compare_exchange_8(&pTask->inputStatus, TASK_INPUT_STATUS__NORMAL, TASK_INPUT_STATUS__PROCESSING);
if (inputStatus == TASK_INPUT_STATUS__NORMAL) {
break;
}
ASSERT(0);
}
#endif
if (pItem->type == STREAM_INPUT__DATA_SUBMIT) {
SStreamDataSubmit* pSubmitClone = streamSubmitRefClone((SStreamDataSubmit*)pItem);
if (pSubmitClone == NULL) {
......@@ -443,13 +431,14 @@ typedef struct {
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t sourceTaskId;
int32_t sourceVg;
int32_t upstreamTaskId;
int32_t upstreamNodeId;
} SStreamTaskRecoverReq;
typedef struct {
int64_t streamId;
int32_t taskId;
int32_t rspTaskId;
int32_t reqTaskId;
int8_t inputStatus;
} SStreamTaskRecoverRsp;
......
......@@ -210,7 +210,7 @@ SyncGroupId syncGetVgId(int64_t rid);
void syncGetEpSet(int64_t rid, SEpSet* pEpSet);
void syncGetRetryEpSet(int64_t rid, SEpSet* pEpSet);
int32_t syncPropose(int64_t rid, SRpcMsg* pMsg, bool isWeak);
int32_t syncProposeBatch(int64_t rid, SRpcMsg* pMsgArr, bool* pIsWeakArr, int32_t arrSize);
int32_t syncProposeBatch(int64_t rid, SRpcMsg** pMsgPArr, bool* pIsWeakArr, int32_t arrSize);
bool syncEnvIsStart();
const char* syncStr(ESyncState state);
bool syncIsRestoreFinish(int64_t rid);
......
......@@ -238,7 +238,7 @@ typedef struct SyncClientRequestBatch {
char data[]; // block2, block3
} SyncClientRequestBatch;
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg* rpcMsgArr, SRaftMeta* raftArr, int32_t arrSize,
SyncClientRequestBatch* syncClientRequestBatchBuild(SRpcMsg** rpcMsgPArr, SRaftMeta* raftArr, int32_t arrSize,
int32_t vgId);
void syncClientRequestBatch2RpcMsg(const SyncClientRequestBatch* pSyncMsg, SRpcMsg* pRpcMsg);
void syncClientRequestBatchDestroy(SyncClientRequestBatch* pMsg);
......
......@@ -77,11 +77,11 @@ typedef struct {
} SWalSyncInfo;
typedef struct {
int8_t protoVer;
int64_t version;
int16_t msgType;
int64_t ingestTs;
int32_t bodyLen;
int64_t ingestTs; // not implemented
int16_t msgType;
int8_t protoVer;
// sync meta
SWalSyncInfo syncMeta;
......
......@@ -1713,7 +1713,7 @@ void blockDebugShowDataBlocks(const SArray* dataBlocks, const char* flag) {
size_t numOfCols = taosArrayGetSize(pDataBlock->pDataBlock);
int32_t rows = pDataBlock->info.rows;
printf("%s |block type %d |child id %d|group id %zX\n", flag, (int32_t)pDataBlock->info.type,
printf("%s |block type %d |child id %d|group id %" PRIu64 "\n", flag, (int32_t)pDataBlock->info.type,
pDataBlock->info.childId, pDataBlock->info.groupId);
for (int32_t j = 0; j < rows; j++) {
printf("%s |", flag);
......
......@@ -35,7 +35,6 @@ int32_t tsNumOfSupportVnodes = 256;
// common
int32_t tsMaxShellConns = 50000;
int32_t tsShellActivityTimer = 3; // second
bool tsEnableSlaveQuery = true;
bool tsPrintAuth = false;
// multi process
......@@ -118,20 +117,6 @@ int32_t tsMaxNumOfDistinctResults = 1000 * 10000;
// 1 database precision unit for interval time range, changed accordingly
int32_t tsMinIntervalTime = 1;
// 20sec, the maximum value of stream computing delay, changed accordingly
int32_t tsMaxStreamComputDelay = 20000;
// 10sec, the first stream computing delay time after system launched successfully, changed accordingly
int32_t tsStreamCompStartDelay = 10000;
// the stream computing delay time after executing failed, change accordingly
int32_t tsRetryStreamCompDelay = 10 * 1000;
// The delayed computing ration. 10% of the whole computing time window by default.
float tsStreamComputDelayRatio = 0.1f;
int64_t tsMaxRetentWindow = 24 * 3600L; // maximum time window tolerance
// the maximum allowed query buffer size during query processing for each data node.
// -1 no limit (default)
// 0 no query allowed, queries are disabled
......@@ -142,12 +127,6 @@ int64_t tsQueryBufferSizeBytes = -1;
// in retrieve blocking model, the retrieve threads will wait for the completion of the query processing.
bool tsRetrieveBlockingModel = false;
// last_row(*), first(*), last_row(ts, col1, col2) query, the result fields will be the original column name
bool tsKeepOriginalColumnName = false;
// kill long query
bool tsDeadLockKillQuery = false;
// tsdb config
// For backward compatibility
bool tsdbForceKeepFile = false;
......@@ -330,11 +309,10 @@ static int32_t taosAddClientCfg(SConfig *pCfg) {
if (cfgAddString(pCfg, "fqdn", defaultFqdn, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "serverPort", defaultServerPort, 1, 65056, 1) != 0) return -1;
if (cfgAddDir(pCfg, "tempDir", tsTempDir, 1) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalTempDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1;
if (cfgAddFloat(pCfg, "minimalTmpDirGB", 1.0f, 0.001f, 10000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "shellActivityTimer", tsShellActivityTimer, 1, 120, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressMsgSize", tsCompressMsgSize, -1, 100000000, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "compressColData", tsCompressColData, -1, 100000000, 1) != 0) return -1;
if (cfgAddBool(pCfg, "keepColumnName", tsKeepOriginalColumnName, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "queryPolicy", tsQueryPolicy, 1, 3, 1) != 0) return -1;
if (cfgAddInt32(pCfg, "querySmaOptimize", tsQuerySmaOptimize, 0, 1, 1) != 0) return -1;
if (cfgAddString(pCfg, "smlChildTableName", "", 1) != 0) return -1;
......@@ -383,15 +361,9 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "minIntervalTime", tsMinIntervalTime, 1, 1000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxNumOfDistinctRes", tsMaxNumOfDistinctResults, 10 * 10000, 10000 * 10000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "countAlwaysReturnValue", tsCountAlwaysReturnValue, 0, 1, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxStreamCompDelay", tsMaxStreamComputDelay, 10, 1000000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "maxFirstStreamCompDelay", tsStreamCompStartDelay, 1000, 1000000000, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "retryStreamCompDelay", tsRetryStreamCompDelay, 10, 1000000000, 0) != 0) return -1;
if (cfgAddFloat(pCfg, "streamCompDelayRatio", tsStreamComputDelayRatio, 0.1, 0.9, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "queryBufferSize", tsQueryBufferSize, -1, 500000000000, 0) != 0) return -1;
if (cfgAddBool(pCfg, "retrieveBlockingModel", tsRetrieveBlockingModel, 0) != 0) return -1;
if (cfgAddBool(pCfg, "printAuth", tsPrintAuth, 0) != 0) return -1;
if (cfgAddBool(pCfg, "slaveQuery", tsEnableSlaveQuery, 0) != 0) return -1;
if (cfgAddBool(pCfg, "deadLockKillQuery", tsDeadLockKillQuery, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "multiProcess", tsMultiProcess, 0, 2, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mnodeShmSize", tsMnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
......@@ -399,7 +371,7 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
if (cfgAddInt32(pCfg, "qnodeShmSize", tsQnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "snodeShmSize", tsSnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "bnodeShmSize", tsBnodeShmSize, TSDB_MAX_MSG_SIZE * 2 + 1024, INT32_MAX, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "mumOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfShmThreads", tsNumOfShmThreads, 1, 1024, 0) != 0) return -1;
tsNumOfRpcThreads = tsNumOfCores / 2;
tsNumOfRpcThreads = TRANGE(tsNumOfRpcThreads, 1, 4);
......@@ -409,25 +381,21 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfCommitThreads = TRANGE(tsNumOfCommitThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfCommitThreads", tsNumOfCommitThreads, 1, 1024, 0) != 0) return -1;
tsNumOfMnodeQueryThreads = tsNumOfCores * 2;
tsNumOfMnodeQueryThreads = TRANGE(tsNumOfMnodeQueryThreads, 4, 8);
if (cfgAddInt32(pCfg, "numOfMnodeQueryThreads", tsNumOfMnodeQueryThreads, 1, 1024, 0) != 0) return -1;
tsNumOfMnodeReadThreads = tsNumOfCores / 8;
tsNumOfMnodeReadThreads = TRANGE(tsNumOfMnodeReadThreads, 1, 4);
if (cfgAddInt32(pCfg, "numOfMnodeReadThreads", tsNumOfMnodeReadThreads, 1, 1024, 0) != 0) return -1;
tsNumOfVnodeQueryThreads = tsNumOfCores * 2;
tsNumOfVnodeQueryThreads = TMAX(tsNumOfVnodeQueryThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfVnodeQueryThreads", tsNumOfVnodeQueryThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeStreamThreads = tsNumOfCores / 4;
tsNumOfVnodeStreamThreads = TMAX(tsNumOfVnodeStreamThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfVnodeStreamThreads", tsNumOfVnodeStreamThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeFetchThreads = tsNumOfCores / 4;
tsNumOfVnodeFetchThreads = TMAX(tsNumOfVnodeFetchThreads, 4);
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfVnodeFetchThreads", tsNumOfVnodeFetchThreads, 4, 1024, 0) != 0) return -1;
tsNumOfVnodeWriteThreads = tsNumOfCores;
tsNumOfVnodeWriteThreads = TMAX(tsNumOfVnodeWriteThreads, 1);
......@@ -447,11 +415,11 @@ static int32_t taosAddServerCfg(SConfig *pCfg) {
tsNumOfSnodeSharedThreads = tsNumOfCores / 4;
tsNumOfSnodeSharedThreads = TRANGE(tsNumOfSnodeSharedThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeSharedThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfSnodeSharedThreads", tsNumOfSnodeSharedThreads, 2, 1024, 0) != 0) return -1;
tsNumOfSnodeUniqueThreads = tsNumOfCores / 4;
tsNumOfSnodeUniqueThreads = TRANGE(tsNumOfSnodeUniqueThreads, 2, 4);
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 1, 1024, 0) != 0) return -1;
if (cfgAddInt32(pCfg, "numOfSnodeUniqueThreads", tsNumOfSnodeUniqueThreads, 2, 1024, 0) != 0) return -1;
tsRpcQueueMemoryAllowed = tsTotalMemoryKB * 1024 * 0.1;
tsRpcQueueMemoryAllowed = TRANGE(tsRpcQueueMemoryAllowed, TSDB_MAX_MSG_SIZE * 10L, TSDB_MAX_MSG_SIZE * 10000L);
......@@ -532,7 +500,7 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tstrncpy(tsTempDir, cfgGetItem(pCfg, "tempDir")->str, PATH_MAX);
taosExpandDir(tsTempDir, tsTempDir, PATH_MAX);
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval;
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
if (taosMulMkDir(tsTempDir) != 0) {
uError("failed to create tempDir:%s since %s", tsTempDir, terrstr());
return -1;
......@@ -545,7 +513,6 @@ static int32_t taosSetClientCfg(SConfig *pCfg) {
tsShellActivityTimer = cfgGetItem(pCfg, "shellActivityTimer")->i32;
tsCompressMsgSize = cfgGetItem(pCfg, "compressMsgSize")->i32;
tsCompressColData = cfgGetItem(pCfg, "compressColData")->i32;
tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
tsNumOfTaskQueueThreads = cfgGetItem(pCfg, "numOfTaskQueueThreads")->i32;
tsQueryPolicy = cfgGetItem(pCfg, "queryPolicy")->i32;
tsQuerySmaOptimize = cfgGetItem(pCfg, "querySmaOptimize")->i32;
......@@ -579,15 +546,9 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsMinIntervalTime = cfgGetItem(pCfg, "minIntervalTime")->i32;
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
tsCountAlwaysReturnValue = cfgGetItem(pCfg, "countAlwaysReturnValue")->i32;
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
tsStreamComputDelayRatio = cfgGetItem(pCfg, "streamCompDelayRatio")->fval;
tsQueryBufferSize = cfgGetItem(pCfg, "queryBufferSize")->i32;
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
tsPrintAuth = cfgGetItem(pCfg, "printAuth")->bval;
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->i32;
tsMultiProcess = cfgGetItem(pCfg, "multiProcess")->bval;
tsMnodeShmSize = cfgGetItem(pCfg, "mnodeShmSize")->i32;
......@@ -598,7 +559,6 @@ static int32_t taosSetServerCfg(SConfig *pCfg) {
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
tsNumOfMnodeQueryThreads = cfgGetItem(pCfg, "numOfMnodeQueryThreads")->i32;
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
tsNumOfVnodeQueryThreads = cfgGetItem(pCfg, "numOfVnodeQueryThreads")->i32;
tsNumOfVnodeStreamThreads = cfgGetItem(pCfg, "numOfVnodeStreamThreads")->i32;
......@@ -673,9 +633,7 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break;
}
case 'd': {
if (strcasecmp("deadLockKillQuery", name) == 0) {
tsDeadLockKillQuery = cfgGetItem(pCfg, "deadLockKillQuery")->i32;
} else if (strcasecmp("dDebugFlag", name) == 0) {
if (strcasecmp("dDebugFlag", name) == 0) {
dDebugFlag = cfgGetItem(pCfg, "dDebugFlag")->i32;
}
break;
......@@ -732,9 +690,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break;
}
case 'k': {
if (strcasecmp("keepColumnName", name) == 0) {
tsKeepOriginalColumnName = cfgGetItem(pCfg, "keepColumnName")->bval;
}
break;
}
case 'l': {
......@@ -758,10 +713,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsMaxShellConns = cfgGetItem(pCfg, "maxShellConns")->i32;
} else if (strcasecmp("maxNumOfDistinctRes", name) == 0) {
tsMaxNumOfDistinctResults = cfgGetItem(pCfg, "maxNumOfDistinctRes")->i32;
} else if (strcasecmp("maxStreamCompDelay", name) == 0) {
tsMaxStreamComputDelay = cfgGetItem(pCfg, "maxStreamCompDelay")->i32;
} else if (strcasecmp("maxFirstStreamCompDelay", name) == 0) {
tsStreamCompStartDelay = cfgGetItem(pCfg, "maxFirstStreamCompDelay")->i32;
}
break;
}
......@@ -772,8 +723,8 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break;
}
case 'i': {
if (strcasecmp("minimalTempDirGB", name) == 0) {
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTempDirGB")->fval;
if (strcasecmp("minimalTmpDirGB", name) == 0) {
tsTempSpace.reserved = cfgGetItem(pCfg, "minimalTmpDirGB")->fval;
} else if (strcasecmp("minimalDataDirGB", name) == 0) {
tsDataSpace.reserved = cfgGetItem(pCfg, "minimalDataDirGB")->fval;
} else if (strcasecmp("minSlidingTime", name) == 0) {
......@@ -834,8 +785,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfRpcThreads = cfgGetItem(pCfg, "numOfRpcThreads")->i32;
} else if (strcasecmp("numOfCommitThreads", name) == 0) {
tsNumOfCommitThreads = cfgGetItem(pCfg, "numOfCommitThreads")->i32;
} else if (strcasecmp("numOfMnodeQueryThreads", name) == 0) {
tsNumOfMnodeQueryThreads = cfgGetItem(pCfg, "numOfMnodeQueryThreads")->i32;
} else if (strcasecmp("numOfMnodeReadThreads", name) == 0) {
tsNumOfMnodeReadThreads = cfgGetItem(pCfg, "numOfMnodeReadThreads")->i32;
} else if (strcasecmp("numOfVnodeQueryThreads", name) == 0) {
......@@ -883,9 +832,7 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
break;
}
case 'r': {
if (strcasecmp("retryStreamCompDelay", name) == 0) {
tsRetryStreamCompDelay = cfgGetItem(pCfg, "retryStreamCompDelay")->i32;
} else if (strcasecmp("retrieveBlockingModel", name) == 0) {
if (strcasecmp("retrieveBlockingModel", name) == 0) {
tsRetrieveBlockingModel = cfgGetItem(pCfg, "retrieveBlockingModel")->bval;
} else if (strcasecmp("rpcQueueMemoryAllowed", name) == 0) {
tsRpcQueueMemoryAllowed = cfgGetItem(pCfg, "rpcQueueMemoryAllowed")->i64;
......@@ -913,10 +860,6 @@ int32_t taosSetCfg(SConfig *pCfg, char *name) {
tsNumOfSupportVnodes = cfgGetItem(pCfg, "supportVnodes")->i32;
} else if (strcasecmp("statusInterval", name) == 0) {
tsStatusInterval = cfgGetItem(pCfg, "statusInterval")->i32;
} else if (strcasecmp("streamCompDelayRatio", name) == 0) {
tsStreamComputDelayRatio = cfgGetItem(pCfg, "streamCompDelayRatio")->fval;
} else if (strcasecmp("slaveQuery", name) == 0) {
tsEnableSlaveQuery = cfgGetItem(pCfg, "slaveQuery")->bval;
} else if (strcasecmp("snodeShmSize", name) == 0) {
tsSnodeShmSize = cfgGetItem(pCfg, "snodeShmSize")->i32;
} else if (strcasecmp("serverPort", name) == 0) {
......
......@@ -14,4 +14,7 @@ target_include_directories(
taosd
PRIVATE "${CMAKE_CURRENT_SOURCE_DIR}/node_mgmt/inc"
)
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
add_dependencies(taosd jemalloc)
ENDIF ()
target_link_libraries(taosd dnode)
......@@ -184,6 +184,7 @@ SArray *mmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_MND_ALTER_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_STB, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_BATCH_META, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_TABLE_CFG, mmPutMsgToReadQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_CREATE_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_MND_DROP_SMA, mmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
......
......@@ -343,6 +343,7 @@ SArray *vmGetMsgHandles() {
if (dmSetMgmtHandle(pArray, TDMT_VND_UPDATE_TAG_VAL, vmPutMsgToWriteQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLE_CFG, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_BATCH_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_VND_TABLES_META, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_CANCEL_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
if (dmSetMgmtHandle(pArray, TDMT_SCH_DROP_TASK, vmPutMsgToFetchQueue, 0) == NULL) goto _OVER;
......
......@@ -805,14 +805,6 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
return -1;
}
SDnodeObj *pDnode = mndAcquireDnode(pMnode, cfgReq.dnodeId);
if (pDnode == NULL) {
mError("dnode:%d, failed to config since %s ", cfgReq.dnodeId, terrstr());
return -1;
}
SEpSet epSet = mndGetDnodeEpset(pDnode);
mndReleaseDnode(pMnode, pDnode);
SDCfgDnodeReq dcfgReq = {0};
if (strcasecmp(cfgReq.config, "resetlog") == 0) {
strcpy(dcfgReq.config, "resetlog");
......@@ -860,16 +852,36 @@ static int32_t mndProcessConfigDnodeReq(SRpcMsg *pReq) {
}
}
int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, &dcfgReq);
void *pBuf = rpcMallocCont(bufLen);
int32_t code = -1;
SSdb *pSdb = pMnode->pSdb;
void *pIter = NULL;
while (1) {
SDnodeObj *pDnode = NULL;
pIter = sdbFetch(pSdb, SDB_DNODE, pIter, (void **)&pDnode);
if (pIter == NULL) break;
if (pBuf == NULL) return -1;
tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq);
if (pDnode->id == cfgReq.dnodeId || cfgReq.dnodeId == -1 || cfgReq.dnodeId == 0) {
SEpSet epSet = mndGetDnodeEpset(pDnode);
int32_t bufLen = tSerializeSDCfgDnodeReq(NULL, 0, &dcfgReq);
void *pBuf = rpcMallocCont(bufLen);
if (pBuf != NULL) {
tSerializeSDCfgDnodeReq(pBuf, bufLen, &dcfgReq);
mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle,
dcfgReq.config, dcfgReq.value);
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
tmsgSendReq(&epSet, &rpcMsg);
code = 0;
}
}
mInfo("dnode:%d, send config req to dnode, app:%p config:%s value:%s", cfgReq.dnodeId, pReq->info.ahandle,
dcfgReq.config, dcfgReq.value);
SRpcMsg rpcMsg = {.msgType = TDMT_DND_CONFIG_DNODE, .pCont = pBuf, .contLen = bufLen};
return tmsgSendReq(&epSet, &rpcMsg);
sdbRelease(pSdb, pDnode);
}
if (code == -1) {
terrno = TSDB_CODE_MND_DNODE_NOT_EXIST;
}
return code;
}
static int32_t mndProcessConfigDnodeRsp(SRpcMsg *pRsp) {
......
......@@ -63,6 +63,106 @@ int32_t mndProcessQueryMsg(SRpcMsg *pMsg) {
return code;
}
int32_t mndProcessBatchMetaMsg(SRpcMsg *pMsg) {
int32_t code = 0;
int32_t offset = 0;
int32_t rspSize = 0;
SBatchReq *batchReq = (SBatchReq*)pMsg->pCont;
int32_t msgNum = ntohl(batchReq->msgNum);
offset += sizeof(SBatchReq);
SBatchMsg req = {0};
SBatchRsp rsp = {0};
SRpcMsg reqMsg = *pMsg;
SRpcMsg rspMsg = {0};
void* pRsp = NULL;
SMnode *pMnode = pMsg->info.node;
SArray* batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
if (NULL == batchRsp) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int32_t i = 0; i < msgNum; ++i) {
req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
offset += sizeof(req.msgType);
req.msgLen = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
offset += sizeof(req.msgLen);
req.msg = (char*)pMsg->pCont + offset;
offset += req.msgLen;
reqMsg.msgType = req.msgType;
reqMsg.pCont = req.msg;
reqMsg.contLen = req.msgLen;
reqMsg.info.rsp = NULL;
reqMsg.info.rspLen = 0;
MndMsgFp fp = pMnode->msgFp[TMSG_INDEX(req.msgType)];
if (fp == NULL) {
mError("msg:%p, failed to get msg handle, app:%p type:%s", pMsg, pMsg->info.ahandle, TMSG_INFO(pMsg->msgType));
terrno = TSDB_CODE_MSG_NOT_PROCESSED;
return -1;
}
if ((*fp)(&reqMsg)) {
rsp.rspCode = terrno;
} else {
rsp.rspCode = 0;
}
rsp.reqType = reqMsg.msgType;
rsp.msgLen = reqMsg.info.rspLen;
rsp.msg = reqMsg.info.rsp;
taosArrayPush(batchRsp, &rsp);
rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES;
}
rspSize += sizeof(int32_t);
offset = 0;
pRsp = rpcMallocCont(rspSize);
if (pRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*(int32_t*)((char*)pRsp + offset) = htonl(msgNum);
offset += sizeof(msgNum);
for (int32_t i = 0; i < msgNum; ++i) {
SBatchRsp *p = taosArrayGet(batchRsp, i);
*(int32_t*)((char*)pRsp + offset) = htonl(p->reqType);
offset += sizeof(p->reqType);
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen);
offset += sizeof(p->msgLen);
*(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode);
offset += sizeof(p->rspCode);
memcpy((char*)pRsp + offset, p->msg, p->msgLen);
offset += p->msgLen;
rpcFreeCont(p->msg);
}
taosArrayDestroy(batchRsp);
batchRsp = NULL;
_exit:
pMsg->info.rsp = pRsp;
pMsg->info.rspLen = rspSize;
if (code) {
mError("mnd get batch meta failed cause of %s", tstrerror(code));
}
taosArrayDestroyEx(batchRsp, tFreeSBatchRsp);
return code;
}
int32_t mndInitQuery(SMnode *pMnode) {
if (qWorkerInit(NODE_TYPE_MNODE, MNODE_HANDLE, NULL, (void **)&pMnode->pQuery, &pMnode->msgCb) != 0) {
mError("failed to init qworker in mnode since %s", terrstr());
......@@ -76,6 +176,7 @@ int32_t mndInitQuery(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_SCH_MERGE_FETCH, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_DROP_TASK, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_SCH_QUERY_HEARTBEAT, mndProcessQueryMsg);
mndSetMsgHandle(pMnode, TDMT_MND_BATCH_META, mndProcessBatchMetaMsg);
return 0;
}
......
......@@ -1287,6 +1287,7 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
mDebug("trans:%d, stage keep on redoAction since %s", pTrans->id, tstrerror(code));
continueExec = false;
} else {
pTrans->failedTimes++;
pTrans->code = terrno;
if (pTrans->policy == TRN_POLICY_ROLLBACK) {
if (pTrans->lastAction != 0) {
......@@ -1306,7 +1307,6 @@ static bool mndTransPerformRedoActionStage(SMnode *pMnode, STrans *pTrans) {
mError("trans:%d, stage from redoAction to rollback since %s", pTrans->id, terrstr());
continueExec = true;
} else {
pTrans->failedTimes++;
mError("trans:%d, stage keep on redoAction since %s, failedTimes:%d", pTrans->id, terrstr(), pTrans->failedTimes);
continueExec = false;
}
......
......@@ -179,7 +179,7 @@ static int32_t sndProcessTaskRecoverRsp(SSnode *pNode, SRpcMsg *pMsg) {
SStreamMeta *pMeta = pNode->pMeta;
SStreamTaskRecoverRsp *pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
int32_t taskId = pRsp->rspTaskId;
SStreamTask *pTask = *(SStreamTask **)taosHashGet(pMeta->pHash, &taskId, sizeof(int32_t));
streamProcessRecoverRsp(pTask, pRsp);
return 0;
......
......@@ -78,8 +78,9 @@ void vnodeBufPoolReset(SVBufPool* pPool);
// vnodeQuery.c
int32_t vnodeQueryOpen(SVnode* pVnode);
void vnodeQueryClose(SVnode* pVnode);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg);
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg);
int32_t vnodeGetTableMeta(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int vnodeGetTableCfg(SVnode* pVnode, SRpcMsg* pMsg, bool direct);
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg);
// vnodeCommit.c
int32_t vnodeBegin(SVnode* pVnode);
......
......@@ -293,7 +293,10 @@ int metaAlterSTable(SMeta *pMeta, int64_t version, SVCreateStbReq *pReq) {
tdbTbcOpen(pMeta->pUidIdx, &pUidIdxc, &pMeta->txn);
ret = tdbTbcMoveTo(pUidIdxc, &pReq->suid, sizeof(tb_uid_t), &c);
if (ret < 0 || c) {
ASSERT(0);
tdbTbcClose(pUidIdxc);
terrno = TSDB_CODE_TDB_STB_NOT_EXIST;
// ASSERT(0);
return -1;
}
......@@ -980,6 +983,9 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
tbDbKey.version = pME->version;
tbDbKey.uid = pME->uid;
metaDebug("vgId:%d, start to save table version:%" PRId64 "uid: %" PRId64, TD_VID(pMeta->pVnode), pME->version,
pME->uid);
pKey = &tbDbKey;
kLen = sizeof(tbDbKey);
......@@ -1012,6 +1018,9 @@ static int metaSaveToTbDb(SMeta *pMeta, const SMetaEntry *pME) {
return 0;
_err:
metaError("vgId:%d, failed to save table version:%" PRId64 "uid: %" PRId64 " %s", TD_VID(pMeta->pVnode), pME->version,
pME->uid, tstrerror(terrno));
taosMemoryFree(pVal);
return -1;
}
......
......@@ -599,14 +599,14 @@ static int32_t tdRSmaFetchAndSubmitResult(SRSmaInfoItem *pItem, STSchema *pTSche
SSubmitReq *pReq = NULL;
// TODO: the schema update should be handled
if (buildSubmitReqFromDataBlock(&pReq, pResult, pTSchema, SMA_VID(pSma), suid) < 0) {
smaError("vgId:%d, build submit req for rsma table %" PRIi64 "l evel %" PRIi8 " failed since %s", SMA_VID(pSma),
smaError("vgId:%d, build submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s", SMA_VID(pSma),
suid, pItem->level, terrstr());
goto _err;
}
if (pReq && tdProcessSubmitReq(sinkTsdb, output->info.version, pReq) < 0) {
taosMemoryFreeClear(pReq);
smaError("vgId:%d, process submit req for rsma table %" PRIi64 " level %" PRIi8 " failed since %s",
smaError("vgId:%d, process submit req for rsma stable %" PRIi64 " level %" PRIi8 " failed since %s",
SMA_VID(pSma), suid, pItem->level, terrstr());
goto _err;
}
......
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "sma.h"
static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppData);
static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData);
// SRsmaSnapReader ========================================
struct SRsmaSnapReader {
SSma* pSma;
int64_t sver;
int64_t ever;
// for data file
int8_t rsmaDataDone[TSDB_RETENTION_L2];
STsdbSnapReader* pDataReader[TSDB_RETENTION_L2];
// for qtaskinfo file
int8_t qTaskDone;
SQTaskFReader* pQTaskFReader;
};
int32_t rsmaSnapReaderOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapReader** ppReader) {
int32_t code = 0;
SRsmaSnapReader* pReader = NULL;
// alloc
pReader = (SRsmaSnapReader*)taosMemoryCalloc(1, sizeof(*pReader));
if (pReader == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pReader->pSma = pSma;
pReader->sver = sver;
pReader->ever = ever;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
code = tsdbSnapReaderOpen(pSma->pRSmaTsdb[i], sver, ever, &pReader->pDataReader[i]);
if (code < 0) {
goto _err;
}
}
}
*ppReader = pReader;
smaInfo("vgId:%d vnode snapshot rsma reader opened succeed", SMA_VID(pSma));
return TSDB_CODE_SUCCESS;
_err:
smaError("vgId:%d vnode snapshot rsma reader opened failed since %s", SMA_VID(pSma), tstrerror(code));
return TSDB_CODE_FAILED;
}
static int32_t rsmaSnapReadQTaskInfo(SRsmaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
SSma* pSma = pReader->pSma;
_exit:
smaInfo("vgId:%d vnode snapshot rsma read qtaskinfo succeed", SMA_VID(pSma));
return code;
_err:
smaError("vgId:%d vnode snapshot rsma read qtaskinfo failed since %s", SMA_VID(pSma), tstrerror(code));
return code;
}
int32_t rsmaSnapRead(SRsmaSnapReader* pReader, uint8_t** ppData) {
int32_t code = 0;
*ppData = NULL;
smaInfo("vgId:%d vnode snapshot rsma read entry", SMA_VID(pReader->pSma));
// read rsma1/rsma2 file
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
STsdbSnapReader* pTsdbSnapReader = pReader->pDataReader[i];
if (!pTsdbSnapReader) {
continue;
}
if (!pReader->rsmaDataDone[i]) {
smaInfo("vgId:%d vnode snapshot rsma read level %d not done", SMA_VID(pReader->pSma), i);
code = tsdbSnapRead(pTsdbSnapReader, ppData);
if (code) {
goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->rsmaDataDone[i] = 1;
}
}
} else {
smaInfo("vgId:%d vnode snapshot rsma read level %d is done", SMA_VID(pReader->pSma), i);
}
}
// read qtaskinfo file
if (!pReader->qTaskDone) {
code = rsmaSnapReadQTaskInfo(pReader, ppData);
if (code) {
goto _err;
} else {
if (*ppData) {
goto _exit;
} else {
pReader->qTaskDone = 1;
}
}
}
_exit:
smaInfo("vgId:%d vnode snapshot rsma read succeed", SMA_VID(pReader->pSma));
return code;
_err:
smaError("vgId:%d vnode snapshot rsma read failed since %s", SMA_VID(pReader->pSma), tstrerror(code));
return code;
}
int32_t rsmaSnapReaderClose(SRsmaSnapReader** ppReader) {
int32_t code = 0;
SRsmaSnapReader* pReader = *ppReader;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pReader->pDataReader[i]) {
tsdbSnapReaderClose(&pReader->pDataReader[i]);
}
}
if (pReader->pQTaskFReader) {
// TODO: close for qtaskinfo
smaInfo("vgId:%d vnode snapshot rsma reader closed for qTaskInfo", SMA_VID(pReader->pSma));
}
smaInfo("vgId:%d vnode snapshot rsma reader closed", SMA_VID(pReader->pSma));
taosMemoryFreeClear(*ppReader);
return code;
}
// SRsmaSnapWriter ========================================
struct SRsmaSnapWriter {
SSma* pSma;
int64_t sver;
int64_t ever;
// config
int64_t commitID;
// for data file
STsdbSnapWriter* pDataWriter[TSDB_RETENTION_L2];
// for qtaskinfo file
SQTaskFReader* pQTaskFReader;
SQTaskFWriter* pQTaskFWriter;
};
int32_t rsmaSnapWriterOpen(SSma* pSma, int64_t sver, int64_t ever, SRsmaSnapWriter** ppWriter) {
int32_t code = 0;
SRsmaSnapWriter* pWriter = NULL;
// alloc
pWriter = (SRsmaSnapWriter*)taosMemoryCalloc(1, sizeof(*pWriter));
if (pWriter == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _err;
}
pWriter->pSma = pSma;
pWriter->sver = sver;
pWriter->ever = ever;
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pSma->pRSmaTsdb[i]) {
code = tsdbSnapWriterOpen(pSma->pRSmaTsdb[i], sver, ever, &pWriter->pDataWriter[i]);
if (code < 0) {
goto _err;
}
}
}
// qtaskinfo
// TODO
*ppWriter = pWriter;
smaInfo("vgId:%d rsma snapshot writer open succeed", TD_VID(pSma->pVnode));
return code;
_err:
smaError("vgId:%d rsma snapshot writer open failed since %s", TD_VID(pSma->pVnode), tstrerror(code));
*ppWriter = NULL;
return code;
}
int32_t rsmaSnapWriterClose(SRsmaSnapWriter** ppWriter, int8_t rollback) {
int32_t code = 0;
SRsmaSnapWriter* pWriter = *ppWriter;
if (rollback) {
ASSERT(0);
// code = tsdbFSRollback(pWriter->pTsdb->pFS);
// if (code) goto _err;
} else {
for (int32_t i = 0; i < TSDB_RETENTION_L2; ++i) {
if (pWriter->pDataWriter[i]) {
code = tsdbSnapWriterClose(&pWriter->pDataWriter[i], rollback);
if (code) goto _err;
}
}
}
taosMemoryFree(pWriter);
*ppWriter = NULL;
smaInfo("vgId:%d vnode snapshot rsma writer close succeed", SMA_VID(pWriter->pSma));
return code;
_err:
smaError("vgId:%d vnode snapshot rsma writer close failed since %s", SMA_VID(pWriter->pSma), tstrerror(code));
return code;
}
int32_t rsmaSnapWrite(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
SSnapDataHdr* pHdr = (SSnapDataHdr*)pData;
// rsma1/rsma2
if (pHdr->type == SNAP_DATA_RSMA1) {
pHdr->type = SNAP_DATA_TSDB;
code = tsdbSnapWrite(pWriter->pDataWriter[0], pData, nData);
} else if (pHdr->type == SNAP_DATA_RSMA2) {
pHdr->type = SNAP_DATA_TSDB;
code = tsdbSnapWrite(pWriter->pDataWriter[1], pData, nData);
} else if (pHdr->type == SNAP_DATA_QTASK) {
code = rsmaSnapWriteQTaskInfo(pWriter, pData, nData);
}
if (code < 0) goto _err;
_exit:
smaInfo("vgId:%d rsma snapshot write for data %" PRIi8 " succeed", SMA_VID(pWriter->pSma), pHdr->type);
return code;
_err:
smaError("vgId:%d rsma snapshot write for data %" PRIi8 " failed since %s", SMA_VID(pWriter->pSma), pHdr->type,
tstrerror(code));
return code;
}
static int32_t rsmaSnapWriteQTaskInfo(SRsmaSnapWriter* pWriter, uint8_t* pData, uint32_t nData) {
int32_t code = 0;
if (pWriter->pQTaskFWriter == NULL) {
// SDelFile* pDelFile = pWriter->fs.pDelFile;
// // reader
// if (pDelFile) {
// code = tsdbDelFReaderOpen(&pWriter->pDelFReader, pDelFile, pTsdb, NULL);
// if (code) goto _err;
// code = tsdbReadDelIdx(pWriter->pDelFReader, pWriter->aDelIdxR, NULL);
// if (code) goto _err;
// }
// // writer
// SDelFile delFile = {.commitID = pWriter->commitID, .offset = 0, .size = 0};
// code = tsdbDelFWriterOpen(&pWriter->pDelFWriter, &delFile, pTsdb);
// if (code) goto _err;
}
smaInfo("vgId:%d vnode snapshot rsma write qtaskinfo succeed", SMA_VID(pWriter->pSma));
_exit:
return code;
_err:
smaError("vgId:%d vnode snapshot rsma write qtaskinfo failed since %s", SMA_VID(pWriter->pSma), tstrerror(code));
return code;
}
......@@ -796,7 +796,7 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->taskId;
int32_t taskId = pRsp->rspTaskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) {
streamProcessRecoverRsp(*ppTask, pRsp);
......
......@@ -21,7 +21,7 @@ int vnodeQueryOpen(SVnode *pVnode) {
void vnodeQueryClose(SVnode *pVnode) { qWorkerDestroy((void **)&pVnode->pQuery); }
int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
STableInfoReq infoReq = {0};
STableMetaRsp metaRsp = {0};
SMetaReader mer1 = {0};
......@@ -99,7 +99,12 @@ int vnodeGetTableMeta(SVnode *pVnode, SRpcMsg *pMsg) {
goto _exit;
}
pRsp = rpcMallocCont(rspLen);
if (direct) {
pRsp = rpcMallocCont(rspLen);
} else {
pRsp = taosMemoryCalloc(1, rspLen);
}
if (pRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
......@@ -117,15 +122,19 @@ _exit:
qError("get table %s meta failed cause of %s", infoReq.tbName, tstrerror(code));
}
tmsgSendRsp(&rpcMsg);
if (direct) {
tmsgSendRsp(&rpcMsg);
} else {
*pMsg = rpcMsg;
}
taosMemoryFree(metaRsp.pSchemas);
metaReaderClear(&mer2);
metaReaderClear(&mer1);
return TSDB_CODE_SUCCESS;
}
int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg) {
int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg, bool direct) {
STableCfgReq cfgReq = {0};
STableCfgRsp cfgRsp = {0};
SMetaReader mer1 = {0};
......@@ -209,7 +218,12 @@ int vnodeGetTableCfg(SVnode *pVnode, SRpcMsg *pMsg) {
goto _exit;
}
pRsp = rpcMallocCont(rspLen);
if (direct) {
pRsp = rpcMallocCont(rspLen);
} else {
pRsp = taosMemoryCalloc(1, rspLen);
}
if (pRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
......@@ -227,14 +241,124 @@ _exit:
qError("get table %s cfg failed cause of %s", cfgReq.tbName, tstrerror(code));
}
tmsgSendRsp(&rpcMsg);
if (direct) {
tmsgSendRsp(&rpcMsg);
} else {
*pMsg = rpcMsg;
}
tFreeSTableCfgRsp(&cfgRsp);
metaReaderClear(&mer2);
metaReaderClear(&mer1);
return TSDB_CODE_SUCCESS;
}
int32_t vnodeGetBatchMeta(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t code = 0;
int32_t offset = 0;
int32_t rspSize = 0;
SBatchReq *batchReq = (SBatchReq*)pMsg->pCont;
int32_t msgNum = ntohl(batchReq->msgNum);
offset += sizeof(SBatchReq);
SBatchMsg req = {0};
SBatchRsp rsp = {0};
SRpcMsg reqMsg = *pMsg;
SRpcMsg rspMsg = {0};
void* pRsp = NULL;
SArray* batchRsp = taosArrayInit(msgNum, sizeof(SBatchRsp));
if (NULL == batchRsp) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
for (int32_t i = 0; i < msgNum; ++i) {
req.msgType = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
offset += sizeof(req.msgType);
req.msgLen = ntohl(*(int32_t*)((char*)pMsg->pCont + offset));
offset += sizeof(req.msgLen);
req.msg = (char*)pMsg->pCont + offset;
offset += req.msgLen;
reqMsg.msgType = req.msgType;
reqMsg.pCont = req.msg;
reqMsg.contLen = req.msgLen;
switch (req.msgType) {
case TDMT_VND_TABLE_META:
vnodeGetTableMeta(pVnode, &reqMsg, false);
break;
case TDMT_VND_TABLE_CFG:
vnodeGetTableCfg(pVnode, &reqMsg, false);
break;
default:
qError("invalid req msgType %d", req.msgType);
reqMsg.code = TSDB_CODE_INVALID_MSG;
reqMsg.pCont = NULL;
reqMsg.contLen = 0;
break;
}
rsp.reqType = reqMsg.msgType;
rsp.msgLen = reqMsg.contLen;
rsp.rspCode = reqMsg.code;
rsp.msg = reqMsg.pCont;
taosArrayPush(batchRsp, &rsp);
rspSize += sizeof(rsp) + rsp.msgLen - POINTER_BYTES;
}
rspSize += sizeof(int32_t);
offset = 0;
pRsp = rpcMallocCont(rspSize);
if (pRsp == NULL) {
code = TSDB_CODE_OUT_OF_MEMORY;
goto _exit;
}
*(int32_t*)((char*)pRsp + offset) = htonl(msgNum);
offset += sizeof(msgNum);
for (int32_t i = 0; i < msgNum; ++i) {
SBatchRsp *p = taosArrayGet(batchRsp, i);
*(int32_t*)((char*)pRsp + offset) = htonl(p->reqType);
offset += sizeof(p->reqType);
*(int32_t*)((char*)pRsp + offset) = htonl(p->msgLen);
offset += sizeof(p->msgLen);
*(int32_t*)((char*)pRsp + offset) = htonl(p->rspCode);
offset += sizeof(p->rspCode);
memcpy((char*)pRsp + offset, p->msg, p->msgLen);
offset += p->msgLen;
taosMemoryFreeClear(p->msg);
}
taosArrayDestroy(batchRsp);
batchRsp = NULL;
_exit:
rspMsg.info = pMsg->info;
rspMsg.pCont = pRsp;
rspMsg.contLen = rspSize;
rspMsg.code = code;
rspMsg.msgType = pMsg->msgType;
if (code) {
qError("vnd get batch meta failed cause of %s", tstrerror(code));
}
taosArrayDestroyEx(batchRsp, tFreeSBatchRsp);
tmsgSendRsp(&rspMsg);
return code;
}
int32_t vnodeGetLoad(SVnode *pVnode, SVnodeLoad *pLoad) {
pLoad->vgId = TD_VID(pVnode);
pLoad->syncState = syncGetMyRole(pVnode->sync);
......
......@@ -298,7 +298,7 @@ int32_t vnodeProcessQueryMsg(SVnode *pVnode, SRpcMsg *pMsg) {
int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
vTrace("message in fetch queue is processing");
if ((pMsg->msgType == TDMT_SCH_FETCH || pMsg->msgType == TDMT_VND_TABLE_META ||
pMsg->msgType == TDMT_VND_TABLE_CFG) &&
pMsg->msgType == TDMT_VND_TABLE_CFG || pMsg->msgType == TDMT_VND_BATCH_META) &&
!vnodeIsLeader(pVnode)) {
vnodeRedirectRpcMsg(pVnode, pMsg);
return 0;
......@@ -320,9 +320,11 @@ int32_t vnodeProcessFetchMsg(SVnode *pVnode, SRpcMsg *pMsg, SQueueInfo *pInfo) {
case TDMT_SCH_QUERY_HEARTBEAT:
return qWorkerProcessHbMsg(pVnode, pVnode->pQuery, pMsg, 0);
case TDMT_VND_TABLE_META:
return vnodeGetTableMeta(pVnode, pMsg);
return vnodeGetTableMeta(pVnode, pMsg, true);
case TDMT_VND_TABLE_CFG:
return vnodeGetTableCfg(pVnode, pMsg);
return vnodeGetTableCfg(pVnode, pMsg, true);
case TDMT_VND_BATCH_META:
return vnodeGetBatchMeta(pVnode, pMsg);
case TDMT_VND_CONSUME:
return tqProcessPollReq(pVnode->pTq, pMsg);
case TDMT_STREAM_TASK_RUN:
......
......@@ -281,8 +281,8 @@ void vnodeApplyWriteMsg(SQueueInfo *pInfo, STaosQall *qall, int32_t numOfMsgs) {
for (int32_t i = 0; i < numOfMsgs; ++i) {
if (taosGetQitem(qall, (void **)&pMsg) == 0) continue;
const STraceId *trace = &pMsg->info.traceId;
vGTrace("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p", vgId, pMsg, TMSG_INFO(pMsg->msgType),
pMsg->info.handle);
vGInfo("vgId:%d, msg:%p get from vnode-apply queue, type:%s handle:%p index:%ld", vgId, pMsg,
TMSG_INFO(pMsg->msgType), pMsg->info.handle, pMsg->info.conn.applyIndex);
SRpcMsg rsp = {.code = pMsg->code, .info = pMsg->info};
if (rsp.code == 0) {
......@@ -503,9 +503,6 @@ static void vnodeSyncReconfig(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SReCon
static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
if (cbMeta.isWeak == 0) {
SVnode *pVnode = pFsm->data;
vTrace("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state,
syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
if (cbMeta.code == 0) {
SRpcMsg rpcMsg = {.msgType = pMsg->msgType, .contLen = pMsg->contLen};
......@@ -514,11 +511,17 @@ static void vnodeSyncCommitMsg(SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta c
syncGetAndDelRespRpc(pVnode->sync, cbMeta.seqNum, &rpcMsg.info);
rpcMsg.info.conn.applyIndex = cbMeta.index;
rpcMsg.info.conn.applyTerm = cbMeta.term;
vInfo("vgId:%d, commit-cb is excuted, fsm:%p, index:%" PRId64 ", term:%" PRIu64 ", msg-index:%" PRId64
", isWeak:%d, code:%d, state:%d %s, msgtype:%d %s",
syncGetVgId(pVnode->sync), pFsm, cbMeta.index, cbMeta.term, rpcMsg.info.conn.applyIndex, cbMeta.isWeak,
cbMeta.code, cbMeta.state, syncUtilState2String(cbMeta.state), pMsg->msgType, TMSG_INFO(pMsg->msgType));
tmsgPutToQueue(&pVnode->msgCb, APPLY_QUEUE, &rpcMsg);
} else {
SRpcMsg rsp = {.code = cbMeta.code, .info = pMsg->info};
vError("vgId:%d, sync commit error, msgtype:%d,%s, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync),
pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.code, tstrerror(cbMeta.code));
vError("vgId:%d, sync commit error, msgtype:%d,%s, index:%ld, error:0x%X, errmsg:%s", syncGetVgId(pVnode->sync),
pMsg->msgType, TMSG_INFO(pMsg->msgType), cbMeta.index, cbMeta.code, tstrerror(cbMeta.code));
if (rsp.info.handle != NULL) {
tmsgSendRsp(&rsp);
}
......
......@@ -31,6 +31,7 @@ extern "C" {
#define CTG_DEFAULT_RENT_SECOND 10
#define CTG_DEFAULT_RENT_SLOT_SIZE 10
#define CTG_DEFAULT_MAX_RETRY_TIMES 3
#define CTG_DEFAULT_BATCH_NUM 64
#define CTG_RENT_SLOT_SECOND 1.5
......@@ -38,6 +39,8 @@ extern "C" {
#define CTG_ERR_CODE_TABLE_NOT_EXIST TSDB_CODE_PAR_TABLE_NOT_EXIST
#define CTG_BATCH_FETCH 1
enum {
CTG_READ = 1,
CTG_WRITE,
......@@ -200,8 +203,20 @@ typedef struct SCatalog {
SCtgRentMgmt stbRent;
} SCatalog;
typedef struct SCtgBatch {
int32_t batchId;
int32_t msgType;
int32_t msgSize;
SArray* pMsgs;
SRequestConnInfo conn;
char dbFName[TSDB_DB_FNAME_LEN];
SArray* pTaskIds;
} SCtgBatch;
typedef struct SCtgJob {
int64_t refId;
int32_t batchId;
SHashObj* pBatchs;
SArray* pTasks;
int32_t taskDone;
SMetaData jobRes;
......@@ -236,6 +251,16 @@ typedef struct SCtgMsgCtx {
char* target;
} SCtgMsgCtx;
typedef struct SCtgTaskCallbackParam {
uint64_t queryId;
int64_t refId;
SArray* taskId;
int32_t reqType;
int32_t batchId;
} SCtgTaskCallbackParam;
typedef struct SCtgTask SCtgTask;
typedef int32_t (*ctgSubTaskCbFp)(SCtgTask*);
......@@ -258,6 +283,7 @@ typedef struct SCtgTask {
SRWLatch lock;
SArray* pParents;
SCtgSubRes subRes;
SHashObj* pBatchs;
} SCtgTask;
typedef int32_t (*ctgInitTaskFp)(SCtgJob*, int32_t, void*);
......@@ -618,6 +644,7 @@ int32_t ctgGetTbMetaFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SNa
int32_t ctgGetTableCfgFromVnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, SVgroupInfo *vgroupInfo, STableCfg **out, SCtgTask* pTask);
int32_t ctgGetTableCfgFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, const SName* pTableName, STableCfg **out, SCtgTask* pTask);
int32_t ctgGetSvrVerFromMnode(SCatalog* pCtg, SRequestConnInfo *pConn, char **out, SCtgTask* pTask);
int32_t ctgLaunchBatchs(SCatalog* pCtg, SCtgJob *pJob, SHashObj* pBatchs);
int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const SCatalogReq* pReq, catalogCallback fp, void* param);
int32_t ctgLaunchJob(SCtgJob *pJob);
......@@ -626,6 +653,9 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
int32_t ctgGetTbCfgCb(SCtgTask *pTask);
void ctgFreeHandle(SCatalog* pCatalog);
void ctgFreeMsgSendParam(void* param);
void ctgFreeBatch(SCtgBatch *pBatch);
void ctgFreeBatchs(SHashObj *pBatchs);
int32_t ctgCloneVgInfo(SDBVgInfo *src, SDBVgInfo **dst);
int32_t ctgCloneMetaOutput(STableMetaOutput *output, STableMetaOutput **pOutput);
int32_t ctgGenerateVgList(SCatalog *pCtg, SHashObj *vgHash, SArray** pList);
......@@ -642,7 +672,7 @@ int32_t ctgDbVgVersionSearchCompare(const void* key1, const void* key2);
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput);
int32_t ctgUpdateMsgCtx(SCtgMsgCtx* pCtx, int32_t reqType, void* out, char* target);
char * ctgTaskTypeStr(CTG_TASK_TYPE type);
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask);
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId);
int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes);
void ctgFreeSTableIndex(void *info);
void ctgClearSubTaskRes(SCtgSubRes *pRes);
......
......@@ -20,12 +20,6 @@
extern "C" {
#endif
typedef struct SCtgTaskCallbackParam {
uint64_t queryId;
int64_t refId;
uint64_t taskId;
int32_t reqType;
} SCtgTaskCallbackParam;
#ifdef __cplusplus
......
......@@ -473,8 +473,15 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
pJob->tbCfgNum = tbCfgNum;
pJob->svrVerNum = svrVerNum;
pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask));
#if CTG_BATCH_FETCH
pJob->pBatchs = taosHashInit(CTG_DEFAULT_BATCH_NUM, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), false, HASH_NO_LOCK);
if (NULL == pJob->pBatchs) {
ctgError("taosHashInit %d batch failed", CTG_DEFAULT_BATCH_NUM);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
}
#endif
pJob->pTasks = taosArrayInit(taskNum, sizeof(SCtgTask));
if (NULL == pJob->pTasks) {
ctgError("taosArrayInit %d tasks failed", taskNum);
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
......@@ -560,7 +567,7 @@ int32_t ctgInitJob(SCatalog* pCtg, SRequestConnInfo *pConn, SCtgJob** job, const
_return:
taosMemoryFreeClear(*job);
ctgFreeJob(*job);
CTG_RET(code);
}
......@@ -776,7 +783,8 @@ int32_t ctgCallSubCb(SCtgTask *pTask) {
pParent->subRes.code = code;
}
}
pParent->pBatchs = pTask->pBatchs;
CTG_ERR_JRET(pParent->subRes.fp(pParent));
}
......@@ -872,7 +880,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
SVgroupInfo vgInfo = {0};
CTG_ERR_JRET(ctgGetVgInfoFromHashValue(pCtg, dbCache->vgCache.vgInfo, ctx->pName, &vgInfo));
ctgDebug("will refresh tbmeta, not supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
ctgDebug("will refresh tbmeta, supposed to be stb, tbName:%s, flag:%d", tNameGetTableName(ctx->pName), ctx->flag);
ctx->vgId = vgInfo.vgId;
CTG_ERR_JRET(ctgGetTbMetaFromVnode(pCtg, pConn, ctx->pName, &vgInfo, NULL, pTask));
......@@ -890,7 +898,7 @@ int32_t ctgHandleGetTbMetaRsp(SCtgTask* pTask, int32_t reqType, const SDataBuf *
return TSDB_CODE_SUCCESS;
}
ctgError("no tbmeta got, tbNmae:%s", tNameGetTableName(ctx->pName));
ctgError("no tbmeta got, tbName:%s", tNameGetTableName(ctx->pName));
ctgRemoveTbMetaFromCache(pCtg, ctx->pName, false);
CTG_ERR_JRET(CTG_ERR_CODE_TABLE_NOT_EXIST);
......@@ -1653,6 +1661,7 @@ int32_t ctgSetSubTaskCb(SCtgTask *pSub, SCtgTask *pTask) {
if (CTG_TASK_DONE == pSub->status) {
pTask->subRes.code = pSub->code;
CTG_ERR_JRET((*gCtgAsyncFps[pTask->type].cloneFp)(pSub, &pTask->subRes.res));
pTask->pBatchs = pSub->pBatchs;
CTG_ERR_JRET(pTask->subRes.fp(pTask));
} else {
if (NULL == pSub->pParents) {
......@@ -1690,6 +1699,7 @@ int32_t ctgLaunchSubTask(SCtgTask *pTask, CTG_TASK_TYPE type, ctgSubTaskCbFp fp,
CTG_ERR_RET(ctgSetSubTaskCb(pSub, pTask));
if (newTask) {
pSub->pBatchs = pTask->pBatchs;
CTG_ERR_RET((*gCtgAsyncFps[pSub->type].launchFp)(pSub));
pSub->status = CTG_TASK_LAUNCHED;
}
......@@ -1702,9 +1712,11 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
for (int32_t i = 0; i < taskNum; ++i) {
SCtgTask *pTask = taosArrayGet(pJob->pTasks, i);
pTask->pBatchs = pJob->pBatchs;
qDebug("QID:0x%" PRIx64 " ctg launch [%dth] task", pJob->queryId, pTask->taskId);
CTG_ERR_RET((*gCtgAsyncFps[pTask->type].launchFp)(pTask));
pTask->status = CTG_TASK_LAUNCHED;
}
......@@ -1712,6 +1724,10 @@ int32_t ctgLaunchJob(SCtgJob *pJob) {
qDebug("QID:0x%" PRIx64 " ctg call user callback with rsp %s", pJob->queryId, tstrerror(pJob->jobResCode));
taosAsyncExec(ctgCallUserCb, pJob, NULL);
#if CTG_BATCH_FETCH
} else {
ctgLaunchBatchs(pJob->pCtg, pJob, pJob->pBatchs);
#endif
}
return TSDB_CODE_SUCCESS;
......
此差异已折叠。
......@@ -19,6 +19,39 @@
#include "catalogInt.h"
#include "systable.h"
void ctgFreeMsgSendParam(void* param) {
if (NULL == param) {
return;
}
SCtgTaskCallbackParam* pParam = (SCtgTaskCallbackParam*)param;
taosArrayDestroy(pParam->taskId);
taosMemoryFree(param);
}
void ctgFreeBatch(SCtgBatch *pBatch) {
if (NULL == pBatch) {
return;
}
taosArrayDestroy(pBatch->pMsgs);
taosArrayDestroy(pBatch->pTaskIds);
}
void ctgFreeBatchs(SHashObj *pBatchs) {
void* p = taosHashIterate(pBatchs, NULL);
while (NULL != p) {
SCtgBatch* pBatch = (SCtgBatch*)p;
ctgFreeBatch(pBatch);
p = taosHashIterate(pBatchs, p);
}
taosHashCleanup(pBatchs);
}
char *ctgTaskTypeStr(CTG_TASK_TYPE type) {
switch (type) {
case CTG_TASK_GET_QNODE:
......@@ -612,6 +645,7 @@ void ctgFreeJob(void* job) {
uint64_t qid = pJob->queryId;
ctgFreeTasks(pJob->pTasks);
ctgFreeBatchs(pJob->pBatchs);
ctgFreeSMetaData(&pJob->jobRes);
......@@ -867,14 +901,10 @@ int32_t ctgCloneTableIndex(SArray* pIndex, SArray** pRes) {
}
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, SCtgTask* pTask) {
if (msgType == TDMT_VND_TABLE_META) {
SCtgTbMetaCtx* ctx = (SCtgTbMetaCtx*)pTask->taskCtx;
char dbFName[TSDB_DB_FNAME_LEN];
tNameGetFullDbName(ctx->pName, dbFName);
int32_t ctgUpdateSendTargetInfo(SMsgSendInfo *pMsgSendInfo, int32_t msgType, char* dbFName, int32_t vgId) {
if (msgType == TDMT_VND_TABLE_META || msgType == TDMT_VND_TABLE_CFG || msgType == TDMT_VND_BATCH_META) {
pMsgSendInfo->target.type = TARGET_TYPE_VNODE;
pMsgSendInfo->target.vgId = ctx->vgId;
pMsgSendInfo->target.vgId = vgId;
pMsgSendInfo->target.dbFName = strdup(dbFName);
} else {
pMsgSendInfo->target.type = TARGET_TYPE_MNODE;
......
......@@ -318,6 +318,7 @@ typedef struct STableScanInfo {
int32_t currentTable;
int8_t scanMode;
int8_t noTable;
int8_t assignBlockUid;
} STableScanInfo;
typedef struct STableMergeScanInfo {
......
此差异已折叠。
此差异已折叠。
......@@ -82,6 +82,7 @@ static bool columnNodeEqual(const SColumnNode* a, const SColumnNode* b) {
COMPARE_STRING_FIELD(dbName);
COMPARE_STRING_FIELD(tableName);
COMPARE_STRING_FIELD(colName);
COMPARE_STRING_FIELD(tableAlias);
return true;
}
......
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
此差异已折叠。
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册