diff --git a/docs/zh/07-develop/01-connect/index.md b/docs/zh/07-develop/01-connect/index.md index b1502bfd76eff85f0b5e6706a341cb2f131e8dfa..3e44e6c5dabc571946a405388c7c02cda2b2dccb 100644 --- a/docs/zh/07-develop/01-connect/index.md +++ b/docs/zh/07-develop/01-connect/index.md @@ -33,7 +33,7 @@ TDengine 提供了丰富的应用程序开发接口,为了便于用户快速 关键不同点在于: 1. 使用 REST 连接,用户无需安装客户端驱动程序 taosc,具有跨平台易用的优势,但性能要下降 30%左右。 -2. 使用原生连接可以体验 TDengine 的全部功能,如[参数绑定接口](../connector/cpp/#参数绑定-api)、[订阅](../connector/cpp/#订阅和消费-api)等等。 +2. 使用原生连接可以体验 TDengine 的全部功能,如[参数绑定接口](../../connector/cpp/#参数绑定-api)、[订阅](../../connector/cpp/#订阅和消费-api)等等。 ## 安装客户端驱动 taosc diff --git a/docs/zh/07-develop/index.md b/docs/zh/07-develop/index.md index f46f91df70e286f14095f13d898e758e4f6783fb..20c017084475fa7da8ebd848770ba8d65454b644 100644 --- a/docs/zh/07-develop/index.md +++ b/docs/zh/07-develop/index.md @@ -12,7 +12,7 @@ title: 开发指南 7. 在很多场景下(如车辆管理),应用需要获取每个数据采集点的最新状态,那么建议你采用TDengine的cache功能,而不用单独部署Redis等缓存软件。 8. 如果你发现TDengine的函数无法满足你的要求,那么你可以使用用户自定义函数来解决问题。 -本部分内容就是按照上述的顺序组织的。为便于理解,TDengine为每个功能为每个支持的编程语言都提供了示例代码。如果你希望深入了解SQL的使用,需要查看[SQL手册](/taos-sql/)。如果想更深入地了解各连接器的使用,请阅读[连接器参考指南](../reference/connector/)。如果还希望想将TDengine与第三方系统集成起来,比如Grafana, 请参考[第三方工具](/third-party/)。 +本部分内容就是按照上述的顺序组织的。为便于理解,TDengine为每个功能为每个支持的编程语言都提供了示例代码。如果你希望深入了解SQL的使用,需要查看[SQL手册](/taos-sql/)。如果想更深入地了解各连接器的使用,请阅读[连接器参考指南](../connector/)。如果还希望想将TDengine与第三方系统集成起来,比如Grafana, 请参考[第三方工具](../third-party/)。 如果在开发过程中遇到任何问题,请点击每个页面下方的["反馈问题"](https://github.com/taosdata/TDengine/issues/new/choose), 在GitHub上直接递交issue。 diff --git a/docs/zh/08-connector/cpp.mdx b/docs/zh/08-connector/03-cpp.mdx similarity index 99% rename from docs/zh/08-connector/cpp.mdx rename to docs/zh/08-connector/03-cpp.mdx index 6e7d6c25b94b3ce436cc3b2353198a0dc84fea85..d27eeb7dfb5a2ca65b1527f0dad152e782d4f4b1 100644 --- a/docs/zh/08-connector/cpp.mdx +++ b/docs/zh/08-connector/03-cpp.mdx @@ -22,7 +22,7 @@ TDengine 客户端驱动的动态库位于: ## 支持的平台 -请参考[支持的平台列表](../connector#支持的平台) +请参考[支持的平台列表](../#支持的平台) ## 支持的版本 @@ -30,7 +30,7 @@ TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一 ## 安装步骤 -TDengine 客户端驱动的安装请参考 [安装指南](../connector#安装步骤) +TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤) ## 建立连接 diff --git a/docs/zh/08-connector/java.mdx b/docs/zh/08-connector/04-java.mdx similarity index 99% rename from docs/zh/08-connector/java.mdx rename to docs/zh/08-connector/04-java.mdx index 723b2ad6812e544e9815607d82642dbeefa716ad..20d2e4fabde9665f3f5672938cb9a71aee2249c3 100644 --- a/docs/zh/08-connector/java.mdx +++ b/docs/zh/08-connector/04-java.mdx @@ -35,7 +35,7 @@ REST 连接支持所有能运行 Java 的平台。 ## 版本支持 -请参考[版本支持列表](../connector#版本支持) +请参考[版本支持列表](../#版本支持) ## TDengine DataType 和 Java DataType @@ -64,7 +64,7 @@ TDengine 目前支持时间戳、数字、字符、布尔类型,与 Java 对 使用 Java Connector 连接数据库前,需要具备以下条件: - 已安装 Java 1.8 或以上版本运行时环境和 Maven 3.6 或以上版本 -- 已安装 TDengine 客户端驱动(使用原生连接必须安装,使用 REST 连接无需安装),具体步骤请参考[安装客户端驱动](../connector#安装客户端驱动) +- 已安装 TDengine 客户端驱动(使用原生连接必须安装,使用 REST 连接无需安装),具体步骤请参考[安装客户端驱动](../#安装客户端驱动) ### 安装连接器 @@ -630,7 +630,7 @@ public void setNString(int columnIndex, ArrayList list, int size) throws ### 无模式写入 -TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../schemaless)。 +TDengine 支持无模式写入功能。无模式写入兼容 InfluxDB 的 行协议(Line Protocol)、OpenTSDB 的 telnet 行协议和 OpenTSDB 的 JSON 格式协议。详情请参见[无模式写入](../../reference/schemaless/)。 **注意**: diff --git a/docs/zh/08-connector/go.mdx b/docs/zh/08-connector/05-go.mdx similarity index 98% rename from docs/zh/08-connector/go.mdx rename to docs/zh/08-connector/05-go.mdx index e883598c1231dd25b3913be78d778fd7238016dd..9d30f75190cddbb17c40e97655002a158cd6aae6 100644 --- a/docs/zh/08-connector/go.mdx +++ b/docs/zh/08-connector/05-go.mdx @@ -30,7 +30,7 @@ REST 连接支持所有能运行 Go 的平台。 ## 版本支持 -请参考[版本支持列表](../connector#版本支持) +请参考[版本支持列表](../#版本支持) ## 支持的功能特性 @@ -56,7 +56,7 @@ REST 连接支持所有能运行 Go 的平台。 ### 安装前准备 * 安装 Go 开发环境(Go 1.14 及以上,GCC 4.8.5 及以上) -* 如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../connector#安装客户端驱动) +* 如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动) 配置好环境变量,检查命令: diff --git a/docs/zh/08-connector/rust.mdx b/docs/zh/08-connector/06-rust.mdx similarity index 99% rename from docs/zh/08-connector/rust.mdx rename to docs/zh/08-connector/06-rust.mdx index ddb0885f9d5b03b1f067fd041de9a853b4781cb7..187e2f0b333062a7ea18f81d370562961bc1486d 100644 --- a/docs/zh/08-connector/rust.mdx +++ b/docs/zh/08-connector/06-rust.mdx @@ -28,7 +28,7 @@ Websocket 连接支持所有能运行 Rust 的平台。 ## 版本支持 -请参考[版本支持列表](../connector#版本支持) +请参考[版本支持列表](../#版本支持) Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容。建议使用 3.0 版本以上的 TDengine,以避免已知问题。 @@ -37,7 +37,7 @@ Rust 连接器仍然在快速开发中,1.0 之前无法保证其向后兼容 ### 安装前准备 * 安装 Rust 开发工具链 -* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../connector#安装客户端驱动) +* 如果使用原生连接,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动) ### 添加 taos 依赖 diff --git a/docs/zh/08-connector/python.mdx b/docs/zh/08-connector/07-python.mdx similarity index 97% rename from docs/zh/08-connector/python.mdx rename to docs/zh/08-connector/07-python.mdx index 41cffc9f3b2f8edbe9f2e2197824270177112d96..88a5d4f84dd52f8a1a9f09d34b7fc98644bd8748 100644 --- a/docs/zh/08-connector/python.mdx +++ b/docs/zh/08-connector/07-python.mdx @@ -8,7 +8,7 @@ description: "taospy 是 TDengine 的官方 Python 连接器。taospy 提供了 import Tabs from "@theme/Tabs"; import TabItem from "@theme/TabItem"; -`taospy` 是 TDengine 的官方 Python 连接器。`taospy` 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。`taospy` 对 TDengine 的[原生接口](../connector/cpp)和 [REST 接口](../rest-api)都进行了封装, 分别对应 `taospy` 包的 `taos` 模块 和 `taosrest` 模块。 +`taospy` 是 TDengine 的官方 Python 连接器。`taospy` 提供了丰富的 API, 使得 Python 应用可以很方便地使用 TDengine。`taospy` 对 TDengine 的[原生接口](../cpp)和 [REST 接口](../rest-api)都进行了封装, 分别对应 `taospy` 包的 `taos` 模块 和 `taosrest` 模块。 除了对原生接口和 REST 接口的封装,`taospy` 还提供了符合 [Python 数据访问规范(PEP 249)](https://peps.python.org/pep-0249/) 的编程接口。这使得 `taospy` 和很多第三方工具集成变得简单,比如 [SQLAlchemy](https://www.sqlalchemy.org/) 和 [pandas](https://pandas.pydata.org/)。 使用客户端驱动提供的原生接口直接与服务端建立的连接的方式下文中称为“原生连接”;使用 taosAdapter 提供的 REST 接口与服务端建立的连接的方式下文中称为“REST 连接”。 @@ -17,7 +17,7 @@ Python 连接器的源码托管在 [GitHub](https://github.com/taosdata/taos-con ## 支持的平台 -- 原生连接[支持的平台](../connector/#支持的平台)和 TDengine 客户端支持的平台一致。 +- 原生连接[支持的平台](../#支持的平台)和 TDengine 客户端支持的平台一致。 - REST 连接支持所有能运行 Python 的平台。 ## 版本选择 diff --git a/docs/zh/08-connector/node.mdx b/docs/zh/08-connector/08-node.mdx similarity index 96% rename from docs/zh/08-connector/node.mdx rename to docs/zh/08-connector/08-node.mdx index c4004a5f59e80d87bbae6c70cde507735fda1b2c..63d690e5541c886af84a85621a07d549803240eb 100644 --- a/docs/zh/08-connector/node.mdx +++ b/docs/zh/08-connector/08-node.mdx @@ -28,7 +28,7 @@ REST 连接器支持所有能运行 Node.js 的平台。 ## 版本支持 -请参考[版本支持列表](../connector#版本支持) +请参考[版本支持列表](../#版本支持) ## 支持的功能特性 @@ -52,7 +52,7 @@ REST 连接器支持所有能运行 Node.js 的平台。 ### 安装前准备 - 安装 Node.js 开发环境 -- 如果使用 REST 连接器,跳过此步。但如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../connector#安装客户端驱动)。我们使用 [node-gyp](https://github.com/nodejs/node-gyp) 和 TDengine 实例进行交互,还需要根据具体操作系统来安装下文提到的一些依赖工具。 +- 如果使用 REST 连接器,跳过此步。但如果使用原生连接器,请安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动)。我们使用 [node-gyp](https://github.com/nodejs/node-gyp) 和 TDengine 实例进行交互,还需要根据具体操作系统来安装下文提到的一些依赖工具。 diff --git a/docs/zh/08-connector/csharp.mdx b/docs/zh/08-connector/09-csharp.mdx similarity index 98% rename from docs/zh/08-connector/csharp.mdx rename to docs/zh/08-connector/09-csharp.mdx index 1650aef4bf925db496a9f069d72d1f71fabcc600..821471758374fa22a0f2b9805c25dbdbe40c0aed 100644 --- a/docs/zh/08-connector/csharp.mdx +++ b/docs/zh/08-connector/09-csharp.mdx @@ -32,7 +32,7 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx" ## 版本支持 -请参考[版本支持列表](../connector/#版本支持) +请参考[版本支持列表](../#版本支持) ## 支持的功能特性 @@ -49,7 +49,7 @@ import CSAsyncQuery from "../07-develop/04-query-data/_cs_async.mdx" * 安装 [.NET SDK](https://dotnet.microsoft.com/download) * [Nuget 客户端](https://docs.microsoft.com/en-us/nuget/install-nuget-client-tools) (可选安装) -* 安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../connector/#安装客户端驱动) +* 安装 TDengine 客户端驱动,具体步骤请参考[安装客户端驱动](../#安装客户端驱动) ### 使用 dotnet CLI 安装 diff --git a/docs/zh/08-connector/php.mdx b/docs/zh/08-connector/10-php.mdx similarity index 97% rename from docs/zh/08-connector/php.mdx rename to docs/zh/08-connector/10-php.mdx index 5c7525842ab76d124f9a249d47125f0596dde8c1..53611c027480a0a00445f2d31490107da5c475c7 100644 --- a/docs/zh/08-connector/php.mdx +++ b/docs/zh/08-connector/10-php.mdx @@ -38,7 +38,7 @@ TDengine 客户端驱动的版本号与 TDengine 服务端的版本号是一一 ### 安装 TDengine 客户端驱动 -TDengine 客户端驱动的安装请参考 [安装指南](../connector#安装步骤) +TDengine 客户端驱动的安装请参考 [安装指南](../#安装步骤) ### 编译安装 php-tdengine diff --git a/docs/zh/08-connector/03-connector.mdx b/docs/zh/08-connector/index.md similarity index 98% rename from docs/zh/08-connector/03-connector.mdx rename to docs/zh/08-connector/index.md index bdad0b7e25a3a94fa34b14bf47403ba2afd7db8d..17de8e926cd9a3633dc8746b0fb49c38ff8ca61f 100644 --- a/docs/zh/08-connector/03-connector.mdx +++ b/docs/zh/08-connector/index.md @@ -1,5 +1,7 @@ --- +sidebar_label: 连接器 title: 连接器 +description: 详细介绍各种语言的连接器及 REST API --- TDengine 提供了丰富的应用程序开发接口,为了便于用户快速开发自己的应用,TDengine 支持了多种编程语言的连接器,其中官方连接器包括支持 C/C++、Java、Python、Go、Node.js、C# 和 Rust 的连接器。这些连接器支持使用原生接口(taosc)和 REST 接口(部分语言暂不支持)连接 TDengine 集群。社区开发者也贡献了多个非官方连接器,例如 ADO.NET 连接器、Lua 连接器和 PHP 连接器。 diff --git a/docs/zh/12-taos-sql/14-stream.md b/docs/zh/12-taos-sql/14-stream.md index a967299e4093a4a8654d7aaf1b8c3914726aeadf..28f52be59a2a9867b92a8d94346dc9839a6bfba8 100644 --- a/docs/zh/12-taos-sql/14-stream.md +++ b/docs/zh/12-taos-sql/14-stream.md @@ -18,7 +18,7 @@ stream_options: { 其中 subquery 是 select 普通查询语法的子集: ```sql -subquery: SELECT [DISTINCT] select_list +subquery: SELECT select_list from_clause [WHERE condition] [PARTITION BY tag_list] @@ -37,7 +37,7 @@ window_clause: { 其中,SESSION 是会话窗口,tol_val 是时间间隔的最大范围。在 tol_val 时间间隔范围内的数据都属于同一个窗口,如果连续的两条数据的时间超过 tol_val,则自动开启下一个窗口。 -窗口的定义与时序数据特色查询中的定义完全相同。 +窗口的定义与时序数据特色查询中的定义完全相同,详见 [TDengine 特色查询](../distinguished) 例如,如下语句创建流式计算,同时自动创建名为 avg_vol 的超级表,此流计算以一分钟为时间窗口、30 秒为前向增量统计这些电表的平均电压,并将来自 meters 表的数据的计算结果写入 avg_vol 表,不同 partition 的数据会分别创建子表并写入不同子表。 diff --git a/examples/c/stream_demo.c b/examples/c/stream_demo.c index 2fcf4dd62c1e0a2f5aabda4ce5eb9fae6aa72be8..55556f21a1c0996b97a6cb4240a9e205ea7fb131 100644 --- a/examples/c/stream_demo.c +++ b/examples/c/stream_demo.c @@ -13,6 +13,7 @@ * along with this program. If not, see . */ +// clang-format off #include #include #include @@ -94,13 +95,8 @@ int32_t create_stream() { } taos_free_result(pRes); - /*const char* sql = "select min(k), max(k), sum(k) from tu1";*/ - /*const char* sql = "select min(k), max(k), sum(k) as sum_of_k from st1";*/ - /*const char* sql = "select sum(k) from tu1 interval(10m)";*/ - /*pRes = tmq_create_stream(pConn, "stream1", "out1", sql);*/ pRes = taos_query(pConn, - "create stream stream1 trigger max_delay 10s watermark 10s into outstb as select _wstart start, " - "count(k) from st1 partition by tbname interval(20s) "); + "create stream stream1 trigger at_once watermark 10s into outstb as select _wstart start, k from st1 partition by tbname state_window(k)"); if (taos_errno(pRes) != 0) { printf("failed to create stream stream1, reason:%s\n", taos_errstr(pRes)); return -1; diff --git a/include/libs/executor/executor.h b/include/libs/executor/executor.h index a64815f14fe0a0dbe5b85ffd0969a68d43f50d8e..1ce88905c23e8e6b9db35694ea0a9aa3197d5ba9 100644 --- a/include/libs/executor/executor.h +++ b/include/libs/executor/executor.h @@ -29,7 +29,7 @@ typedef void* DataSinkHandle; struct SRpcMsg; struct SSubplan; -typedef struct SReadHandle { +typedef struct { void* tqReader; void* meta; void* config; @@ -41,6 +41,7 @@ typedef struct SReadHandle { bool initTableReader; bool initTqReader; int32_t numOfVgroups; + void* pStateBackend; } SReadHandle; // in queue mode, data streams are seperated by msg @@ -78,8 +79,8 @@ int32_t qSetMultiStreamInput(qTaskInfo_t tinfo, const void* pBlocks, size_t numO /** * @brief Cleanup SSDataBlock for StreamScanInfo - * - * @param tinfo + * + * @param tinfo */ void tdCleanupStreamInputDataBlock(qTaskInfo_t tinfo); @@ -163,7 +164,7 @@ int32_t qGetQualifiedTableIdList(void* pTableList, const char* tagCond, int32_t void qProcessRspMsg(void* parent, struct SRpcMsg* pMsg, struct SEpSet* pEpSet); -int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList/*,int32_t* resNum, SExplainExecInfo** pRes*/); +int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, SArray* pExecInfoList /*,int32_t* resNum, SExplainExecInfo** pRes*/); int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len); diff --git a/include/libs/stream/tstream.h b/include/libs/stream/tstream.h index 384c6a289f304e4c59a097663bb4224e979bd226..16b259cf597e618388e5eec674c79154eff9ceb9 100644 --- a/include/libs/stream/tstream.h +++ b/include/libs/stream/tstream.h @@ -263,6 +263,14 @@ typedef struct { SArray* checkpointVer; } SStreamRecoveringState; +// incremental state storage +typedef struct { + SStreamTask* pOwner; + TDB* db; + TTB* pStateDb; + TXN txn; +} SStreamState; + typedef struct SStreamTask { int64_t streamId; int32_t taskId; @@ -312,6 +320,10 @@ typedef struct SStreamTask { // msg handle SMsgCb* pMsgCb; + + // state backend + SStreamState* pState; + } SStreamTask; int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo); @@ -507,7 +519,7 @@ typedef struct SStreamMeta { char* path; TDB* db; TTB* pTaskDb; - TTB* pStateDb; + TTB* pCheckpointDb; SHashObj* pTasks; SHashObj* pRecoverStatus; void* ahandle; @@ -528,6 +540,36 @@ int32_t streamMetaCommit(SStreamMeta* pMeta); int32_t streamMetaRollBack(SStreamMeta* pMeta); int32_t streamLoadTasks(SStreamMeta* pMeta); +SStreamState* streamStateOpen(char* path, SStreamTask* pTask); +void streamStateClose(SStreamState* pState); +int32_t streamStateBegin(SStreamState* pState); +int32_t streamStateCommit(SStreamState* pState); +int32_t streamStateAbort(SStreamState* pState); + +typedef struct { + TBC* pCur; +} SStreamStateCur; + +#if 1 +int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen); +int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen); +int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen); + +SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen); +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen); +SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen); +void streamStateFreeCur(SStreamStateCur* pCur); + +int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen); + +int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur); + +int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur); +int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur); + +#endif + #ifdef __cplusplus } #endif diff --git a/source/dnode/mnode/impl/test/sma/CMakeLists.txt b/source/dnode/mnode/impl/test/sma/CMakeLists.txt index 3f9ec123a80e88371a98fa54c99342726831372d..a55b45ca11d32f4aa0baa2462007f06e970ae3d6 100644 --- a/source/dnode/mnode/impl/test/sma/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/sma/CMakeLists.txt @@ -5,7 +5,9 @@ target_link_libraries( PUBLIC sut ) -add_test( - NAME smaTest - COMMAND smaTest -) +if(NOT ${TD_WINDOWS}) + add_test( + NAME smaTest + COMMAND smaTest + ) +endif(NOT ${TD_WINDOWS}) diff --git a/source/dnode/mnode/impl/test/stb/CMakeLists.txt b/source/dnode/mnode/impl/test/stb/CMakeLists.txt index dcfbe658fcca82f928400b1e9eed2efcfb09a052..e3a3fc2e793fa84a5da05519ae727bb572edaa27 100644 --- a/source/dnode/mnode/impl/test/stb/CMakeLists.txt +++ b/source/dnode/mnode/impl/test/stb/CMakeLists.txt @@ -5,7 +5,9 @@ target_link_libraries( PUBLIC sut ) -add_test( - NAME stbTest - COMMAND stbTest -) \ No newline at end of file +if(NOT ${TD_WINDOWS}) + add_test( + NAME stbTest + COMMAND stbTest + ) +endif(NOT ${TD_WINDOWS}) \ No newline at end of file diff --git a/source/dnode/vnode/src/tq/tq.c b/source/dnode/vnode/src/tq/tq.c index c6bc8e6e598507ea820bb109e84fbb41a0ed099b..1456c6c0674692351fa89250e20c148d1d5cf9a6 100644 --- a/source/dnode/vnode/src/tq/tq.c +++ b/source/dnode/vnode/src/tq/tq.c @@ -79,6 +79,10 @@ STQ* tqOpen(const char* path, SVnode* pVnode) { ASSERT(0); } + if (streamLoadTasks(pTq->pStreamMeta) < 0) { + ASSERT(0); + } + return pTq; } @@ -664,6 +668,11 @@ int32_t tqExpandTask(STQ* pTq, SStreamTask* pTask) { ASSERT(pTask->exec.executor); } + pTask->pState = streamStateOpen(pTq->pStreamMeta->path, pTask); + if (pTask->pState == NULL) { + return -1; + } + // sink /*pTask->ahandle = pTq->pVnode;*/ if (pTask->outputType == TASK_OUTPUT__SMA) { diff --git a/source/libs/executor/inc/executorimpl.h b/source/libs/executor/inc/executorimpl.h index 601c22a3ba793b17a8ccc8de2cd4dfa37d9c7682..67f7cb2f6fb7360886459381401598f98aa194ba 100644 --- a/source/libs/executor/inc/executorimpl.h +++ b/source/libs/executor/inc/executorimpl.h @@ -150,6 +150,7 @@ typedef struct { SQueryTableDataCond tableCond; int64_t recoverStartVer; int64_t recoverEndVer; + SStreamState* pState; } SStreamTaskInfo; typedef struct { diff --git a/source/libs/executor/src/executorimpl.c b/source/libs/executor/src/executorimpl.c index 1836ca6d9b9ce81bb39b7504275bd7a0305cdb27..98c7c56d729c350b5cb573d7349e8f0c6dbf605e 100644 --- a/source/libs/executor/src/executorimpl.c +++ b/source/libs/executor/src/executorimpl.c @@ -392,7 +392,7 @@ static void functionCtxSave(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { static void functionCtxRestore(SqlFunctionCtx* pCtx, SFunctionCtxStatus* pStatus) { pCtx->input.colDataAggIsSet = pStatus->hasAgg; - pCtx->input.numOfRows = pStatus->numOfRows; + pCtx->input.numOfRows = pStatus->numOfRows; pCtx->input.startRowIndex = pStatus->startOffset; } @@ -3715,7 +3715,7 @@ static int32_t initFillInfo(SFillOperatorInfo* pInfo, SExprInfo* pExpr, int32_t const char* id, SInterval* pInterval, int32_t fillType, int32_t order) { SFillColInfo* pColInfo = createFillColInfo(pExpr, numOfCols, pNotFillExpr, numOfNotFillCols, pValNode); - int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey; + int64_t startKey = (order == TSDB_ORDER_ASC) ? win.skey : win.ekey; STimeWindow w = getAlignQueryTimeWindow(pInterval, pInterval->precision, startKey); w = getFirstQualifiedTimeWindow(startKey, &w, pInterval, order); @@ -3988,15 +3988,15 @@ int32_t generateGroupIdMap(STableListInfo* pTableListInfo, SReadHandle* pHandle, bool assignUid = groupbyTbname(group); - size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); + size_t numOfTables = taosArrayGetSize(pTableListInfo->pTableList); - if(assignUid){ + if (assignUid) { for (int32_t i = 0; i < numOfTables; i++) { STableKeyInfo* info = taosArrayGet(pTableListInfo->pTableList, i); info->groupId = info->uid; taosHashPut(pTableListInfo->map, &(info->uid), sizeof(uint64_t), &info->groupId, sizeof(uint64_t)); } - }else{ + } else { int32_t code = getColInfoResultForGroupby(pHandle->meta, group, pTableListInfo); if (code != TSDB_CODE_SUCCESS) { return code; @@ -4615,6 +4615,10 @@ int32_t createExecTaskInfoImpl(SSubplan* pPlan, SExecTaskInfo** pTaskInfo, SRead goto _complete; } + if (pHandle && pHandle->pStateBackend) { + (*pTaskInfo)->streamInfo.pState = pHandle->pStateBackend; + } + (*pTaskInfo)->sql = sql; sql = NULL; (*pTaskInfo)->pSubplan = pPlan; diff --git a/source/libs/executor/src/timewindowoperator.c b/source/libs/executor/src/timewindowoperator.c index 551180f639caea6212403d66e3a2fca4f8f4ca39..3d54b791a8f2049d5c0a90b2554d9f91cf38e41e 100644 --- a/source/libs/executor/src/timewindowoperator.c +++ b/source/libs/executor/src/timewindowoperator.c @@ -3128,8 +3128,8 @@ static SSDataBlock* doStreamFinalIntervalAgg(SOperatorInfo* pOperator) { maxTs = TMAX(maxTs, pBlock->info.window.ekey); maxTs = TMAX(maxTs, pBlock->info.watermark); - if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA || - pBlock->info.type == STREAM_INVALID) { + ASSERT(pBlock->info.type != STREAM_INVERT); + if (pBlock->info.type == STREAM_NORMAL || pBlock->info.type == STREAM_PULL_DATA) { pInfo->binfo.pRes->info.type = pBlock->info.type; } else if (pBlock->info.type == STREAM_CLEAR) { SArray* pUpWins = taosArrayInit(8, sizeof(SWinRes)); diff --git a/source/libs/stream/src/streamExec.c b/source/libs/stream/src/streamExec.c index 06ca26f0292df2447fa7c267a0d43e65f4117964..102bad742652005df440b5d4d7a87bcef34ba636 100644 --- a/source/libs/stream/src/streamExec.c +++ b/source/libs/stream/src/streamExec.c @@ -140,7 +140,6 @@ int32_t streamPipelineExec(SStreamTask* pTask, int32_t batchNum, bool dispatch) return 0; } -// TODO: handle version int32_t streamExecForAll(SStreamTask* pTask) { while (1) { int32_t batchCnt = 1; diff --git a/source/libs/stream/src/streamMeta.c b/source/libs/stream/src/streamMeta.c index 5ff700546cf63acd3e0c4d0798383d6724947de1..20a2f7d332ce97511ec7bcd752539ff626ce0f54 100644 --- a/source/libs/stream/src/streamMeta.c +++ b/source/libs/stream/src/streamMeta.c @@ -14,7 +14,7 @@ */ #include "executor.h" -#include "tstream.h" +#include "streamInc.h" #include "ttimer.h" SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandFunc) { @@ -23,17 +23,23 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF terrno = TSDB_CODE_OUT_OF_MEMORY; return NULL; } - pMeta->path = strdup(path); + int32_t len = strlen(path) + 20; + char* streamPath = taosMemoryCalloc(1, len); + sprintf(streamPath, "%s/%s", path, "stream"); + pMeta->path = strdup(streamPath); if (tdbOpen(pMeta->path, 16 * 1024, 1, &pMeta->db) < 0) { goto _err; } + sprintf(streamPath, "%s/%s", pMeta->path, "checkpoints"); + mkdir(streamPath, 0755); + taosMemoryFree(streamPath); + if (tdbTbOpen("task.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pTaskDb) < 0) { goto _err; } - // open state storage backend - if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pStateDb) < 0) { + if (tdbTbOpen("checkpoint.db", sizeof(int32_t), -1, NULL, pMeta->db, &pMeta->pCheckpointDb) < 0) { goto _err; } @@ -49,16 +55,13 @@ SStreamMeta* streamMetaOpen(const char* path, void* ahandle, FTaskExpand expandF pMeta->ahandle = ahandle; pMeta->expandFunc = expandFunc; - if (streamLoadTasks(pMeta) < 0) { - goto _err; - } return pMeta; _err: if (pMeta->path) taosMemoryFree(pMeta->path); if (pMeta->pTasks) taosHashCleanup(pMeta->pTasks); - if (pMeta->pStateDb) tdbTbClose(pMeta->pStateDb); if (pMeta->pTaskDb) tdbTbClose(pMeta->pTaskDb); + if (pMeta->pCheckpointDb) tdbTbClose(pMeta->pCheckpointDb); if (pMeta->db) tdbClose(pMeta->db); taosMemoryFree(pMeta); return NULL; @@ -67,7 +70,7 @@ _err: void streamMetaClose(SStreamMeta* pMeta) { tdbCommit(pMeta->db, &pMeta->txn); tdbTbClose(pMeta->pTaskDb); - tdbTbClose(pMeta->pStateDb); + tdbTbClose(pMeta->pCheckpointDb); tdbClose(pMeta->db); void* pIter = NULL; diff --git a/source/libs/stream/src/streamRecover.c b/source/libs/stream/src/streamRecover.c index 263053778b1ae94de5a5353edf158e37604baf98..0505c3edd6dd8211792679b7164bcc001bde6c4e 100644 --- a/source/libs/stream/src/streamRecover.c +++ b/source/libs/stream/src/streamRecover.c @@ -176,6 +176,7 @@ int32_t tDecodeSStreamTaskRecoverRsp(SDecoder* pDecoder, SStreamRecoverDownstrea } int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { +#if 0 void* buf = NULL; ASSERT(pTask->taskLevel == TASK_LEVEL__SINK); @@ -224,10 +225,12 @@ int32_t streamSaveStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { FAIL: if (buf) taosMemoryFree(buf); return -1; +#endif return 0; } int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { +#if 0 void* pVal = NULL; int32_t vLen = 0; if (tdbTbGet(pMeta->pStateDb, &pTask->taskId, sizeof(void*), &pVal, &vLen) < 0) { @@ -241,7 +244,7 @@ int32_t streamLoadStateInfo(SStreamMeta* pMeta, SStreamTask* pTask) { pTask->nextCheckId = aggCheckpoint.checkpointId + 1; pTask->checkpointInfo = aggCheckpoint.checkpointVer; - +#endif return 0; } diff --git a/source/libs/stream/src/streamState.c b/source/libs/stream/src/streamState.c new file mode 100644 index 0000000000000000000000000000000000000000..6ccc90fa5172492ce9299bd2bc311965913e9004 --- /dev/null +++ b/source/libs/stream/src/streamState.c @@ -0,0 +1,187 @@ +/* + * Copyright (c) 2019 TAOS Data, Inc. + * + * This program is free software: you can use, redistribute, and/or modify + * it under the terms of the GNU Affero General Public License, version 3 + * or later ("AGPL"), as published by the Free Software Foundation. + * + * This program is distributed in the hope that it will be useful, but WITHOUT + * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or + * FITNESS FOR A PARTICULAR PURPOSE. + * + * You should have received a copy of the GNU Affero General Public License + * along with this program. If not, see . + */ + +#include "executor.h" +#include "streamInc.h" +#include "ttimer.h" + +SStreamState* streamStateOpen(char* path, SStreamTask* pTask) { + SStreamState* pState = taosMemoryCalloc(1, sizeof(SStreamState)); + if (pState == NULL) { + terrno = TSDB_CODE_OUT_OF_MEMORY; + return NULL; + } + char statePath[200]; + sprintf(statePath, "%s/%d", path, pTask->taskId); + if (tdbOpen(statePath, 16 * 1024, 1, &pState->db) < 0) { + goto _err; + } + + // open state storage backend + if (tdbTbOpen("state.db", sizeof(int32_t), -1, NULL, pState->db, &pState->pStateDb) < 0) { + goto _err; + } + + pState->pOwner = pTask; + + return pState; + +_err: + if (pState->pStateDb) tdbTbClose(pState->pStateDb); + if (pState->db) tdbClose(pState->db); + taosMemoryFree(pState); + return NULL; +} + +void streamStateClose(SStreamState* pState) { + tdbCommit(pState->db, &pState->txn); + tdbTbClose(pState->pStateDb); + tdbClose(pState->db); + + taosMemoryFree(pState); +} + +int32_t streamStateBegin(SStreamState* pState) { + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + + if (tdbBegin(pState->db, &pState->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamStateCommit(SStreamState* pState) { + if (tdbCommit(pState->db, &pState->txn) < 0) { + return -1; + } + memset(&pState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + if (tdbBegin(pState->db, &pState->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamStateAbort(SStreamState* pState) { + if (tdbAbort(pState->db, &pState->txn) < 0) { + return -1; + } + memset(&pState->txn, 0, sizeof(TXN)); + if (tdbTxnOpen(&pState->txn, 0, tdbDefaultMalloc, tdbDefaultFree, NULL, TDB_TXN_WRITE | TDB_TXN_READ_UNCOMMITTED) < + 0) { + return -1; + } + if (tdbBegin(pState->db, &pState->txn) < 0) { + return -1; + } + return 0; +} + +int32_t streamStatePut(SStreamState* pState, const void* key, int32_t kLen, const void* value, int32_t vLen) { + return tdbTbUpsert(pState->pStateDb, key, kLen, value, vLen, &pState->txn); +} +int32_t streamStateGet(SStreamState* pState, const void* key, int32_t kLen, void** pVal, int32_t* pVLen) { + return tdbTbGet(pState->pStateDb, key, kLen, pVal, pVLen); +} + +int32_t streamStateDel(SStreamState* pState, const void* key, int32_t kLen) { + return tdbTbDelete(pState->pStateDb, key, kLen, &pState->txn); +} + +SStreamStateCur* streamStateGetCur(SStreamState* pState, const void* key, int32_t kLen) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) return NULL; + tdbTbcOpen(pState->pStateDb, &pCur->pCur, NULL); + + int32_t c; + tdbTbcMoveTo(pCur->pCur, key, kLen, &c); + if (c != 0) { + taosMemoryFree(pCur); + return NULL; + } + return 0; +} + +int32_t streamGetKVByCur(SStreamStateCur* pCur, void** pKey, int32_t* pKLen, void** pVal, int32_t* pVLen) { + return tdbTbcGet(pCur->pCur, (const void**)pKey, pKLen, (const void**)pVal, pVLen); +} + +int32_t streamStateSeekFirst(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToFirst(pCur->pCur); +} + +int32_t streamStateSeekLast(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToLast(pCur->pCur); +} + +SStreamStateCur* streamStateSeekKeyNext(SStreamState* pState, const void* key, int32_t kLen) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + + int32_t c; + if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { + taosMemoryFree(pCur); + return NULL; + } + if (c > 0) return pCur; + + if (tdbTbcMoveToNext(pCur->pCur) < 0) { + taosMemoryFree(pCur); + return NULL; + } + + return pCur; +} + +SStreamStateCur* streamStateSeekKeyPrev(SStreamState* pState, const void* key, int32_t kLen) { + SStreamStateCur* pCur = taosMemoryCalloc(1, sizeof(SStreamStateCur)); + if (pCur == NULL) { + return NULL; + } + + int32_t c; + if (tdbTbcMoveTo(pCur->pCur, key, kLen, &c) < 0) { + taosMemoryFree(pCur); + return NULL; + } + if (c < 0) return pCur; + + if (tdbTbcMoveToPrev(pCur->pCur) < 0) { + taosMemoryFree(pCur); + return NULL; + } + + return pCur; +} + +int32_t streamStateCurNext(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToNext(pCur->pCur); +} + +int32_t streamStateCurPrev(SStreamState* pState, SStreamStateCur* pCur) { + // + return tdbTbcMoveToPrev(pCur->pCur); +} diff --git a/source/libs/stream/src/streamTask.c b/source/libs/stream/src/streamTask.c index 4009a47c65af469bc8a6f4fe5443411306e4ec2b..ce5917de296c317f739e79cb78cda21660769aa8 100644 --- a/source/libs/stream/src/streamTask.c +++ b/source/libs/stream/src/streamTask.c @@ -165,5 +165,8 @@ void tFreeSStreamTask(SStreamTask* pTask) { if (pTask->outputType == TASK_OUTPUT__SHUFFLE_DISPATCH) { taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos); } + + if (pTask->pState) streamStateClose(pTask->pState); + taosMemoryFree(pTask); } diff --git a/source/libs/wal/src/walMeta.c b/source/libs/wal/src/walMeta.c index a8da6809100fa5789e5b7e57e051631257782e1e..93ced912f8e2358c2aab6f04957ce060cf61c924 100644 --- a/source/libs/wal/src/walMeta.c +++ b/source/libs/wal/src/walMeta.c @@ -121,7 +121,7 @@ static FORCE_INLINE int64_t walScanLogGetLastVer(SWal* pWal) { if (found == NULL) { // file corrupted, no complete log // TODO delete and search in previous files - ASSERT(0); + /*ASSERT(0);*/ terrno = TSDB_CODE_WAL_FILE_CORRUPTED; return -1; } @@ -221,7 +221,6 @@ int walCheckAndRepairMeta(SWal* pWal) { int code = walSaveMeta(pWal); if (code < 0) { - taosArrayDestroy(actualLog); return -1; } }