提交 6a833a8d 编写于 作者: C Cary Xu

Merge branch '3.0' into feature/TD-11274-3.0

...@@ -10,7 +10,9 @@ ELSEIF (TD_WINDOWS) ...@@ -10,7 +10,9 @@ ELSEIF (TD_WINDOWS)
# INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/python DESTINATION connector) # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/python DESTINATION connector)
# INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/C\# DESTINATION connector) # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/src/connector/C\# DESTINATION connector)
# INSTALL(DIRECTORY ${TD_SOURCE_DIR}/examples DESTINATION .) # INSTALL(DIRECTORY ${TD_SOURCE_DIR}/examples DESTINATION .)
INSTALL(FILES ${TD_SOURCE_DIR}/packaging/cfg/taos.cfg DESTINATION cfg) INSTALL(CODE "IF (NOT EXISTS ${CMAKE_INSTALL_PREFIX}/cfg/taos.cfg)
execute_process(COMMAND ${CMAKE_COMMAND} -E copy ${TD_SOURCE_DIR}/packaging/cfg/taos.cfg ${CMAKE_INSTALL_PREFIX}/cfg/taos.cfg)
ENDIF ()")
INSTALL(FILES ${TD_SOURCE_DIR}/include/client/taos.h DESTINATION include) INSTALL(FILES ${TD_SOURCE_DIR}/include/client/taos.h DESTINATION include)
INSTALL(FILES ${TD_SOURCE_DIR}/include/util/taoserror.h DESTINATION include) INSTALL(FILES ${TD_SOURCE_DIR}/include/util/taoserror.h DESTINATION include)
INSTALL(FILES ${TD_SOURCE_DIR}/include/libs/function/taosudf.h DESTINATION include) INSTALL(FILES ${TD_SOURCE_DIR}/include/libs/function/taosudf.h DESTINATION include)
......
...@@ -27,6 +27,10 @@ else () ...@@ -27,6 +27,10 @@ else ()
cat("${TD_SUPPORT_DIR}/taosadapter_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/taosadapter_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif() endif()
if(TD_LINUX_64 AND JEMALLOC_ENABLED)
cat("${TD_SUPPORT_DIR}/jemalloc_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
endif()
# pthread # pthread
if(${BUILD_PTHREAD}) if(${BUILD_PTHREAD})
cat("${TD_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE}) cat("${TD_SUPPORT_DIR}/pthread_CMakeLists.txt.in" ${CONTRIB_TMP_FILE})
...@@ -392,6 +396,19 @@ if(${BUILD_WITH_SQLITE}) ...@@ -392,6 +396,19 @@ if(${BUILD_WITH_SQLITE})
endif(NOT TD_WINDOWS) endif(NOT TD_WINDOWS)
endif(${BUILD_WITH_SQLITE}) endif(${BUILD_WITH_SQLITE})
# jemalloc
IF (TD_LINUX_64 AND JEMALLOC_ENABLED)
include(ExternalProject)
ExternalProject_Add(jemalloc
PREFIX "jemalloc"
SOURCE_DIR ${CMAKE_CURRENT_SOURCE_DIR}/jemalloc
BUILD_IN_SOURCE 1
CONFIGURE_COMMAND ./autogen.sh COMMAND ./configure --prefix=${CMAKE_BINARY_DIR}/build/
BUILD_COMMAND ${MAKE}
)
INCLUDE_DIRECTORIES(${CMAKE_BINARY_DIR}/build/include)
ENDIF ()
# addr2line # addr2line
if(${BUILD_ADDR2LINE}) if(${BUILD_ADDR2LINE})
if(NOT ${TD_WINDOWS}) if(NOT ${TD_WINDOWS})
......
...@@ -11,7 +11,7 @@ import TabItem from "@theme/TabItem"; ...@@ -11,7 +11,7 @@ import TabItem from "@theme/TabItem";
::: :::
TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包。 TDengine 开源版本提供 deb 和 rpm 格式安装包,用户可以根据自己的运行环境选择合适的安装包。其中 deb 支持 Debian/Ubuntu 及衍生系统,rpm 支持 CentOS/RHEL/SUSE 及衍生系统。同时我们也为企业用户提供 tar.gz 格式安装包。也支持通过 `apt-get` 工具从线上进行安装。
## 安装 ## 安装
...@@ -293,4 +293,4 @@ taos> select avg(current), max(voltage), min(phase) from test.meters where group ...@@ -293,4 +293,4 @@ taos> select avg(current), max(voltage), min(phase) from test.meters where group
```sql ```sql
taos> select avg(current), max(voltage), min(phase) from test.d10 interval(10s); taos> select avg(current), max(voltage), min(phase) from test.d10 interval(10s);
``` ```
\ No newline at end of file
...@@ -25,6 +25,7 @@ TDengine 采用 SQL 作为查询语言。应用程序可以通过 REST API 或 ...@@ -25,6 +25,7 @@ TDengine 采用 SQL 作为查询语言。应用程序可以通过 REST API 或
- 单列、多列数据查询 - 单列、多列数据查询
- 标签和数值的多种过滤条件:>, <, =, <\>, like 等 - 标签和数值的多种过滤条件:>, <, =, <\>, like 等
- 聚合结果的分组(Group by)、排序(Order by)、约束输出(Limit/Offset) - 聚合结果的分组(Group by)、排序(Order by)、约束输出(Limit/Offset)
- 时间窗口(Interval)、会话窗口(Session)和状态窗口(State_window)等窗口切分聚合查询
- 数值列及聚合结果的四则运算 - 数值列及聚合结果的四则运算
- 时间戳对齐的连接查询(Join Query: 隐式连接)操作 - 时间戳对齐的连接查询(Join Query: 隐式连接)操作
- 多种聚合/计算函数: count, max, min, avg, sum, twa, stddev, leastsquares, top, bottom, first, last, percentile, apercentile, last_row, spread, diff 等 - 多种聚合/计算函数: count, max, min, avg, sum, twa, stddev, leastsquares, top, bottom, first, last, percentile, apercentile, last_row, spread, diff 等
...@@ -40,7 +41,7 @@ taos> select * from d1001 where voltage > 215 order by ts desc limit 2; ...@@ -40,7 +41,7 @@ taos> select * from d1001 where voltage > 215 order by ts desc limit 2;
Query OK, 2 row(s) in set (0.001100s) Query OK, 2 row(s) in set (0.001100s)
``` ```
为满足物联网场景的需求,TDengine 支持几个特殊的函数,比如 twa(时间加权平均),spread (最大值与最小值的差),last_row(最后一条记录)等,更多与物联网场景相关的函数将添加进来。TDengine 还支持连续查询。 为满足物联网场景的需求,TDengine 支持几个特殊的函数,比如 twa(时间加权平均),spread (最大值与最小值的差),last_row(最后一条记录)等,更多与物联网场景相关的函数将添加进来。
具体的查询语法请看 [TAOS SQL 的数据查询](/taos-sql/select) 章节。 具体的查询语法请看 [TAOS SQL 的数据查询](/taos-sql/select) 章节。
...@@ -73,7 +74,7 @@ taos> SELECT count(*), max(current) FROM meters where groupId = 2 and ts > now - ...@@ -73,7 +74,7 @@ taos> SELECT count(*), max(current) FROM meters where groupId = 2 and ts > now -
Query OK, 1 row(s) in set (0.002136s) Query OK, 1 row(s) in set (0.002136s)
``` ```
TDengine 仅容许对属于同一个超级表的表之间进行聚合查询,不同超级表之间的聚合查询不支持。在 [TAOS SQL 的数据查询](/taos-sql/select) 一章,查询类操作都会注明是否支持超级表。 在 [TAOS SQL 的数据查询](/taos-sql/select) 一章,查询类操作都会注明是否支持超级表。
## 降采样查询、插值 ## 降采样查询、插值
......
...@@ -6,11 +6,11 @@ title: 数据订阅 ...@@ -6,11 +6,11 @@ title: 数据订阅
为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。 为了帮助应用实时获取写入 TDengine 的数据,或者以事件到达顺序处理数据,TDengine提供了类似消息队列产品的数据订阅、消费接口。这样在很多场景下,采用 TDengine 的时序数据处理系统不再需要集成消息队列产品,比如 kafka, 从而简化系统设计的复杂度,降低运营维护成本。
与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 可以是一张超级表,或一张子表。不仅如此,你可以通过标签、表名、列、表达式等多种方法过滤所需数据,并且支持对数据进行函数变换、预处理(包括标量udf计算)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤交给 TDengine,而不是应用完成,有效的减少传输的数据量 与 kafka 一样,你需要定义 topic, 但 TDengine 的 topic 是基于一个已经存在的超级表、子表或普通表的查询条件,即一个 SELECT 语句。你可以使用 SQL 对标签、表名、列、表达式等条件进行过滤,以及对数据进行标量函数与 UDF 计算(不包括数据聚合)。与其他消息队列软件相比,这是 TDengine 数据订阅功能的最大的优势,它提供了更大的灵活性,数据的颗粒度可以由应用随时调整,而且数据的过滤与预处理交给 TDengine,而不是应用完成,有效的减少传输的数据量与应用的复杂度
消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程分布式的消费数据,提高数据通吐率。但不同消费者组即使消费同一个topic, 并不共享消费进度。一个消费者组可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的vnode上,也就是多个shard上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保at least once消费。 消费者订阅 topic 后,可以实时获得最新的数据。多个消费者可以组成一个消费者组 (consumer group), 一个消费者组里的多个消费者共享消费进度,便于多线程、分布式地消费数据,提高消费速度。但不同消费者组中的消费者即使消费同一个topic, 并不共享消费进度。一个消费者可以订阅多个 topic。如果订阅的是超级表,数据可能会分布在多个不同的 vnode 上,也就是多个 shard 上,这样一个消费组里有多个消费者可以提高消费效率。TDengine 的消息队列提供了消息的ACK机制,在宕机、重启等复杂环境下确保 at least once 消费。
为了实现上述功能,TDengine 采用了灵活的 WAL (Write-Ahead-Log) 文件切换与保留机制:可以按照时间或文件大小来保留WAL文件(详见create database语句)。在消费时,TDengine 从 WAL 中获取数据,并经过过滤、变换等操作,将数据推送给消费者。 为了实现上述功能,TDengine 会为 WAL (Write-Ahead-Log) 文件自动创建索引以支持快速随机访问,并提供了灵活可配置的文件切换与保留机制:用户可以按需指定 WAL 文件保留的时间以及大小(详见 create database 语句)。通过以上方式将 WAL 改造成了一个保留事件到达顺序的、可持久化的存储引擎(但由于 TSDB 具有远比 WAL 更高的压缩率,我们不推荐保留太长时间,一般来说,不超过几天)。 对于以 topic 形式创建的查询,TDengine 将对接 WAL 而不是 TSDB 作为其存储引擎。在消费时,TDengine 根据当前消费进度从 WAL 直接读取数据,并使用统一的查询引擎实现过滤、变换等操作,将数据推送给消费者。
本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。 本文档不对消息队列本身的基础知识做介绍,如果需要了解,请自行搜索。
......
...@@ -104,6 +104,7 @@ typedef struct SDataBlockInfo { ...@@ -104,6 +104,7 @@ typedef struct SDataBlockInfo {
uint32_t capacity; uint32_t capacity;
// TODO: optimize and remove following // TODO: optimize and remove following
int64_t version; // used for stream, and need serialization int64_t version; // used for stream, and need serialization
int64_t ts; // used for stream, and need serialization
int32_t childId; // used for stream, do not serialize int32_t childId; // used for stream, do not serialize
EStreamType type; // used for stream, do not serialize EStreamType type; // used for stream, do not serialize
STimeWindow calWin; // used for stream, do not serialize STimeWindow calWin; // used for stream, do not serialize
......
...@@ -3100,7 +3100,7 @@ typedef struct { ...@@ -3100,7 +3100,7 @@ typedef struct {
void* msg; void* msg;
} SBatchRsp; } SBatchRsp;
static FORCE_INLINE void tFreeSBatchRsp(void *p) { static FORCE_INLINE void tFreeSBatchRsp(void* p) {
if (NULL == p) { if (NULL == p) {
return; return;
} }
......
...@@ -17,6 +17,7 @@ ...@@ -17,6 +17,7 @@
#include "os.h" #include "os.h"
#include "query.h" #include "query.h"
#include "tdatablock.h" #include "tdatablock.h"
#include "tdbInt.h"
#include "tmsg.h" #include "tmsg.h"
#include "tmsgcb.h" #include "tmsgcb.h"
#include "tqueue.h" #include "tqueue.h"
...@@ -85,6 +86,12 @@ enum { ...@@ -85,6 +86,12 @@ enum {
TASK_OUTPUT__FETCH, TASK_OUTPUT__FETCH,
}; };
enum {
STREAM_QUEUE__SUCESS = 1,
STREAM_QUEUE__FAILED,
STREAM_QUEUE__PROCESSING,
};
typedef struct { typedef struct {
int8_t type; int8_t type;
} SStreamQueueItem; } SStreamQueueItem;
...@@ -123,12 +130,6 @@ typedef struct { ...@@ -123,12 +130,6 @@ typedef struct {
SSDataBlock* pBlock; SSDataBlock* pBlock;
} SStreamTrigger; } SStreamTrigger;
enum {
STREAM_QUEUE__SUCESS = 1,
STREAM_QUEUE__FAILED,
STREAM_QUEUE__PROCESSING,
};
typedef struct { typedef struct {
STaosQueue* queue; STaosQueue* queue;
STaosQall* qall; STaosQall* qall;
...@@ -233,6 +234,7 @@ typedef struct { ...@@ -233,6 +234,7 @@ typedef struct {
typedef struct SStreamTask { typedef struct SStreamTask {
int64_t streamId; int64_t streamId;
int32_t taskId; int32_t taskId;
int32_t totalLevel;
int8_t taskLevel; int8_t taskLevel;
int8_t outputType; int8_t outputType;
int16_t dispatchMsgType; int16_t dispatchMsgType;
...@@ -458,13 +460,26 @@ int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp); ...@@ -458,13 +460,26 @@ int32_t streamProcessRetrieveRsp(SStreamTask* pTask, SStreamRetrieveRsp* pRsp);
int32_t streamTryExec(SStreamTask* pTask); int32_t streamTryExec(SStreamTask* pTask);
int32_t streamSchedExec(SStreamTask* pTask); int32_t streamSchedExec(SStreamTask* pTask);
typedef struct SStreamMeta SStreamMeta; typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask);
typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pStateDb;
SHashObj* pTasks;
void* ahandle;
TXN txn;
FTaskExpand* expandFunc;
} SStreamMeta;
SStreamMeta* streamMetaOpen(); SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc);
void streamMetaClose(SStreamMeta* streamMeta); void streamMetaClose(SStreamMeta* streamMeta);
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask); int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId); int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen);
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId);
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId);
int32_t streamMetaBegin(SStreamMeta* pMeta); int32_t streamMetaBegin(SStreamMeta* pMeta);
int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaCommit(SStreamMeta* pMeta);
......
...@@ -57,7 +57,7 @@ extern "C" { ...@@ -57,7 +57,7 @@ extern "C" {
#if defined(WINDOWS) #if defined(WINDOWS)
char *stpcpy (char *dest, const char *src); char *stpcpy (char *dest, const char *src);
char *stpncpy (char *dest, const char *src, size_t n); char *stpncpy (char *dest, const char *src, int n);
// specific // specific
#ifndef __COMPAR_FN_T #ifndef __COMPAR_FN_T
...@@ -77,7 +77,7 @@ extern "C" { ...@@ -77,7 +77,7 @@ extern "C" {
char * strsep(char **stringp, const char *delim); char * strsep(char **stringp, const char *delim);
char * getpass(const char *prefix); char * getpass(const char *prefix);
char * strndup(const char *s, size_t n); char * strndup(const char *s, int n);
// for send function in tsocket.c // for send function in tsocket.c
#define MSG_NOSIGNAL 0 #define MSG_NOSIGNAL 0
......
...@@ -2,61 +2,78 @@ ...@@ -2,61 +2,78 @@
set internal_dir=%~dp0\..\..\ set internal_dir=%~dp0\..\..\
set community_dir=%~dp0\.. set community_dir=%~dp0\..
cd %community_dir% set package_dir=%cd%
git checkout -- .
cd %community_dir%\packaging
:: %1 name %2 version :: %1 name %2 version
if !%1==! GOTO USAGE if !%1==! GOTO USAGE
if !%2==! GOTO USAGE if !%2==! GOTO USAGE
if %1 == taos GOTO TAOS
if %1 == power GOTO POWER if "%1" == "cluster" (
if %1 == tq GOTO TQ set work_dir=%internal_dir%
if %1 == pro GOTO PRO set packagServerName_x64=TDengine-enterprise-server-%2-beta-Windows-x64
if %1 == kh GOTO KH set packagServerName_x86=TDengine-enterprise-server-%2-beta-Windows-x86
if %1 == jh GOTO JH set packagClientName_x64=TDengine-enterprise-client-%2-beta-Windows-x64
GOTO USAGE set packagClientName_x86=TDengine-enterprise-client-%2-beta-Windows-x86
) else (
:TAOS set work_dir=%community_dir%
goto RELEASE set packagServerName_x64=TDengine-server-%2-Windows-x64
set packagServerName_x86=TDengine-server-%2-Windows-x86
:POWER set packagClientName_x64=TDengine-client-%2-Windows-x64
call sed_power.bat %community_dir% set packagClientName_x86=TDengine-client-%2-Windows-x86
goto RELEASE )
:TQ echo release windows-client for %1, version: %2
call sed_tq.bat %community_dir% if not exist %work_dir%\debug (
goto RELEASE md %work_dir%\debug
)
:PRO if not exist %work_dir%\debug\ver-%2-x64 (
call sed_pro.bat %community_dir% md %work_dir%\debug\ver-%2-x64
goto RELEASE
:KH
call sed_kh.bat %community_dir%
goto RELEASE
:JH
call sed_jh.bat %community_dir%
goto RELEASE
:RELEASE
echo release windows-client-64 for %1, version: %2
if not exist %internal_dir%\debug\ver-%2-64bit-%1 (
md %internal_dir%\debug\ver-%2-64bit-%1
) else ( ) else (
rd /S /Q %internal_dir%\debug\ver-%2-64bit-%1 rd /S /Q %work_dir%\debug\ver-%2-x64
md %internal_dir%\debug\ver-%2-64bit-%1 md %work_dir%\debug\ver-%2-x64
) )
cd %internal_dir%\debug\ver-%2-64bit-%1 if not exist %work_dir%\debug\ver-%2-x86 (
call "C:\Program Files (x86)\Microsoft Visual Studio 14.0\VC\vcvarsall.bat" amd64 md %work_dir%\debug\ver-%2-x86
cmake ../../ -G "NMake Makefiles" -DVERNUMBER=%2 -DCPUTYPE=x64 ) else (
set CL=/MP4 rd /S /Q %work_dir%\debug\ver-%2-x86
nmake install md %work_dir%\debug\ver-%2-x86
)
cd %work_dir%\debug\ver-%2-x64
call vcvarsall.bat x64
cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=true -DBUILD_HTTP=false -DVERNUMBER=%2 -DCPUTYPE=x64
cmake --build .
rd /s /Q C:\TDengine
cmake --install .
if not %errorlevel% == 0 ( call :RUNFAILED build x64 failed & exit /b 1)
cd %package_dir%
iscc /DMyAppInstallName="%packagServerName_x64%" /DMyAppVersion="%2" /DMyAppExcludeSource="" tools\tdengine.iss /O..\release
if not %errorlevel% == 0 ( call :RUNFAILED package %packagServerName_x64% failed & exit /b 1)
iscc /DMyAppInstallName="%packagClientName_x64%" /DMyAppVersion="%2" /DMyAppExcludeSource="taosd.exe" tools\tdengine.iss /O..\release
if not %errorlevel% == 0 ( call :RUNFAILED package %packagClientName_x64% failed & exit /b 1)
cd %work_dir%\debug\ver-%2-x86
call vcvarsall.bat x86
cmake ../../ -G "NMake Makefiles JOM" -DCMAKE_MAKE_PROGRAM=jom -DBUILD_TOOLS=true -DBUILD_HTTP=false -DVERNUMBER=%2 -DCPUTYPE=x86
cmake --build .
rd /s /Q C:\TDengine
cmake --install .
if not %errorlevel% == 0 ( call :RUNFAILED build x86 failed & exit /b 1)
cd %package_dir%
iscc /DMyAppInstallName="%packagServerName_x86%" /DMyAppVersion="%2" /DMyAppExcludeSource="" tools\tdengine.iss /O..\release
if not %errorlevel% == 0 ( call :RUNFAILED package %packagServerName_x86% failed & exit /b 1)
iscc /DMyAppInstallName="%packagClientName_x86%" /DMyAppVersion="%2" /DMyAppExcludeSource="taosd.exe" tools\tdengine.iss /O..\release
if not %errorlevel% == 0 ( call :RUNFAILED package %packagClientName_x86% failed & exit /b 1)
goto EXIT0 goto EXIT0
:USAGE :USAGE
echo Usage: release.bat $productName $version echo Usage: release.bat $verMode $version
goto EXIT0 goto EXIT0
:EXIT0 :EXIT0
\ No newline at end of file exit /b
:RUNFAILED
echo %*
cd %package_dir%
goto :eof
\ No newline at end of file
...@@ -26,7 +26,7 @@ soMode=dynamic # [static | dynamic] ...@@ -26,7 +26,7 @@ soMode=dynamic # [static | dynamic]
dbName=taos # [taos | ...] dbName=taos # [taos | ...]
allocator=glibc # [glibc | jemalloc] allocator=glibc # [glibc | jemalloc]
verNumber="" verNumber=""
verNumberComp="2.0.0.0" verNumberComp="3.0.0.0"
httpdBuild=false httpdBuild=false
while getopts "hv:V:c:o:l:s:d:a:n:m:H:" arg; do while getopts "hv:V:c:o:l:s:d:a:n:m:H:" arg; do
...@@ -216,7 +216,7 @@ else ...@@ -216,7 +216,7 @@ else
fi fi
# check support cpu type # check support cpu type
if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "mips64" ]]; then if [[ "$cpuType" == "x64" ]] || [[ "$cpuType" == "aarch64" ]] || [[ "$cpuType" == "aarch32" ]] || [[ "$cpuType" == "arm64" ]] || [[ "$cpuType" == "arm32" ]] || [[ "$cpuType" == "mips64" ]]; then
if [ "$verMode" != "cluster" ]; then if [ "$verMode" != "cluster" ]; then
# community-version compile # community-version compile
cmake ../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro} cmake ../ -DCPUTYPE=${cpuType} -DOSTYPE=${osType} -DSOMODE=${soMode} -DDBNAME=${dbName} -DVERTYPE=${verType} -DVERDATE="${build_time}" -DGITINFO=${gitinfo} -DGITINFOI=${gitinfoOfInternal} -DVERNUMBER=${verNumber} -DVERCOMPATIBLE=${verNumberComp} -DPAGMODE=${pagMode} -DBUILD_HTTP=${BUILD_HTTP} -DBUILD_TOOLS=${BUILD_TOOLS} ${allocator_macro}
......
...@@ -30,7 +30,6 @@ configDir="/etc/taos" ...@@ -30,7 +30,6 @@ configDir="/etc/taos"
installDir="/usr/local/taos" installDir="/usr/local/taos"
adapterName="taosadapter" adapterName="taosadapter"
benchmarkName="taosBenchmark" benchmarkName="taosBenchmark"
tmqName="tmq_sim"
dumpName="taosdump" dumpName="taosdump"
demoName="taosdemo" demoName="taosdemo"
......
...@@ -60,7 +60,7 @@ if [ "$pagMode" == "lite" ]; then ...@@ -60,7 +60,7 @@ if [ "$pagMode" == "lite" ]; then
strip ${build_dir}/bin/${serverName} strip ${build_dir}/bin/${serverName}
strip ${build_dir}/bin/${clientName} strip ${build_dir}/bin/${clientName}
# lite version doesn't include taosadapter, which will lead to no restful interface # lite version doesn't include taosadapter, which will lead to no restful interface
bin_files="${build_dir}/bin/${serverName} ${build_dir}/bin/${clientName} ${script_dir}/remove.sh ${script_dir}/startPre.sh ${build_dir}/bin/taosBenchmark ${build_dir}/bin/tmq_sim" bin_files="${build_dir}/bin/${serverName} ${build_dir}/bin/${clientName} ${script_dir}/remove.sh ${script_dir}/startPre.sh ${build_dir}/bin/taosBenchmark "
taostools_bin_files="" taostools_bin_files=""
else else
...@@ -78,7 +78,6 @@ else ...@@ -78,7 +78,6 @@ else
taostools_bin_files=" ${build_dir}/bin/taosdump \ taostools_bin_files=" ${build_dir}/bin/taosdump \
${build_dir}/bin/taosBenchmark \ ${build_dir}/bin/taosBenchmark \
${build_dir}/bin/tmq_sim \
${build_dir}/bin/TDinsight.sh \ ${build_dir}/bin/TDinsight.sh \
$tdinsight_caches" $tdinsight_caches"
......
@echo off
cd C:\TDengine
if not "%1" == "" (
%1 --help
@cmd /k
)
\ No newline at end of file
#define MyAppName "TDengine"
#define MyAppPublisher "taosdata"
#define MyAppURL "http://www.taosdata.com/"
#define MyAppBeforeInstallTxt "windows_before_install.txt"
#define MyAppIco "favicon.ico"
#define MyAppInstallDir "C:\TDengine"
#define MyAppOutputDir "./"
#define MyAppSourceDir "C:\TDengine"
;#define MyAppAllFile "\*"
#define MyAppCfgName "\cfg\*"
#define MyAppDriverName "\driver\*"
#define MyAppConnectorName "\connector\*"
#define MyAppExamplesName "\examples\*"
#define MyAppIncludeName "\include\*"
#define MyAppExeName "\*.exe"
#define MyAppTaosExeName "\taos.bat"
#define MyAppTaosdemoExeName "\taosBenchmark.exe"
#define MyAppDLLName "\driver\taos.dll"
;#define MyAppVersion "3.0"
;#define MyAppInstallName "TDengine"
[Setup]
VersionInfoVersion={#MyAppVersion}
AppId={{A0F7A93C-79C4-485D-B2B8-F0D03DF42FAB}
AppName={#MyAppName}
AppVersion={#MyAppVersion}
;AppVerName={#MyAppName} {#MyAppVersion}
AppPublisher={#MyAppPublisher}
AppPublisherURL={#MyAppURL}
AppSupportURL={#MyAppURL}
AppUpdatesURL={#MyAppURL}
DefaultDirName={#MyAppInstallDir}
DefaultGroupName={#MyAppName}
DisableProgramGroupPage=yes
InfoBeforeFile={#MyAppBeforeInstallTxt}
OutputDir={#MyAppOutputDir}
OutputBaseFilename={#MyAppInstallName}
SetupIconFile={#MyAppIco}
Compression=lzma
SolidCompression=yes
DisableDirPage=yes
Uninstallable=yes
[Languages]
Name: "chinesesimp"; MessagesFile: "compiler:Default.isl"
;Name: "english"; MessagesFile: "compiler:Languages\English.isl"
[Files]
;Source: {#MyAppSourceDir}{#MyAppAllFile}; DestDir: "{app}"; Flags: igNoreversion recursesubdirs createallsubdirs
Source: taos.bat; DestDir: "{app}\include"; Flags: igNoreversion;
;Source: taosdemo.png; DestDir: "{app}\include"; Flags: igNoreversion;
;Source: taosShell.png; DestDir: "{app}\include"; Flags: igNoreversion;
Source: favicon.ico; DestDir: "{app}\include"; Flags: igNoreversion;
Source: {#MyAppSourceDir}{#MyAppDLLName}; DestDir: "{win}\System32"; Flags: igNoreversion;
Source: {#MyAppSourceDir}{#MyAppCfgName}; DestDir: "{app}\cfg"; Flags: igNoreversion recursesubdirs createallsubdirs onlyifdoesntexist uninsneveruninstall
Source: {#MyAppSourceDir}{#MyAppDriverName}; DestDir: "{app}\driver"; Flags: igNoreversion recursesubdirs createallsubdirs
;Source: {#MyAppSourceDir}{#MyAppConnectorName}; DestDir: "{app}\connector"; Flags: igNoreversion recursesubdirs createallsubdirs
;Source: {#MyAppSourceDir}{#MyAppExamplesName}; DestDir: "{app}\examples"; Flags: igNoreversion recursesubdirs createallsubdirs
Source: {#MyAppSourceDir}{#MyAppIncludeName}; DestDir: "{app}\include"; Flags: igNoreversion recursesubdirs createallsubdirs
Source: {#MyAppSourceDir}{#MyAppExeName}; DestDir: "{app}"; Excludes: {#MyAppExcludeSource} ; Flags: igNoreversion recursesubdirs createallsubdirs
Source: {#MyAppSourceDir}{#MyAppTaosdemoExeName}; DestDir: "{app}"; Flags: igNoreversion recursesubdirs createallsubdirs
[UninstallDelete]
Name: {app}\driver; Type: filesandordirs
Name: {app}\connector; Type: filesandordirs
Name: {app}\examples; Type: filesandordirs
Name: {app}\include; Type: filesandordirs
[Tasks]
Name: "desktopicon";Description: "{cm:CreateDesktopIcon}"; GroupDescription:"{cm:AdditionalIcons}"; Flags: checkablealone
[Icons]
Name:"{group}\Taos Shell"; Filename: "{app}\include\{#MyAppTaosExeName}" ; Parameters: "taos.exe" ; IconFilename: "{app}\include\{#MyAppIco}"
Name:"{group}\Open TDengine Directory"; Filename: "{app}\"
Name:"{group}\Taosdemo"; Filename: "{app}\include\{#MyAppTaosExeName}" ; Parameters: "taosdemo.exe" ; IconFilename: "{app}\include\{#MyAppIco}"
Name: "{group}\{cm:UninstallProgram,{#MyAppName}}"; Filename: "{uninstallexe}" ; IconFilename: "{app}\include\{#MyAppIco}"
Name:"{commondesktop}\Taos Shell"; Filename: "{app}\include\{#MyAppTaosExeName}" ; Parameters: "taos.exe" ; Tasks: desktopicon; WorkingDir: "{app}" ; IconFilename: "{app}\include\{#MyAppIco}"
[Messages]
ConfirmUninstall=Do you really want to uninstall TDengine from your computer?%n%nPress [Y] to completely delete %1 and all its components;%nPress [N] to keep the software on your computer.
TDengine is a high-efficient, scalable, high-available distributed time-series database, which makes a lot of optimizations on inserting and querying data, which is far more efficient than normal regular databases. So TDengine can meet the high requirements of IOT and other areas on storing and querying a large amount of data.
TDengine will be installed under C:\TDengine, users can modify configuration file C:\TDengine\cfg\taos.cfg, set the log file path or other parameters.
\ No newline at end of file
...@@ -286,6 +286,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) { ...@@ -286,6 +286,7 @@ static int32_t hbAsyncCallBack(void *param, SDataBuf *pMsg, int32_t code) {
if (pInst == NULL || NULL == *pInst) { if (pInst == NULL || NULL == *pInst) {
taosThreadMutexUnlock(&appInfo.mutex); taosThreadMutexUnlock(&appInfo.mutex);
tscError("cluster not exist, key:%s", key); tscError("cluster not exist, key:%s", key);
taosMemoryFree(pMsg->pData);
tFreeClientHbBatchRsp(&pRsp); tFreeClientHbBatchRsp(&pRsp);
return -1; return -1;
} }
......
...@@ -1007,7 +1007,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) { ...@@ -1007,7 +1007,7 @@ int32_t tmqPollCb(void* param, SDataBuf* pMsg, int32_t code) {
taosMemoryFree(pParam); taosMemoryFree(pParam);
if (code != 0) { if (code != 0) {
tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code); tscWarn("msg discard from vgId:%d, epoch %d, code:%x", vgId, epoch, code);
if (pMsg->pData) taosMemoryFree(pMsg->pData); if (pMsg->pData) taosMemoryFreeClear(pMsg->pData);
if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) { if (code == TSDB_CODE_TQ_NO_COMMITTED_OFFSET) {
SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM); SMqPollRspWrapper* pRspWrapper = taosAllocateQitem(sizeof(SMqPollRspWrapper), DEF_QITEM);
if (pRspWrapper == NULL) { if (pRspWrapper == NULL) {
...@@ -1699,7 +1699,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) { ...@@ -1699,7 +1699,8 @@ int32_t tmq_consumer_close(tmq_t* tmq) {
tmq_list_destroy(lst); tmq_list_destroy(lst);
return rsp; /*return rsp;*/
return 0;
} }
// TODO: free resources // TODO: free resources
return 0; return 0;
......
...@@ -16,15 +16,15 @@ ...@@ -16,15 +16,15 @@
#include "systable.h" #include "systable.h"
#include "taos.h" #include "taos.h"
#include "tdef.h" #include "tdef.h"
#include "types.h"
#include "tgrant.h" #include "tgrant.h"
#include "types.h"
#define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_TABLE_NAME_LEN ((TSDB_TABLE_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_DB_NAME_LEN ((TSDB_DB_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
#define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE) #define SYSTABLE_SCH_COL_NAME_LEN ((TSDB_COL_NAME_LEN - 1) + VARSTR_HEADER_SIZE)
static const SSysDbTableSchema dnodesSchema[] = { static const SSysDbTableSchema dnodesSchema[] = {
{.name = "id", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "endpoint", .bytes = TSDB_EP_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, {.name = "vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
{.name = "support_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT}, {.name = "support_vnodes", .bytes = 2, .type = TSDB_DATA_TYPE_SMALLINT},
...@@ -66,7 +66,7 @@ static const SSysDbTableSchema bnodesSchema[] = { ...@@ -66,7 +66,7 @@ static const SSysDbTableSchema bnodesSchema[] = {
}; };
static const SSysDbTableSchema clusterSchema[] = { static const SSysDbTableSchema clusterSchema[] = {
{.name = "id", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "id", .bytes = 4, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR}, {.name = "name", .bytes = TSDB_CLUSTER_ID_LEN + VARSTR_HEADER_SIZE, .type = TSDB_DATA_TYPE_VARCHAR},
{.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP}, {.name = "create_time", .bytes = 8, .type = TSDB_DATA_TYPE_TIMESTAMP},
}; };
...@@ -97,7 +97,7 @@ static const SSysDbTableSchema userDBSchema[] = { ...@@ -97,7 +97,7 @@ static const SSysDbTableSchema userDBSchema[] = {
{.name = "wal_retention_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "wal_retention_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "wal_retention_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "wal_retention_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
{.name = "wal_roll_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT}, {.name = "wal_roll_period", .bytes = 4, .type = TSDB_DATA_TYPE_INT},
{.name = "wal_seg_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT}, {.name = "wal_segment_size", .bytes = 8, .type = TSDB_DATA_TYPE_BIGINT},
}; };
static const SSysDbTableSchema userFuncSchema[] = { static const SSysDbTableSchema userFuncSchema[] = {
...@@ -243,8 +243,8 @@ static const SSysTableMeta infosMeta[] = { ...@@ -243,8 +243,8 @@ static const SSysTableMeta infosMeta[] = {
{TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)}, {TSDB_INS_TABLE_MNODES, mnodesSchema, tListLen(mnodesSchema)},
{TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)}, {TSDB_INS_TABLE_MODULES, modulesSchema, tListLen(modulesSchema)},
{TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)}, {TSDB_INS_TABLE_QNODES, qnodesSchema, tListLen(qnodesSchema)},
// {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)}, // {TSDB_INS_TABLE_SNODES, snodesSchema, tListLen(snodesSchema)},
// {TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)}, // {TSDB_INS_TABLE_BNODES, bnodesSchema, tListLen(bnodesSchema)},
{TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema)}, {TSDB_INS_TABLE_CLUSTER, clusterSchema, tListLen(clusterSchema)},
{TSDB_INS_TABLE_DATABASES, userDBSchema, tListLen(userDBSchema)}, {TSDB_INS_TABLE_DATABASES, userDBSchema, tListLen(userDBSchema)},
{TSDB_INS_TABLE_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)}, {TSDB_INS_TABLE_FUNCTIONS, userFuncSchema, tListLen(userFuncSchema)},
......
...@@ -604,11 +604,11 @@ typedef struct { ...@@ -604,11 +604,11 @@ typedef struct {
int64_t createTime; int64_t createTime;
int64_t updateTime; int64_t updateTime;
int32_t version; int32_t version;
int32_t totalLevel;
int64_t smaId; // 0 for unused int64_t smaId; // 0 for unused
// info // info
int64_t uid; int64_t uid;
int8_t status; int8_t status;
int8_t isDistributed;
// config // config
int8_t igExpired; int8_t igExpired;
int8_t trigger; int8_t trigger;
...@@ -647,7 +647,6 @@ typedef struct { ...@@ -647,7 +647,6 @@ typedef struct {
typedef struct { typedef struct {
int64_t uid; int64_t uid;
int64_t streamId; int64_t streamId;
int8_t isDistributed;
int8_t status; int8_t status;
int8_t stage; int8_t stage;
} SStreamRecoverObj; } SStreamRecoverObj;
......
...@@ -23,11 +23,11 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) { ...@@ -23,11 +23,11 @@ int32_t tEncodeSStreamObj(SEncoder *pEncoder, const SStreamObj *pObj) {
if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->createTime) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1; if (tEncodeI64(pEncoder, pObj->updateTime) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->version) < 0) return -1; if (tEncodeI32(pEncoder, pObj->version) < 0) return -1;
if (tEncodeI32(pEncoder, pObj->totalLevel) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1; if (tEncodeI64(pEncoder, pObj->smaId) < 0) return -1;
if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1; if (tEncodeI64(pEncoder, pObj->uid) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->status) < 0) return -1; if (tEncodeI8(pEncoder, pObj->status) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->isDistributed) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1; if (tEncodeI8(pEncoder, pObj->igExpired) < 0) return -1;
if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1; if (tEncodeI8(pEncoder, pObj->trigger) < 0) return -1;
...@@ -69,11 +69,11 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) { ...@@ -69,11 +69,11 @@ int32_t tDecodeSStreamObj(SDecoder *pDecoder, SStreamObj *pObj) {
if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->createTime) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->updateTime) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1; if (tDecodeI32(pDecoder, &pObj->version) < 0) return -1;
if (tDecodeI32(pDecoder, &pObj->totalLevel) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->smaId) < 0) return -1;
if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1; if (tDecodeI64(pDecoder, &pObj->uid) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->status) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->isDistributed) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->igExpired) < 0) return -1;
if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1; if (tDecodeI8(pDecoder, &pObj->trigger) < 0) return -1;
......
...@@ -307,10 +307,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -307,10 +307,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
terrno = TSDB_CODE_QRY_INVALID_INPUT; terrno = TSDB_CODE_QRY_INVALID_INPUT;
return -1; return -1;
} }
int32_t totLevel = LIST_LENGTH(pPlan->pSubplans); int32_t planTotLevel = LIST_LENGTH(pPlan->pSubplans);
ASSERT(totLevel <= 2); ASSERT(planTotLevel <= 2);
pStream->tasks = taosArrayInit(totLevel, sizeof(void*)); pStream->tasks = taosArrayInit(planTotLevel, sizeof(void*));
pStream->isDistributed = totLevel == 2;
bool hasExtraSink = false; bool hasExtraSink = false;
bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0; bool externalTargetDB = strcmp(pStream->sourceDb, pStream->targetDb) != 0;
...@@ -320,7 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -320,7 +319,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
bool multiTarget = pDbObj->cfg.numOfVgroups > 1; bool multiTarget = pDbObj->cfg.numOfVgroups > 1;
if (totLevel == 2 || externalTargetDB || multiTarget) { if (planTotLevel == 2 || externalTargetDB || multiTarget) {
/*if (true) {*/ /*if (true) {*/
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel); taosArrayPush(pStream->tasks, &taskOneLevel);
...@@ -338,8 +337,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -338,8 +337,9 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
} }
} }
} }
pStream->totalLevel = planTotLevel + hasExtraSink;
if (totLevel > 1) { if (planTotLevel > 1) {
SStreamTask* pInnerTask; SStreamTask* pInnerTask;
// inner level // inner level
{ {
...@@ -371,13 +371,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -371,13 +371,6 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
return -1; return -1;
} }
#if 0
SDbObj* pSourceDb = mndAcquireDb(pMnode, pStream->sourceDb);
ASSERT(pDbObj != NULL);
sdbRelease(pSdb, pSourceDb);
pInnerTask->numOfVgroups = pSourceDb->cfg.numOfVgroups;
#endif
if (tsSchedStreamToSnode) { if (tsSchedStreamToSnode) {
SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode); SSnodeObj* pSnode = mndSchedFetchOneSnode(pMnode);
if (pSnode == NULL) { if (pSnode == NULL) {
...@@ -464,7 +457,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) { ...@@ -464,7 +457,7 @@ int32_t mndScheduleStream(SMnode* pMnode, SStreamObj* pStream) {
} }
} }
if (totLevel == 1) { if (planTotLevel == 1) {
SArray* taskOneLevel = taosArrayInit(0, sizeof(void*)); SArray* taskOneLevel = taosArrayInit(0, sizeof(void*));
taosArrayPush(pStream->tasks, &taskOneLevel); taosArrayPush(pStream->tasks, &taskOneLevel);
......
...@@ -36,7 +36,7 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream); ...@@ -36,7 +36,7 @@ static int32_t mndStreamActionDelete(SSdb *pSdb, SStreamObj *pStream);
static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream); static int32_t mndStreamActionUpdate(SSdb *pSdb, SStreamObj *pStream, SStreamObj *pNewStream);
static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq); static int32_t mndProcessCreateStreamReq(SRpcMsg *pReq);
static int32_t mndProcessDropStreamReq(SRpcMsg *pReq); static int32_t mndProcessDropStreamReq(SRpcMsg *pReq);
static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq); /*static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq);*/
static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq); static int32_t mndProcessStreamMetaReq(SRpcMsg *pReq);
static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta); static int32_t mndGetStreamMeta(SRpcMsg *pReq, SShowObj *pShow, STableMetaRsp *pMeta);
static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows); static int32_t mndRetrieveStream(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
...@@ -55,7 +55,7 @@ int32_t mndInitStream(SMnode *pMnode) { ...@@ -55,7 +55,7 @@ int32_t mndInitStream(SMnode *pMnode) {
mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_CREATE_STREAM, mndProcessCreateStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq); mndSetMsgHandle(pMnode, TDMT_MND_DROP_STREAM, mndProcessDropStreamReq);
mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq); /*mndSetMsgHandle(pMnode, TDMT_MND_RECOVER_STREAM, mndProcessRecoverStreamReq);*/
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DEPLOY_RSP, mndTransProcessRsp);
mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp); mndSetMsgHandle(pMnode, TDMT_STREAM_TASK_DROP_RSP, mndTransProcessRsp);
...@@ -540,6 +540,7 @@ static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) { ...@@ -540,6 +540,7 @@ static int32_t mndPersistTaskRecoverReq(STrans *pTrans, SStreamTask *pTask) {
return 0; return 0;
} }
#if 0
int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
if (pStream->isDistributed) { if (pStream->isDistributed) {
int32_t lv = taosArrayGetSize(pStream->tasks); int32_t lv = taosArrayGetSize(pStream->tasks);
...@@ -573,6 +574,7 @@ int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea ...@@ -573,6 +574,7 @@ int32_t mndRecoverStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStrea
} }
return 0; return 0;
} }
#endif
int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) { int32_t mndDropStreamTasks(SMnode *pMnode, STrans *pTrans, SStreamObj *pStream) {
int32_t lv = taosArrayGetSize(pStream->tasks); int32_t lv = taosArrayGetSize(pStream->tasks);
...@@ -755,6 +757,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) { ...@@ -755,6 +757,7 @@ static int32_t mndProcessDropStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
#if 0
static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) { static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node; SMnode *pMnode = pReq->info.node;
SStreamObj *pStream = NULL; SStreamObj *pStream = NULL;
...@@ -817,6 +820,7 @@ static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) { ...@@ -817,6 +820,7 @@ static int32_t mndProcessRecoverStreamReq(SRpcMsg *pReq) {
return TSDB_CODE_ACTION_IN_PROGRESS; return TSDB_CODE_ACTION_IN_PROGRESS;
} }
#endif
int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) { int32_t mndDropStreamByDb(SMnode *pMnode, STrans *pTrans, SDbObj *pDb) {
SSdb *pSdb = pMnode->pSdb; SSdb *pSdb = pMnode->pSdb;
......
...@@ -117,10 +117,9 @@ typedef struct { ...@@ -117,10 +117,9 @@ typedef struct {
struct STQ { struct STQ {
SVnode* pVnode; SVnode* pVnode;
char* path; char* path;
SHashObj* pushMgr; // consumerId -> STqHandle* SHashObj* pushMgr; // consumerId -> STqHandle*
SHashObj* handles; // subKey -> STqHandle SHashObj* handles; // subKey -> STqHandle
SHashObj* pStreamTasks; // taksId -> SStreamTask SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
SHashObj* pAlterInfo; // topic -> SAlterCheckInfo
STqOffsetStore* pOffsetStore; STqOffsetStore* pOffsetStore;
...@@ -129,9 +128,7 @@ struct STQ { ...@@ -129,9 +128,7 @@ struct STQ {
TTB* pAlterInfoStore; TTB* pAlterInfoStore;
TDB* pStreamStore; SStreamMeta* pStreamMeta;
TTB* pTaskDb;
TTB* pTaskState;
}; };
typedef struct { typedef struct {
...@@ -188,6 +185,9 @@ static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ve ...@@ -188,6 +185,9 @@ static FORCE_INLINE void tqOffsetResetToLog(STqOffsetVal* pOffsetVal, int64_t ve
pOffsetVal->version = ver; pOffsetVal->version = ver;
} }
// tqStream
int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask);
#ifdef __cplusplus #ifdef __cplusplus
} }
#endif #endif
......
...@@ -62,8 +62,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ...@@ -62,8 +62,6 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->handles = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
pTq->pStreamTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK); pTq->pushMgr = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_BIGINT), true, HASH_ENTRY_LOCK);
pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK); pTq->pAlterInfo = taosHashInit(64, MurmurHash3_32, true, HASH_ENTRY_LOCK);
...@@ -76,6 +74,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ...@@ -76,6 +74,11 @@ STQ* tqOpen(const char* path, SVnode* pVnode) {
ASSERT(0); ASSERT(0);
} }
pTq->pStreamMeta = streamMetaOpen(path, pTq, (FTaskExpand*)tqExpandTask);
if (pTq->pStreamMeta == NULL) {
ASSERT(0);
}
return pTq; return pTq;
} }
...@@ -83,18 +86,11 @@ void tqClose(STQ* pTq) { ...@@ -83,18 +86,11 @@ void tqClose(STQ* pTq) {
if (pTq) { if (pTq) {
tqOffsetClose(pTq->pOffsetStore); tqOffsetClose(pTq->pOffsetStore);
taosHashCleanup(pTq->handles); taosHashCleanup(pTq->handles);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
tFreeSStreamTask(pTask);
}
taosHashCleanup(pTq->pStreamTasks);
taosHashCleanup(pTq->pushMgr); taosHashCleanup(pTq->pushMgr);
taosHashCleanup(pTq->pAlterInfo); taosHashCleanup(pTq->pAlterInfo);
taosMemoryFree(pTq->path); taosMemoryFree(pTq->path);
tqMetaClose(pTq); tqMetaClose(pTq);
streamMetaClose(pTq->pStreamMeta);
taosMemoryFree(pTq); taosMemoryFree(pTq);
} }
} }
...@@ -672,6 +668,9 @@ FAIL: ...@@ -672,6 +668,9 @@ FAIL:
} }
int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
//
return streamMetaAddSerializedTask(pTq->pStreamMeta, msg, msgLen);
#if 0
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask)); SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) { if (pTask == NULL) {
return -1; return -1;
...@@ -695,6 +694,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) { ...@@ -695,6 +694,7 @@ int32_t tqProcessTaskDeployReq(STQ* pTq, char* msg, int32_t msgLen) {
FAIL: FAIL:
if (pTask) taosMemoryFree(pTask); if (pTask) taosMemoryFree(pTask);
return -1; return -1;
#endif
} }
int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
...@@ -710,7 +710,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) { ...@@ -710,7 +710,7 @@ int32_t tqProcessStreamTrigger(STQ* pTq, SSubmitReq* pReq, int64_t ver) {
} }
while (1) { while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter); pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue; if (pTask->taskLevel != TASK_LEVEL__SOURCE) continue;
...@@ -744,9 +744,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -744,9 +744,9 @@ int32_t tqProcessTaskRunReq(STQ* pTq, SRpcMsg* pMsg) {
// //
SStreamTaskRunReq* pReq = pMsg->pCont; SStreamTaskRunReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (ppTask) { if (pTask) {
streamProcessRunReq(*ppTask); streamProcessRunReq(pTask);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -762,14 +762,15 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { ...@@ -762,14 +762,15 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen); tDecoderInit(&decoder, (uint8_t*)msgBody, msgLen);
tDecodeStreamDispatchReq(&decoder, &req); tDecodeStreamDispatchReq(&decoder, &req);
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) { SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessDispatchReq(*ppTask, &req, &rsp, exec); streamProcessDispatchReq(pTask, &req, &rsp, exec);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -779,9 +780,9 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) { ...@@ -779,9 +780,9 @@ int32_t tqProcessTaskDispatchReq(STQ* pTq, SRpcMsg* pMsg, bool exec) {
int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverReq* pReq = pMsg->pCont; SStreamTaskRecoverReq* pReq = pMsg->pCont;
int32_t taskId = pReq->taskId; int32_t taskId = pReq->taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (ppTask) { if (pTask) {
streamProcessRecoverReq(*ppTask, pReq, pMsg); streamProcessRecoverReq(pTask, pReq, pMsg);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -791,9 +792,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -791,9 +792,9 @@ int32_t tqProcessTaskRecoverReq(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead)); SStreamDispatchRsp* pRsp = POINTER_SHIFT(pMsg->pCont, sizeof(SMsgHead));
int32_t taskId = pRsp->taskId; int32_t taskId = pRsp->taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (ppTask) { if (pTask) {
streamProcessDispatchRsp(*ppTask, pRsp); streamProcessDispatchRsp(pTask, pRsp);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -803,9 +804,10 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -803,9 +804,10 @@ int32_t tqProcessTaskDispatchRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
SStreamTaskRecoverRsp* pRsp = pMsg->pCont; SStreamTaskRecoverRsp* pRsp = pMsg->pCont;
int32_t taskId = pRsp->rspTaskId; int32_t taskId = pRsp->rspTaskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) { SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
streamProcessRecoverRsp(*ppTask, pRsp); if (pTask) {
streamProcessRecoverRsp(pTask, pRsp);
return 0; return 0;
} else { } else {
return -1; return -1;
...@@ -815,18 +817,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) { ...@@ -815,18 +817,7 @@ int32_t tqProcessTaskRecoverRsp(STQ* pTq, SRpcMsg* pMsg) {
int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) { int32_t tqProcessTaskDropReq(STQ* pTq, char* msg, int32_t msgLen) {
SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg; SVDropStreamTaskReq* pReq = (SVDropStreamTaskReq*)msg;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t)); return streamMetaRemoveTask(pTq->pStreamMeta, pReq->taskId);
if (ppTask) {
SStreamTask* pTask = *ppTask;
taosHashRemove(pTq->pStreamTasks, &pReq->taskId, sizeof(int32_t));
atomic_store_8(&pTask->taskStatus, TASK_STATUS__DROPPING);
}
// todo
// clear queue
// push drop req into queue
// launch exec to free memory
// remove from hash
return 0;
} }
int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
...@@ -837,18 +828,18 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) { ...@@ -837,18 +828,18 @@ int32_t tqProcessTaskRetrieveReq(STQ* pTq, SRpcMsg* pMsg) {
SDecoder decoder; SDecoder decoder;
tDecoderInit(&decoder, msgBody, msgLen); tDecoderInit(&decoder, msgBody, msgLen);
tDecodeStreamRetrieveReq(&decoder, &req); tDecodeStreamRetrieveReq(&decoder, &req);
int32_t taskId = req.dstTaskId; int32_t taskId = req.dstTaskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t)); SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (ppTask) { if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessRetrieveReq(*ppTask, &req, &rsp); streamProcessRetrieveReq(pTask, &req, &rsp);
return 0;
} else { } else {
return -1; return -1;
} }
return 0;
} }
int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) { int32_t tqProcessTaskRetrieveRsp(STQ* pTq, SRpcMsg* pMsg) {
...@@ -871,16 +862,18 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) { ...@@ -871,16 +862,18 @@ void vnodeEnqueueStreamMsg(SVnode* pVnode, SRpcMsg* pMsg) {
goto FAIL; goto FAIL;
} }
int32_t taskId = req.taskId; int32_t taskId = req.taskId;
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pTq->pStreamTasks, &taskId, sizeof(int32_t));
if (ppTask) { SStreamTask* pTask = streamMetaGetTask(pTq->pStreamMeta, taskId);
if (pTask) {
SRpcMsg rsp = { SRpcMsg rsp = {
.info = pMsg->info, .info = pMsg->info,
.code = 0, .code = 0,
}; };
streamProcessDispatchReq(*ppTask, &req, &rsp, false); streamProcessDispatchReq(pTask, &req, &rsp, false);
return; return;
} }
FAIL: FAIL:
if (pMsg->info.handle == NULL) return; if (pMsg->info.handle == NULL) return;
SRpcMsg rsp = { SRpcMsg rsp = {
......
...@@ -215,7 +215,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver) ...@@ -215,7 +215,7 @@ int tqPushMsg(STQ* pTq, void* msg, int32_t msgLen, tmsg_t msgType, int64_t ver)
walApplyVer(pTq->pVnode->pWal, ver); walApplyVer(pTq->pVnode->pWal, ver);
if (msgType == TDMT_VND_SUBMIT) { if (msgType == TDMT_VND_SUBMIT) {
if (taosHashGetSize(pTq->pStreamTasks) == 0) return 0; if (taosHashGetSize(pTq->pStreamMeta->pTasks) == 0) return 0;
void* data = taosMemoryMalloc(msgLen); void* data = taosMemoryMalloc(msgLen);
if (data == NULL) { if (data == NULL) {
......
...@@ -413,7 +413,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) { ...@@ -413,7 +413,7 @@ int32_t tqUpdateTbUidList(STQ* pTq, const SArray* tbUidList, bool isAdd) {
} }
} }
while (1) { while (1) {
pIter = taosHashIterate(pTq->pStreamTasks, pIter); pIter = taosHashIterate(pTq->pStreamMeta->pTasks, pIter);
if (pIter == NULL) break; if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter; SStreamTask* pTask = *(SStreamTask**)pIter;
if (pTask->taskLevel == TASK_LEVEL__SOURCE) { if (pTask->taskLevel == TASK_LEVEL__SOURCE) {
......
...@@ -115,12 +115,19 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) { ...@@ -115,12 +115,19 @@ static int32_t tsdbSnapReadData(STsdbSnapReader* pReader, uint8_t** ppData) {
TSDBROW row = tsdbRowFromBlockData(&pReader->oBlockData, iRow); TSDBROW row = tsdbRowFromBlockData(&pReader->oBlockData, iRow);
int64_t version = TSDBROW_VERSION(&row); int64_t version = TSDBROW_VERSION(&row);
tsdbTrace("vgId:%d, vnode snapshot tsdb read for %s, %" PRId64 "(%" PRId64 " , %" PRId64 ")",
TD_VID(pReader->pTsdb->pVnode), pReader->pTsdb->path, version, pReader->sver, pReader->ever);
if (version < pReader->sver || version > pReader->ever) continue; if (version < pReader->sver || version > pReader->ever) continue;
code = tBlockDataAppendRow(&pReader->nBlockData, &row, NULL); code = tBlockDataAppendRow(&pReader->nBlockData, &row, NULL);
if (code) goto _err; if (code) goto _err;
} }
if (pReader->nBlockData.nRow <= 0) {
continue;
}
// org data // org data
// compress data (todo) // compress data (todo)
int32_t size = sizeof(TABLEID) + tPutBlockData(NULL, &pReader->nBlockData); int32_t size = sizeof(TABLEID) + tPutBlockData(NULL, &pReader->nBlockData);
...@@ -808,7 +815,8 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) { ...@@ -808,7 +815,8 @@ static int32_t tsdbSnapWriteTableData(STsdbSnapWriter* pWriter, TABLEID id) {
if (code) goto _err; if (code) goto _err;
_exit: _exit:
tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode), pWriter->pTsdb->path); tsdbDebug("vgId:%d, vnode snapshot tsdb write data impl for %s", TD_VID(pWriter->pTsdb->pVnode),
pWriter->pTsdb->path);
return code; return code;
_err: _err:
......
...@@ -722,10 +722,12 @@ void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); } ...@@ -722,10 +722,12 @@ void vnodeSyncClose(SVnode *pVnode) { syncStop(pVnode->sync); }
bool vnodeIsLeader(SVnode *pVnode) { bool vnodeIsLeader(SVnode *pVnode) {
if (!syncIsReady(pVnode->sync)) { if (!syncIsReady(pVnode->sync)) {
vDebug("vgId:%d, vnode not ready", pVnode->config.vgId);
return false; return false;
} }
if (!pVnode->restored) { if (!pVnode->restored) {
vDebug("vgId:%d, vnode not restored", pVnode->config.vgId);
terrno = TSDB_CODE_APP_NOT_READY; terrno = TSDB_CODE_APP_NOT_READY;
return false; return false;
} }
......
...@@ -467,6 +467,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT ...@@ -467,6 +467,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (NULL == taosArrayPush(newBatch.pMsgs, &req)) { if (NULL == taosArrayPush(newBatch.pMsgs, &req)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
msg = NULL;
if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) { if (NULL == taosArrayPush(newBatch.pTaskIds, &pTask->taskId)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -517,6 +518,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT ...@@ -517,6 +518,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
if (NULL == taosArrayPush(pBatch->pMsgs, &req)) { if (NULL == taosArrayPush(pBatch->pMsgs, &req)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
msg = NULL;
if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) { if (NULL == taosArrayPush(pBatch->pTaskIds, &pTask->taskId)) {
CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY); CTG_ERR_JRET(TSDB_CODE_OUT_OF_MEMORY);
} }
...@@ -545,7 +547,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT ...@@ -545,7 +547,7 @@ int32_t ctgAddBatch(SCatalog* pCtg, int32_t vgId, SRequestConnInfo* pConn, SCtgT
CTG_ERR_JRET(TSDB_CODE_APP_ERROR); CTG_ERR_JRET(TSDB_CODE_APP_ERROR);
} }
tNameGetFullDbName(pName, newBatch.dbFName); tNameGetFullDbName(pName, pBatch->dbFName);
} }
ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId, ctgDebug("task %d %s req added to batch %d, target vgId %d", pTask->taskId, TMSG_INFO(msgType), pBatch->batchId,
......
...@@ -438,6 +438,14 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) { ...@@ -438,6 +438,14 @@ void ctgFreeMsgCtx(SCtgMsgCtx* pCtx) {
} }
} }
void ctgFreeTbMetasMsgCtx(SCtgMsgCtx* pCtx) {
ctgFreeMsgCtx(pCtx);
if (pCtx->lastOut) {
ctgFreeSTableMetaOutput((STableMetaOutput*)pCtx->lastOut);
pCtx->lastOut = NULL;
}
}
void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput) { void ctgFreeSTableMetaOutput(STableMetaOutput* pOutput) {
if (NULL == pOutput) { if (NULL == pOutput) {
return; return;
...@@ -641,7 +649,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) { ...@@ -641,7 +649,7 @@ void ctgFreeTaskCtx(SCtgTask* pTask) {
taosArrayDestroy(taskCtx->pFetchs); taosArrayDestroy(taskCtx->pFetchs);
// NO NEED TO FREE pNames // NO NEED TO FREE pNames
taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeMsgCtx); taosArrayDestroyEx(pTask->msgCtxs, (FDelete)ctgFreeTbMetasMsgCtx);
if (pTask->msgCtx.lastOut) { if (pTask->msgCtx.lastOut) {
ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut); ctgFreeSTableMetaOutput((STableMetaOutput*)pTask->msgCtx.lastOut);
......
...@@ -2515,7 +2515,7 @@ static bool isPartitionByTbname(SNodeList* pPartitionByList) { ...@@ -2515,7 +2515,7 @@ static bool isPartitionByTbname(SNodeList* pPartitionByList) {
return false; return false;
} }
SNode* pPartKey = nodesListGetNode(pPartitionByList, 0); SNode* pPartKey = nodesListGetNode(pPartitionByList, 0);
return QUERY_NODE_FUNCTION != nodeType(pPartKey) || FUNCTION_TYPE_TBNAME != ((SFunctionNode*)pPartKey)->funcType; return QUERY_NODE_FUNCTION == nodeType(pPartKey) && FUNCTION_TYPE_TBNAME == ((SFunctionNode*)pPartKey)->funcType;
} }
static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) { static int32_t checkStateWindowForStream(STranslateContext* pCxt, SSelectStmt* pSelect) {
...@@ -2566,7 +2566,6 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) { ...@@ -2566,7 +2566,6 @@ static int32_t translateWindow(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (NULL == pSelect->pWindow) { if (NULL == pSelect->pWindow) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pSelect->isTimeLineResult = true;
pCxt->currClause = SQL_CLAUSE_WINDOW; pCxt->currClause = SQL_CLAUSE_WINDOW;
int32_t code = translateExpr(pCxt, &pSelect->pWindow); int32_t code = translateExpr(pCxt, &pSelect->pWindow);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
...@@ -2637,7 +2636,6 @@ static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelec ...@@ -2637,7 +2636,6 @@ static int32_t translatePartitionBy(STranslateContext* pCxt, SSelectStmt* pSelec
if (NULL == pSelect->pPartitionByList) { if (NULL == pSelect->pPartitionByList) {
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
pSelect->isTimeLineResult = false;
pCxt->currClause = SQL_CLAUSE_PARTITION_BY; pCxt->currClause = SQL_CLAUSE_PARTITION_BY;
return translateExprList(pCxt, pSelect->pPartitionByList); return translateExprList(pCxt, pSelect->pPartitionByList);
} }
...@@ -4708,6 +4706,12 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm ...@@ -4708,6 +4706,12 @@ static int32_t translateKillTransaction(STranslateContext* pCxt, SKillStmt* pStm
return buildCmdMsg(pCxt, TDMT_MND_KILL_TRANS, (FSerializeFunc)tSerializeSKillTransReq, &killReq); return buildCmdMsg(pCxt, TDMT_MND_KILL_TRANS, (FSerializeFunc)tSerializeSKillTransReq, &killReq);
} }
static bool crossTableWithoutAggOper(SSelectStmt* pSelect) {
return NULL == pSelect->pWindow && !pSelect->hasAggFuncs && !pSelect->hasIndefiniteRowsFunc &&
!pSelect->hasInterpFunc && TSDB_SUPER_TABLE == ((SRealTableNode*)pSelect->pFromTable)->pMeta->tableType &&
!isPartitionByTbname(pSelect->pPartitionByList);
}
static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) { static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pStmt) {
if (NULL != pStmt->pOptions->pWatermark && if (NULL != pStmt->pOptions->pWatermark &&
(DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) { (DEAL_RES_ERROR == translateValue(pCxt, (SValueNode*)pStmt->pOptions->pWatermark))) {
...@@ -4723,14 +4727,12 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt ...@@ -4723,14 +4727,12 @@ static int32_t checkCreateStream(STranslateContext* pCxt, SCreateStreamStmt* pSt
return TSDB_CODE_SUCCESS; return TSDB_CODE_SUCCESS;
} }
if (QUERY_NODE_SELECT_STMT == nodeType(pStmt->pQuery)) { if (QUERY_NODE_SELECT_STMT != nodeType(pStmt->pQuery) ||
SSelectStmt* pSelect = (SSelectStmt*)pStmt->pQuery; QUERY_NODE_REAL_TABLE != nodeType(((SSelectStmt*)pStmt->pQuery)->pFromTable)) {
if (QUERY_NODE_REAL_TABLE == nodeType(pSelect->pFromTable)) { return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
return TSDB_CODE_SUCCESS;
}
} }
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query"); return TSDB_CODE_SUCCESS;
} }
static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) { static void getSourceDatabase(SNode* pStmt, int32_t acctId, char* pDbFName) {
...@@ -4759,12 +4761,23 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) { ...@@ -4759,12 +4761,23 @@ static int32_t addWstartTsToCreateStreamQuery(SNode* pStmt) {
return code; return code;
} }
static int32_t checkStreamQuery(STranslateContext* pCxt, SSelectStmt* pSelect) {
if (TSDB_DATA_TYPE_TIMESTAMP != ((SExprNode*)nodesListGetNode(pSelect->pProjectionList, 0))->resType.type ||
!pSelect->isTimeLineResult || crossTableWithoutAggOper(pSelect)) {
return generateSyntaxErrMsgExt(&pCxt->msgBuf, TSDB_CODE_PAR_INVALID_STREAM_QUERY, "Unsupported stream query");
}
return TSDB_CODE_SUCCESS;
}
static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt, SCMCreateStreamReq* pReq) { static int32_t buildCreateStreamQuery(STranslateContext* pCxt, SNode* pStmt, SCMCreateStreamReq* pReq) {
pCxt->createStream = true; pCxt->createStream = true;
int32_t code = addWstartTsToCreateStreamQuery(pStmt); int32_t code = addWstartTsToCreateStreamQuery(pStmt);
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
code = translateQuery(pCxt, pStmt); code = translateQuery(pCxt, pStmt);
} }
if (TSDB_CODE_SUCCESS == code) {
code = checkStreamQuery(pCxt, (SSelectStmt*)pStmt);
}
if (TSDB_CODE_SUCCESS == code) { if (TSDB_CODE_SUCCESS == code) {
getSourceDatabase(pStmt, pCxt->pParseCxt->acctId, pReq->sourceDB); getSourceDatabase(pStmt, pCxt->pParseCxt->acctId, pReq->sourceDB);
code = nodesNodeToString(pStmt, false, &pReq->ast, NULL); code = nodesNodeToString(pStmt, false, &pReq->ast, NULL);
......
...@@ -268,7 +268,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) { ...@@ -268,7 +268,7 @@ static bool stbSplNeedSplit(bool streamQuery, SLogicNode* pNode) {
case QUERY_NODE_LOGIC_PLAN_JOIN: case QUERY_NODE_LOGIC_PLAN_JOIN:
return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode); return stbSplNeedSplitJoin(streamQuery, (SJoinLogicNode*)pNode);
case QUERY_NODE_LOGIC_PLAN_PARTITION: case QUERY_NODE_LOGIC_PLAN_PARTITION:
return stbSplIsMultiTbScanChild(streamQuery, pNode); return streamQuery ? false : stbSplIsMultiTbScanChild(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_AGG: case QUERY_NODE_LOGIC_PLAN_AGG:
return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode); return !stbSplHasGatherExecFunc(((SAggLogicNode*)pNode)->pAggFuncs) && stbSplHasMultiTbScan(streamQuery, pNode);
case QUERY_NODE_LOGIC_PLAN_WINDOW: case QUERY_NODE_LOGIC_PLAN_WINDOW:
......
...@@ -8,7 +8,8 @@ target_include_directories( ...@@ -8,7 +8,8 @@ target_include_directories(
target_link_libraries( target_link_libraries(
stream stream
PRIVATE os util transport qcom executor tdb PUBLIC tdb
PRIVATE os util transport qcom executor
) )
if(${BUILD_TEST}) if(${BUILD_TEST})
......
...@@ -14,22 +14,8 @@ ...@@ -14,22 +14,8 @@
*/ */
#include "executor.h" #include "executor.h"
#include "tdbInt.h"
#include "tstream.h" #include "tstream.h"
typedef int32_t FTaskExpand(void* ahandle, SStreamTask* pTask);
typedef struct SStreamMeta {
char* path;
TDB* db;
TTB* pTaskDb;
TTB* pStateDb;
SHashObj* pTasks;
void* ahandle;
TXN txn;
FTaskExpand* expandFunc;
} SStreamMeta;
SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) {
SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta)); SStreamMeta* pMeta = taosMemoryCalloc(1, sizeof(SStreamMeta));
if (pMeta == NULL) { if (pMeta == NULL) {
...@@ -50,8 +36,18 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF ...@@ -50,8 +36,18 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF
goto _err; goto _err;
} }
pMeta->pTasks = taosHashInit(64, taosGetDefaultHashFunction(TSDB_DATA_TYPE_INT), true, HASH_NO_LOCK);
if (pMeta->pTasks == NULL) {
goto _err;
}
if (streamMetaBegin(pMeta) < 0) {
goto _err;
}
pMeta->ahandle = ahandle; pMeta->ahandle = ahandle;
pMeta->expandFunc = expandFunc; pMeta->expandFunc = expandFunc;
return pMeta; return pMeta;
_err: _err:
return NULL; return NULL;
...@@ -62,6 +58,48 @@ void streamMetaClose(SStreamMeta* pMeta) { ...@@ -62,6 +58,48 @@ void streamMetaClose(SStreamMeta* pMeta) {
tdbTbClose(pMeta->pTaskDb); tdbTbClose(pMeta->pTaskDb);
tdbTbClose(pMeta->pStateDb); tdbTbClose(pMeta->pStateDb);
tdbClose(pMeta->db); tdbClose(pMeta->db);
void* pIter = NULL;
while (1) {
pIter = taosHashIterate(pMeta->pTasks, pIter);
if (pIter == NULL) break;
SStreamTask* pTask = *(SStreamTask**)pIter;
tFreeSStreamTask(pTask);
}
taosHashCleanup(pMeta->pTasks);
taosMemoryFree(pMeta->path);
taosMemoryFree(pMeta);
}
int32_t streamMetaAddSerializedTask(SStreamMeta* pMeta, char* msg, int32_t msgLen) {
SStreamTask* pTask = taosMemoryCalloc(1, sizeof(SStreamTask));
if (pTask == NULL) {
return -1;
}
SDecoder decoder;
tDecoderInit(&decoder, (uint8_t*)msg, msgLen);
if (tDecodeSStreamTask(&decoder, pTask) < 0) {
ASSERT(0);
goto FAIL;
}
tDecoderClear(&decoder);
if (pMeta->expandFunc(pMeta->ahandle, pTask) < 0) {
ASSERT(0);
goto FAIL;
}
taosHashPut(pMeta->pTasks, &pTask->taskId, sizeof(int32_t), &pTask, sizeof(void*));
if (tdbTbUpsert(pMeta->pTaskDb, &pTask->taskId, sizeof(int32_t), msg, msgLen, &pMeta->txn) < 0) {
ASSERT(0);
return -1;
}
return 0;
FAIL:
if (pTask) taosMemoryFree(pTask);
return -1;
} }
int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
...@@ -94,6 +132,16 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) { ...@@ -94,6 +132,16 @@ int32_t streamMetaAddTask(SStreamMeta* pMeta, SStreamTask* pTask) {
return 0; return 0;
} }
SStreamTask* streamMetaGetTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) {
ASSERT((*ppTask)->taskId == taskId);
return *ppTask;
} else {
return NULL;
}
}
int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) { int32_t streamMetaRemoveTask(SStreamMeta* pMeta, int32_t taskId) {
SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t)); SStreamTask** ppTask = (SStreamTask**)taosHashGet(pMeta->pTasks, &taskId, sizeof(int32_t));
if (ppTask) { if (ppTask) {
...@@ -150,7 +198,7 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) { ...@@ -150,7 +198,7 @@ int32_t streamMetaAbort(SStreamMeta* pMeta) {
return 0; return 0;
} }
int32_t streamRestoreTask(SStreamMeta* pMeta) { int32_t streamLoadTasks(SStreamMeta* pMeta) {
TBC* pCur = NULL; TBC* pCur = NULL;
if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) { if (tdbTbcOpen(pMeta->pTaskDb, &pCur, NULL) < 0) {
ASSERT(0); ASSERT(0);
......
...@@ -87,53 +87,80 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp ...@@ -87,53 +87,80 @@ int32_t tDecodeSMStreamTaskRecoverRsp(SDecoder* pDecoder, SMStreamTaskRecoverRsp
return 0; return 0;
} }
int32_t streamProcessFailRecoverReq(SStreamTask* pTask, SMStreamTaskRecoverReq* pReq, SRpcMsg* pRsp) { typedef struct {
#if 0 int32_t vgId;
if (pTask->taskStatus != TASK_STATUS__FAIL) { int32_t childId;
return 0; int64_t ver;
} SStreamVgVerCheckpoint;
int32_t tEncodeSStreamVgVerCheckpoint(SEncoder* pEncoder, const SStreamVgVerCheckpoint* pCheckpoint) {
if (tEncodeI32(pEncoder, pCheckpoint->vgId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->childId) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->ver) < 0) return -1;
return 0;
}
int32_t tDecodeSStreamVgVerCheckpoint(SDecoder* pDecoder, SStreamVgVerCheckpoint* pCheckpoint) {
if (tDecodeI32(pDecoder, &pCheckpoint->vgId) < 0) return -1;
if (tDecodeI32(pDecoder, &pCheckpoint->childId) < 0) return -1;
if (tDecodeI64(pDecoder, &pCheckpoint->ver) < 0) return -1;
return 0;
}
typedef struct {
int64_t streamId;
int64_t checkTs;
int64_t checkpointId;
int32_t taskId;
SArray* checkpointVer; // SArray<SStreamVgCheckpointVer>
} SStreamAggVerCheckpoint;
int32_t tEncodeSStreamAggVerCheckpoint(SEncoder* pEncoder, const SStreamAggVerCheckpoint* pCheckpoint) {
if (tEncodeI64(pEncoder, pCheckpoint->streamId) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->checkTs) < 0) return -1;
if (tEncodeI64(pEncoder, pCheckpoint->checkpointId) < 0) return -1;
if (tEncodeI32(pEncoder, pCheckpoint->taskId) < 0) return -1;
int32_t sz = taosArrayGetSize(pCheckpoint->checkpointVer);
if (tEncodeI32(pEncoder, sz) < 0) return -1;
for (int32_t i = 0; i < sz; i++) {
SStreamVgVerCheckpoint* pOneVgCkpoint = taosArrayGet(pCheckpoint->checkpointVer, i);
if (tEncodeSStreamVgVerCheckpoint(pEncoder, pOneVgCkpoint) < 0) return -1;
} }
return 0;
}
if (pTask->isStreamDistributed) { int32_t tDecodeSStreamAggVerCheckpoint(SDecoder* pDecoder, SStreamAggVerCheckpoint* pCheckpoint) {
if (pTask->taskType == TASK_TYPE__SOURCE) { if (tDecodeI64(pDecoder, &pCheckpoint->streamId) < 0) return -1;
pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER; if (tDecodeI64(pDecoder, &pCheckpoint->checkTs) < 0) return -1;
} else if (pTask->taskType != TASK_TYPE__SINK) { if (tDecodeI64(pDecoder, &pCheckpoint->checkpointId) < 0) return -1;
pTask->taskStatus = TASK_STATUS__PREPARE_RECOVER; if (tDecodeI32(pDecoder, &pCheckpoint->taskId) < 0) return -1;
bool hasCheckpoint = false; int32_t sz;
int32_t childSz = taosArrayGetSize(pTask->childEpInfo); if (tDecodeI32(pDecoder, &sz) < 0) return -1;
for (int32_t i = 0; i < childSz; i++) { for (int32_t i = 0; i < sz; i++) {
SStreamChildEpInfo* pEpInfo = taosArrayGetP(pTask->childEpInfo, i); SStreamVgVerCheckpoint oneVgCheckpoint;
if (pEpInfo->checkpointVer == -1) { if (tDecodeSStreamVgVerCheckpoint(pDecoder, &oneVgCheckpoint) < 0) return -1;
hasCheckpoint = true; taosArrayPush(pCheckpoint->checkpointVer, &oneVgCheckpoint);
break;
}
}
if (hasCheckpoint) {
// load from checkpoint
} else {
// recover child
}
}
} else {
if (pTask->taskType == TASK_TYPE__SOURCE) {
if (pTask->checkpointVer != -1) {
// load from checkpoint
} else {
// reset stream query task info
// TODO get snapshot ver
pTask->recoverSnapVer = -1;
qStreamPrepareRecover(pTask->exec.executor, pTask->startVer, pTask->recoverSnapVer);
pTask->taskStatus = TASK_STATUS__RECOVERING;
}
}
} }
return 0;
}
if (pTask->taskStatus == TASK_STATUS__RECOVERING) { int32_t streamRecoverSinkLevel(SStreamMeta* pMeta, SStreamTask* pTask) {
if (streamPipelineExec(pTask, 100) < 0) { ASSERT(pTask->taskLevel == TASK_LEVEL__SINK);
// set fail // load status
return -1; void* pVal = NULL;
} int32_t vLen = 0;
if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) {
return -1;
} }
SDecoder decoder;
tDecoderInit(&decoder, pVal, vLen);
SStreamAggVerCheckpoint aggCheckpoint;
tDecodeSStreamAggVerCheckpoint(&decoder, &aggCheckpoint);
/*pTask->*/
return 0;
}
#endif int32_t streamRecoverTask(SStreamTask* pTask) {
//
return 0; return 0;
} }
...@@ -52,6 +52,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -52,6 +52,7 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
/*if (tStartEncode(pEncoder) < 0) return -1;*/ /*if (tStartEncode(pEncoder) < 0) return -1;*/
if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1; if (tEncodeI64(pEncoder, pTask->streamId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->taskId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->totalLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->taskLevel) < 0) return -1; if (tEncodeI8(pEncoder, pTask->taskLevel) < 0) return -1;
if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1; if (tEncodeI8(pEncoder, pTask->outputType) < 0) return -1;
if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1; if (tEncodeI16(pEncoder, pTask->dispatchMsgType) < 0) return -1;
...@@ -62,7 +63,6 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) { ...@@ -62,7 +63,6 @@ int32_t tEncodeSStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->selfChildId) < 0) return -1;
if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1; if (tEncodeI32(pEncoder, pTask->nodeId) < 0) return -1;
if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1; if (tEncodeSEpSet(pEncoder, &pTask->epSet) < 0) return -1;
/*if (tEncodeI32(pEncoder, pTask->numOfVgroups) < 0) return -1;*/
int32_t epSz = taosArrayGetSize(pTask->childEpInfo); int32_t epSz = taosArrayGetSize(pTask->childEpInfo);
if (tEncodeI32(pEncoder, epSz) < 0) return -1; if (tEncodeI32(pEncoder, epSz) < 0) return -1;
...@@ -101,6 +101,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -101,6 +101,7 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
/*if (tStartDecode(pDecoder) < 0) return -1;*/ /*if (tStartDecode(pDecoder) < 0) return -1;*/
if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1; if (tDecodeI64(pDecoder, &pTask->streamId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->taskId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->totalLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->taskLevel) < 0) return -1;
if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1; if (tDecodeI8(pDecoder, &pTask->outputType) < 0) return -1;
if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1; if (tDecodeI16(pDecoder, &pTask->dispatchMsgType) < 0) return -1;
...@@ -111,7 +112,6 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) { ...@@ -111,7 +112,6 @@ int32_t tDecodeSStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->selfChildId) < 0) return -1;
if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1; if (tDecodeI32(pDecoder, &pTask->nodeId) < 0) return -1;
if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1; if (tDecodeSEpSet(pDecoder, &pTask->epSet) < 0) return -1;
/*if (tDecodeI32(pDecoder, &pTask->numOfVgroups) < 0) return -1;*/
int32_t epSz; int32_t epSz;
if (tDecodeI32(pDecoder, &epSz) < 0) return -1; if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
......
...@@ -2668,7 +2668,22 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p ...@@ -2668,7 +2668,22 @@ int32_t syncDoLeaderTransfer(SSyncNode* ths, SRpcMsg* pRpcMsg, SSyncRaftEntry* p
syncNodeEventLog(ths, "I am not follower, can not do leader transfer"); syncNodeEventLog(ths, "I am not follower, can not do leader transfer");
return 0; return 0;
} }
syncNodeEventLog(ths, "do leader transfer");
if (!ths->restoreFinish) {
syncNodeEventLog(ths, "restore not finish, can not do leader transfer");
return 0;
}
if (ths->vgId > 1) {
syncNodeEventLog(ths, "I am vnode, can not do leader transfer");
return 0;
}
do {
char logBuf[128];
snprintf(logBuf, sizeof(logBuf), "do leader transfer, index:%ld", pEntry->index);
syncNodeEventLog(ths, logBuf);
} while (0);
bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId)); bool sameId = syncUtilSameId(&(pSyncLeaderTransfer->newLeaderId), &(ths->myRaftId));
bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 && bool sameNodeInfo = strcmp(pSyncLeaderTransfer->newNodeInfo.nodeFqdn, ths->myNodeInfo.nodeFqdn) == 0 &&
......
...@@ -482,6 +482,7 @@ void cliReadTimeoutCb(uv_timer_t* handle) { ...@@ -482,6 +482,7 @@ void cliReadTimeoutCb(uv_timer_t* handle) {
// set up timeout cb // set up timeout cb
SCliConn* conn = handle->data; SCliConn* conn = handle->data;
tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn)); tTrace("%s conn %p timeout, ref:%d", CONN_GET_INST_LABEL(conn), conn, T_REF_VAL_GET(conn));
uv_read_stop(conn->stream);
cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT); cliHandleExceptImpl(conn, TSDB_CODE_RPC_TIMEOUT);
} }
...@@ -993,6 +994,8 @@ static void cliAsyncCb(uv_async_t* handle) { ...@@ -993,6 +994,8 @@ static void cliAsyncCb(uv_async_t* handle) {
if (count >= 2) { if (count >= 2) {
tTrace("cli process batch size:%d", count); tTrace("cli process batch size:%d", count);
} }
// if (!uv_is_active((uv_handle_t*)pThrd->prepare)) uv_prepare_start(pThrd->prepare, cliPrepareCb);
if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd); if (pThrd->stopMsg != NULL) cliHandleQuit(pThrd->stopMsg, pThrd);
} }
static void cliPrepareCb(uv_prepare_t* handle) { static void cliPrepareCb(uv_prepare_t* handle) {
...@@ -1088,7 +1091,7 @@ static SCliThrd* createThrdObj() { ...@@ -1088,7 +1091,7 @@ static SCliThrd* createThrdObj() {
pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t)); pThrd->prepare = taosMemoryCalloc(1, sizeof(uv_prepare_t));
uv_prepare_init(pThrd->loop, pThrd->prepare); uv_prepare_init(pThrd->loop, pThrd->prepare);
pThrd->prepare->data = pThrd; pThrd->prepare->data = pThrd;
uv_prepare_start(pThrd->prepare, cliPrepareCb); // uv_prepare_start(pThrd->prepare, cliPrepareCb);
int32_t timerSize = 512; int32_t timerSize = 512;
pThrd->timerList = taosArrayInit(timerSize, sizeof(void*)); pThrd->timerList = taosArrayInit(timerSize, sizeof(void*));
...@@ -1125,7 +1128,6 @@ static void destroyThrdObj(SCliThrd* pThrd) { ...@@ -1125,7 +1128,6 @@ static void destroyThrdObj(SCliThrd* pThrd) {
taosMemoryFree(timer); taosMemoryFree(timer);
} }
taosArrayDestroy(pThrd->timerList); taosArrayDestroy(pThrd->timerList);
taosMemoryFree(pThrd->prepare); taosMemoryFree(pThrd->prepare);
taosMemoryFree(pThrd->loop); taosMemoryFree(pThrd->loop);
taosMemoryFree(pThrd); taosMemoryFree(pThrd);
......
...@@ -193,7 +193,13 @@ void* interlocked_sub_fetch_ptr(void* volatile* ptr, void* val) { ...@@ -193,7 +193,13 @@ void* interlocked_sub_fetch_ptr(void* volatile* ptr, void* val) {
} }
int32_t interlocked_fetch_sub_32(int32_t volatile* ptr, int32_t val) { return _InterlockedExchangeAdd(ptr, -val); } int32_t interlocked_fetch_sub_32(int32_t volatile* ptr, int32_t val) { return _InterlockedExchangeAdd(ptr, -val); }
int64_t interlocked_fetch_sub_64(int64_t volatile* ptr, int64_t val) { return _InterlockedExchangeAdd64(ptr, -val); } int64_t interlocked_fetch_sub_64(int64_t volatile* ptr, int64_t val) {
#ifdef _TD_WINDOWS_32
return _InterlockedExchangeAdd((int32_t volatile*)ptr, -(int32_t)val);
#else
return _InterlockedExchangeAdd64(ptr, -val);
#endif
}
void* interlocked_fetch_sub_ptr(void* volatile* ptr, void* val) { void* interlocked_fetch_sub_ptr(void* volatile* ptr, void* val) {
#ifdef WINDOWS #ifdef WINDOWS
...@@ -375,7 +381,11 @@ int32_t atomic_exchange_32(int32_t volatile* ptr, int32_t val) { ...@@ -375,7 +381,11 @@ int32_t atomic_exchange_32(int32_t volatile* ptr, int32_t val) {
int64_t atomic_exchange_64(int64_t volatile* ptr, int64_t val) { int64_t atomic_exchange_64(int64_t volatile* ptr, int64_t val) {
#ifdef WINDOWS #ifdef WINDOWS
#ifdef _TD_WINDOWS_32
return _InterlockedExchange((int32_t volatile*)(ptr), (int32_t)(val));
#else
return _InterlockedExchange64((int64_t volatile*)(ptr), (int64_t)(val)); return _InterlockedExchange64((int64_t volatile*)(ptr), (int64_t)(val));
#endif
#elif defined(_TD_NINGSI_60) #elif defined(_TD_NINGSI_60)
return atomic_exchange_64_impl((int64_t*)ptr, (int64_t)val); return atomic_exchange_64_impl((int64_t*)ptr, (int64_t)val);
#else #else
...@@ -529,7 +539,11 @@ int32_t atomic_fetch_add_32(int32_t volatile* ptr, int32_t val) { ...@@ -529,7 +539,11 @@ int32_t atomic_fetch_add_32(int32_t volatile* ptr, int32_t val) {
int64_t atomic_fetch_add_64(int64_t volatile* ptr, int64_t val) { int64_t atomic_fetch_add_64(int64_t volatile* ptr, int64_t val) {
#ifdef WINDOWS #ifdef WINDOWS
#ifdef _TD_WINDOWS_32
return _InterlockedExchangeAdd((int32_t volatile*)(ptr), (int32_t)(val));
#else
return _InterlockedExchangeAdd64((int64_t volatile*)(ptr), (int64_t)(val)); return _InterlockedExchangeAdd64((int64_t volatile*)(ptr), (int64_t)(val));
#endif
#elif defined(_TD_NINGSI_60) #elif defined(_TD_NINGSI_60)
return __sync_fetch_and_add((ptr), (val)); return __sync_fetch_and_add((ptr), (val));
#else #else
...@@ -631,7 +645,11 @@ int32_t atomic_fetch_sub_32(int32_t volatile* ptr, int32_t val) { ...@@ -631,7 +645,11 @@ int32_t atomic_fetch_sub_32(int32_t volatile* ptr, int32_t val) {
int64_t atomic_fetch_sub_64(int64_t volatile* ptr, int64_t val) { int64_t atomic_fetch_sub_64(int64_t volatile* ptr, int64_t val) {
#ifdef WINDOWS #ifdef WINDOWS
#ifdef _TD_WINDOWS_32
return _InterlockedExchangeAdd((int32_t volatile*)(ptr), -(int32_t)(val));
#else
return _InterlockedExchangeAdd64((int64_t volatile*)(ptr), -(int64_t)(val)); return _InterlockedExchangeAdd64((int64_t volatile*)(ptr), -(int64_t)(val));
#endif
#elif defined(_TD_NINGSI_60) #elif defined(_TD_NINGSI_60)
return __sync_fetch_and_sub((ptr), (val)); return __sync_fetch_and_sub((ptr), (val));
#else #else
......
...@@ -48,7 +48,7 @@ char *strsep(char **stringp, const char *delim) { ...@@ -48,7 +48,7 @@ char *strsep(char **stringp, const char *delim) {
/* NOTREACHED */ /* NOTREACHED */
} }
/* Duplicate a string, up to at most size characters */ /* Duplicate a string, up to at most size characters */
char *strndup(const char *s, size_t size) { char *strndup(const char *s, int size) {
size_t l; size_t l;
char * s2; char * s2;
l = strlen(s); l = strlen(s);
...@@ -62,7 +62,7 @@ char *strndup(const char *s, size_t size) { ...@@ -62,7 +62,7 @@ char *strndup(const char *s, size_t size) {
} }
/* Copy no more than N characters of SRC to DEST, returning the address of /* Copy no more than N characters of SRC to DEST, returning the address of
the terminating '\0' in DEST, if any, or else DEST + N. */ the terminating '\0' in DEST, if any, or else DEST + N. */
char *stpncpy(char *dest, const char *src, size_t n) { char *stpncpy(char *dest, const char *src, int n) {
size_t size = strnlen(src, n); size_t size = strnlen(src, n);
memcpy(dest, src, size); memcpy(dest, src, size);
dest += size; dest += size;
......
Markdown is supported
0% .
You are about to add 0 people to the discussion. Proceed with caution.
先完成此消息的编辑!
想要评论请 注册