diff --git a/docs/en/07-develop/06-stream.md b/docs/en/07-develop/06-stream.md
new file mode 100644
index 0000000000000000000000000000000000000000..36f903ee9a4f2d210e63d0b79e702bc199f790ed
--- /dev/null
+++ b/docs/en/07-develop/06-stream.md
@@ -0,0 +1,113 @@
+---
+sidebar_label: Stream Processing
+description: "The TDengine stream processing engine combines data inserts, preprocessing, analytics, real-time computation, and alerting into a single component."
+title: Stream Processing
+---
+
+Raw time-series data is often cleaned and preprocessed before being permanently stored in a database. In a traditional time-series solution, this generally requires the deployment of stream processing systems such as Kafka or Flink. However, the complexity of such systems increases the cost of development and maintenance.
+
+With the stream processing engine built into TDengine, you can process incoming data streams in real time and define stream transformations in SQL. Incoming data is automatically processed, and the results are pushed to specified tables based on triggering rules that you define. This is a lightweight alternative to complex processing engines that returns computation results in milliseconds even in high-throughput scenarios.
+
+The stream processing engine includes data filtering, scalar function computation (including user-defined functions), and window aggregation, with support for sliding windows, session windows, and event windows. Stream processing can take data from supertables, standard tables, or subtables as its source and write the results to a target supertable. When you create a stream, the target supertable is automatically created. New data is then processed and written to that supertable according to the rules defined for the stream. You can use PARTITION BY statements to partition the data by table name or tag. Separate partitions are then written to different subtables within the target supertable.
+
+TDengine stream processing supports the aggregation of supertables that are deployed across multiple vnodes. It can also handle out-of-order writes and includes a watermark mechanism that determines the extent to which out-of-order data is accepted by the system. You can configure whether to drop or reprocess out-of-order data through the **ignore expired** parameter.
+
+For more information, see [Stream Processing](../../taos-sql/stream).
+
+
+## Create a Stream
+
+```sql
+CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name AS subquery
+stream_options: {
+ TRIGGER    [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
+ WATERMARK   time
+ IGNORE EXPIRED [0 | 1]
+}
+```
+
+For more information, see [Stream Processing](../../taos-sql/stream).
+
+## Usage Scenario 1
+
+Smart electrical meter systems for businesses commonly generate millions of data points that are widely dispersed and arrive out of order. The time required to clean and convert this data makes efficient, real-time processing impossible for traditional solutions. This scenario shows how you can configure TDengine stream processing to drop data points whose voltage exceeds 220 V, find the maximum current in each 5-second window, and output the results to a table.
+ +### Create a Database for Raw Data + +A database including one supertable and four subtables is created as follows: + +```sql +DROP DATABASE IF EXISTS power; +CREATE DATABASE power; +USE power; + +CREATE STABLE meters (ts timestamp, current float, voltage int, phase float) TAGS (location binary(64), groupId int); + +CREATE TABLE d1001 USING meters TAGS ("California.SanFrancisco", 2); +CREATE TABLE d1002 USING meters TAGS ("California.SanFrancisco", 3); +CREATE TABLE d1003 USING meters TAGS ("California.LosAngeles", 2); +CREATE TABLE d1004 USING meters TAGS ("California.LosAngeles", 3); +``` + +### Create a Stream + +```sql +create stream current_stream into current_stream_output_stb as select _wstart as start, _wend as end, max(current) as max_current from meters where voltage <= 220 interval (5s); +``` + +### Write Data +```sql +insert into d1001 values("2018-10-03 14:38:05.000", 10.30000, 219, 0.31000); +insert into d1001 values("2018-10-03 14:38:15.000", 12.60000, 218, 0.33000); +insert into d1001 values("2018-10-03 14:38:16.800", 12.30000, 221, 0.31000); +insert into d1002 values("2018-10-03 14:38:16.650", 10.30000, 218, 0.25000); +insert into d1003 values("2018-10-03 14:38:05.500", 11.80000, 221, 0.28000); +insert into d1003 values("2018-10-03 14:38:16.600", 13.40000, 223, 0.29000); +insert into d1004 values("2018-10-03 14:38:05.000", 10.80000, 223, 0.29000); +insert into d1004 values("2018-10-03 14:38:06.500", 11.50000, 221, 0.35000); +``` + +### Query the Results + +```sql +taos> select start, end, max_current from current_stream_output_stb; + start | end | max_current | +=========================================================================== + 2018-10-03 14:38:05.000 | 2018-10-03 14:38:10.000 | 10.30000 | + 2018-10-03 14:38:15.000 | 2018-10-03 14:38:20.000 | 12.60000 | +Query OK, 2 rows in database (0.018762s) +``` + +## Usage Scenario 2 + +In this scenario, the active power and reactive power are determined from the data gathered in the previous scenario. The location and name of each meter are concatenated with a period (.) between them, and the data set is partitioned by meter name and written to a new database. + +### Create a Database for Raw Data + +The procedure from the previous scenario is used to create the database. + +### Create a Stream + +```sql +create stream power_stream into power_stream_output_stb as select ts, concat_ws(".", location, tbname) as meter_location, current*voltage*cos(phase) as active_power, current*voltage*sin(phase) as reactive_power from meters partition by tbname; +``` + +### Write data + +The procedure from the previous scenario is used to write the data. 
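+
+Before querying the results, you can optionally confirm that the stream was created. A minimal check, assuming the statements above were run in the same instance, is to list the defined streams:
+
+```sql
+SHOW STREAMS;
+```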
+ +### Query the Results +```sql +taos> select ts, meter_location, active_power, reactive_power from power_stream_output_stb; + ts | meter_location | active_power | reactive_power | +=================================================================================================================== + 2018-10-03 14:38:05.000 | California.LosAngeles.d1004 | 2307.834596289 | 688.687331847 | + 2018-10-03 14:38:06.500 | California.LosAngeles.d1004 | 2387.415754896 | 871.474763418 | + 2018-10-03 14:38:05.500 | California.LosAngeles.d1003 | 2506.240411679 | 720.680274962 | + 2018-10-03 14:38:16.600 | California.LosAngeles.d1003 | 2863.424274422 | 854.482390839 | + 2018-10-03 14:38:05.000 | California.SanFrancisco.d1001 | 2148.178871730 | 688.120784090 | + 2018-10-03 14:38:15.000 | California.SanFrancisco.d1001 | 2598.589176205 | 890.081451418 | + 2018-10-03 14:38:16.800 | California.SanFrancisco.d1001 | 2588.728381186 | 829.240910475 | + 2018-10-03 14:38:16.650 | California.SanFrancisco.d1002 | 2175.595991997 | 555.520860397 | +Query OK, 8 rows in database (0.014753s) +``` diff --git a/docs/en/07-develop/07-tmq.mdx b/docs/en/07-develop/07-tmq.mdx new file mode 100644 index 0000000000000000000000000000000000000000..ceeea64fca91473ea62de404fb9e92c179f7e6d4 --- /dev/null +++ b/docs/en/07-develop/07-tmq.mdx @@ -0,0 +1,841 @@ +--- +sidebar_label: Data Subscription +description: "The TDengine data subscription service automatically pushes data written in TDengine to subscribing clients." +title: Data Subscription +--- + +import Tabs from "@theme/Tabs"; +import TabItem from "@theme/TabItem"; +import Java from "./_sub_java.mdx"; +import Python from "./_sub_python.mdx"; +import Go from "./_sub_go.mdx"; +import Rust from "./_sub_rust.mdx"; +import Node from "./_sub_node.mdx"; +import CSharp from "./_sub_cs.mdx"; +import CDemo from "./_sub_c.mdx"; + +TDengine provides data subscription and consumption interfaces similar to message queue products. These interfaces make it easier for applications to obtain data written to TDengine either in real time and to process data in the order that events occurred. This simplifies your time-series data processing systems and reduces your costs because it is no longer necessary to deploy a message queue product such as Kafka. + +To use TDengine data subscription, you define topics like in Kafka. However, a topic in TDengine is based on query conditions for an existing supertable, standard table, or subtable - in other words, a SELECT statement. You can use SQL to filter data by tag, table name, column, or expression and then perform a scalar function or user-defined function on the data. Aggregate functions are not supported. This gives TDengine data subscription more flexibility than similar products. The granularity of data can be controlled on demand by applications, while filtering and preprocessing are handled by TDengine instead of the application layer. This implementation reduces the amount of data transmitted and the complexity of applications. + +By subscribing to a topic, a consumer can obtain the latest data in that topic in real time. Multiple consumers can be formed into a consumer group that consumes messages together. Consumer groups enable faster speed through multi-threaded, distributed data consumption. Note that consumers in different groups that are subscribed to the same topic do not consume messages together. A single consumer can subscribe to multiple topics. 
If the data in a supertable is sharded across multiple vnodes, consumer groups can consume it much more efficiently than single consumers. TDengine also includes an acknowledgement mechanism that ensures at-least-once delivery in complicated environments where machines may crash or restart. + +To implement these features, TDengine indexes its write-ahead log (WAL) file for fast random access and provides configurable methods for replacing and retaining this file. You can define a retention period and size for this file. For information, see the CREATE DATABASE statement. In this way, the WAL file is transformed into a persistent storage engine that remembers the order in which events occur. However, note that configuring an overly long retention period for your WAL files makes database compression inefficient. TDengine then uses the WAL file instead of the time-series database as its storage engine for queries in the form of topics. TDengine reads the data from the WAL file; uses a unified query engine instance to perform filtering, transformations, and other operations; and finally pushes the data to consumers. + + + +## Data Schema and API + +The related schemas and APIs in various languages are described as follows: + + + + +```c +typedef struct tmq_t tmq_t; +typedef struct tmq_conf_t tmq_conf_t; +typedef struct tmq_list_t tmq_list_t; + +typedef void(tmq_commit_cb(tmq_t *, int32_t code, void *param)); + +DLL_EXPORT tmq_list_t *tmq_list_new(); +DLL_EXPORT int32_t tmq_list_append(tmq_list_t *, const char *); +DLL_EXPORT void tmq_list_destroy(tmq_list_t *); +DLL_EXPORT tmq_t *tmq_consumer_new(tmq_conf_t *conf, char *errstr, int32_t errstrLen); +DLL_EXPORT const char *tmq_err2str(int32_t code); + +DLL_EXPORT int32_t tmq_subscribe(tmq_t *tmq, const tmq_list_t *topic_list); +DLL_EXPORT int32_t tmq_unsubscribe(tmq_t *tmq); +DLL_EXPORT TAOS_RES *tmq_consumer_poll(tmq_t *tmq, int64_t timeout); +DLL_EXPORT int32_t tmq_consumer_close(tmq_t *tmq); +DLL_EXPORT int32_t tmq_commit_sync(tmq_t *tmq, const TAOS_RES *msg); +DLL_EXPORT void tmq_commit_async(tmq_t *tmq, const TAOS_RES *msg, tmq_commit_cb *cb, void *param); + +enum tmq_conf_res_t { + TMQ_CONF_UNKNOWN = -2, + TMQ_CONF_INVALID = -1, + TMQ_CONF_OK = 0, +}; +typedef enum tmq_conf_res_t tmq_conf_res_t; + +DLL_EXPORT tmq_conf_t *tmq_conf_new(); +DLL_EXPORT tmq_conf_res_t tmq_conf_set(tmq_conf_t *conf, const char *key, const char *value); +DLL_EXPORT void tmq_conf_destroy(tmq_conf_t *conf); +DLL_EXPORT void tmq_conf_set_auto_commit_cb(tmq_conf_t *conf, tmq_commit_cb *cb, void *param); +``` + +For more information, see [C/C++ Connector](/reference/connector/cpp). + +The following example is based on the smart meter table described in Data Models. For complete sample code, see the C language section below. 
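+
+As a quick orientation, the typical call sequence is sketched below. This is not a complete program: error handling is omitted, and the topic name `topic_meters` and the `running` flag are placeholders for values defined by your application.
+
+```c
+// Sketch of the typical TMQ call sequence (error handling omitted).
+tmq_conf_t *conf = tmq_conf_new();
+tmq_conf_set(conf, "group.id", "cgrp1");
+tmq_conf_set(conf, "td.connect.user", "root");
+tmq_conf_set(conf, "td.connect.pass", "taosdata");
+tmq_t *tmq = tmq_consumer_new(conf, NULL, 0);
+tmq_conf_destroy(conf);
+
+// Subscribe to one or more previously created topics.
+tmq_list_t *topics = tmq_list_new();
+tmq_list_append(topics, "topic_meters");
+tmq_subscribe(tmq, topics);
+tmq_list_destroy(topics);
+
+while (running) {
+  TAOS_RES *msg = tmq_consumer_poll(tmq, 1000);  // wait up to 1000 ms for new data
+  if (msg) {
+    // Process the message with the regular result-set API, then commit the offset.
+    tmq_commit_sync(tmq, msg);
+    taos_free_result(msg);
+  }
+}
+
+tmq_unsubscribe(tmq);
+tmq_consumer_close(tmq);
+```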
+ + + + +```java +void subscribe(Collection topics) throws SQLException; + +void unsubscribe() throws SQLException; + +Set subscription() throws SQLException; + +ConsumerRecords poll(Duration timeout) throws SQLException; + +void commitAsync(); + +void commitAsync(OffsetCommitCallback callback); + +void commitSync() throws SQLException; + +void close() throws SQLException; +``` + + + + + +```python +class TaosConsumer(): + def __init__(self, *topics, **configs) + + def __iter__(self) + + def __next__(self) + + def sync_next(self) + + def subscription(self) + + def unsubscribe(self) + + def close(self) + + def __del__(self) +``` + + + + + +```go +func NewConsumer(conf *Config) (*Consumer, error) + +func (c *Consumer) Close() error + +func (c *Consumer) Commit(ctx context.Context, message unsafe.Pointer) error + +func (c *Consumer) FreeMessage(message unsafe.Pointer) + +func (c *Consumer) Poll(timeout time.Duration) (*Result, error) + +func (c *Consumer) Subscribe(topics []string) error + +func (c *Consumer) Unsubscribe() error +``` + + + + + +```rust +impl TBuilder for TmqBuilder + fn from_dsn(dsn: D) -> Result + fn build(&self) -> Result + +impl AsAsyncConsumer for Consumer + async fn subscribe, I: IntoIterator + Send>( + &mut self, + topics: I, + ) -> Result<(), Self::Error>; + fn stream( + &self, + ) -> Pin< + Box< + dyn '_ + + Send + + futures::Stream< + Item = Result<(Self::Offset, MessageSet), Self::Error>, + >, + >, + >; + async fn commit(&self, offset: Self::Offset) -> Result<(), Self::Error>; + + async fn unsubscribe(self); +``` + +For more information, see [Crate taos](https://docs.rs/taos). + + + + + +```js +function TMQConsumer(config) + +function subscribe(topic) + +function consume(timeout) + +function subscription() + +function unsubscribe() + +function commit(msg) + +function close() +``` + + + + + +```csharp +ConsumerBuilder(IEnumerable> config) + +virtual IConsumer Build() + +Consumer(ConsumerBuilder builder) + +void Subscribe(IEnumerable topics) + +void Subscribe(string topic) + +ConsumeResult Consume(int millisecondsTimeout) + +List Subscription() + +void Unsubscribe() + +void Commit(ConsumeResult consumerResult) + +void Close() +``` + + + + +## Insert Data into TDengine + +A database including one supertable and two subtables is created as follows: + +```sql +DROP DATABASE IF EXISTS tmqdb; +CREATE DATABASE tmqdb; +CREATE TABLE tmqdb.stb (ts TIMESTAMP, c1 INT, c2 FLOAT, c3 VARCHAR(16) TAGS(t1 INT, t3 VARCHAR(16)); +CREATE TABLE tmqdb.ctb0 USING tmqdb.stb TAGS(0, "subtable0"); +CREATE TABLE tmqdb.ctb1 USING tmqdb.stb TAGS(1, "subtable1"); +INSERT INTO tmqdb.ctb0 VALUES(now, 0, 0, 'a0')(now+1s, 0, 0, 'a00'); +INSERT INTO tmqdb.ctb1 VALUES(now, 1, 1, 'a1')(now+1s, 11, 11, 'a11'); +``` + +## Create a Topic + +The following SQL statement creates a topic in TDengine: + +```sql +CREATE TOPIC topic_name AS SELECT ts, c1, c2, c3 FROM tmqdb.stb WHERE c1 > 1; +``` + +Multiple subscription types are supported. + +#### Subscribe to a Column + +Syntax: + +```sql +CREATE TOPIC topic_name as subquery +``` + +You can subscribe to a topic through a SELECT statement. Statements that specify columns, such as `SELECT *` and `SELECT ts, cl` are supported, as are filtering conditions and scalar functions. Aggregate functions and time window aggregation are not supported. Note: + +- The schema of topics created in this manner is determined by the subscribed data. +- You cannot modify (`ALTER MODIFY`) or delete (`ALTER
DROP`) columns or tags that are used in a subscription or calculation. +- Columns added to a table after the subscription is created are not displayed in the results. Deleting columns will cause an error. + +### Subscribe to a Supertable + +Syntax: + +```sql +CREATE TOPIC topic_name AS STABLE stb_name +``` + +Creating a topic in this manner differs from a `SELECT * from stbName` statement as follows: + +- The table schema can be modified. +- Unstructured data is returned. The format of the data returned changes based on the supertable schema. +- A different table schema may exist for every data block to be processed. +- The data returned does not include tags. + +### Subscribe to a Database + +Syntax: + +```sql +CREATE TOPIC topic_name [WITH META] AS DATABASE db_name; +``` + +This SQL statement creates a subscription to all tables in the database. You can add the `WITH META` parameter to include schema changes in the subscription, including creating and deleting supertables; adding, deleting, and modifying columns; and creating, deleting, and modifying the tags of subtables. Consumers can determine the message type from the API. Note that this differs from Kafka. + +## Create a Consumer + +You configure the following parameters when creating a consumer: + +| Parameter | Type | Description | Remarks | +| :----------------------------: | :-----: | -------------------------------------------------------- | ------------------------------------------- | +| `td.connect.ip` | string | Used in establishing a connection; same as `taos_connect` | | +| `td.connect.user` | string | Used in establishing a connection; same as `taos_connect` | | +| `td.connect.pass` | string | Used in establishing a connection; same as `taos_connect` | | +| `td.connect.port` | string | Used in establishing a connection; same as `taos_connect` | | +| `group.id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. | +| `client.id` | string | Client ID | Maximum length: 192. | +| `auto.offset.reset` | enum | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) | +| `enable.auto.commit` | boolean | Commit automatically | Specify `true` or `false`. 
| +| `auto.commit.interval.ms` | integer | Interval for automatic commits, in milliseconds | +| `enable.heartbeat.background` | boolean | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | | +| `experimental.snapshot.enable` | boolean | Specify whether to consume messages from the WAL or from TSBS | | +| `msg.with.table.name` | boolean | Specify whether to deserialize table names from messages | + +The method of specifying these parameters depends on the language used: + + + + +```c +/* Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit), + an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass) */ +tmq_conf_t* conf = tmq_conf_new(); +tmq_conf_set(conf, "enable.auto.commit", "true"); +tmq_conf_set(conf, "auto.commit.interval.ms", "1000"); +tmq_conf_set(conf, "group.id", "cgrpName"); +tmq_conf_set(conf, "td.connect.user", "root"); +tmq_conf_set(conf, "td.connect.pass", "taosdata"); +tmq_conf_set(conf, "auto.offset.reset", "earliest"); +tmq_conf_set(conf, "experimental.snapshot.enable", "true"); +tmq_conf_set(conf, "msg.with.table.name", "true"); +tmq_conf_set_auto_commit_cb(conf, tmq_commit_cb_print, NULL); + +tmq_t* tmq = tmq_consumer_new(conf, NULL, 0); +tmq_conf_destroy(conf); +``` + + + + +Java programs use the following parameters: + +| Parameter | Type | Description | Remarks | +| ----------------------------- | ------ | ----------------------------------------------------------------------------------------------------------------------------- | +| `bootstrap.servers` | string |Connection address, such as `localhost:6030` | +| `value.deserializer` | string | Value deserializer; to use this method, implement the `com.taosdata.jdbc.tmq.Deserializer` interface or inherit the `com.taosdata.jdbc.tmq.ReferenceDeserializer` type | +| `value.deserializer.encoding` | string | Specify the encoding for string deserialization | | + +Note: The `bootstrap.servers` parameter is used instead of `td.connect.ip` and `td.connect.port` to provide an interface that is consistent with Kafka. + +```java +Properties properties = new Properties(); +properties.setProperty("enable.auto.commit", "true"); +properties.setProperty("auto.commit.interval.ms", "1000"); +properties.setProperty("group.id", "cgrpName"); +properties.setProperty("bootstrap.servers", "127.0.0.1:6030"); +properties.setProperty("td.connect.user", "root"); +properties.setProperty("td.connect.pass", "taosdata"); +properties.setProperty("auto.offset.reset", "earliest"); +properties.setProperty("msg.with.table.name", "true"); +properties.setProperty("value.deserializer", "com.taos.example.MetersDeserializer"); + +TaosConsumer consumer = new TaosConsumer<>(properties); + +/* value deserializer definition. 
*/ +import com.taosdata.jdbc.tmq.ReferenceDeserializer; + +public class MetersDeserializer extends ReferenceDeserializer { +} +``` + + + + + +```go +config := tmq.NewConfig() +defer config.Destroy() +err = config.SetGroupID("test") +if err != nil { + panic(err) +} +err = config.SetAutoOffsetReset("earliest") +if err != nil { + panic(err) +} +err = config.SetConnectIP("127.0.0.1") +if err != nil { + panic(err) +} +err = config.SetConnectUser("root") +if err != nil { + panic(err) +} +err = config.SetConnectPass("taosdata") +if err != nil { + panic(err) +} +err = config.SetConnectPort("6030") +if err != nil { + panic(err) +} +err = config.SetMsgWithTableName(true) +if err != nil { + panic(err) +} +err = config.EnableHeartBeat() +if err != nil { + panic(err) +} +err = config.EnableAutoCommit(func(result *wrapper.TMQCommitCallbackResult) { + if result.ErrCode != 0 { + errStr := wrapper.TMQErr2Str(result.ErrCode) + err := errors.NewError(int(result.ErrCode), errStr) + panic(err) + } +}) +if err != nil { + panic(err) +} +``` + + + + + +```rust +let mut dsn: Dsn = "taos://".parse()?; +dsn.set("group.id", "group1"); +dsn.set("client.id", "test"); +dsn.set("auto.offset.reset", "earliest"); + +let tmq = TmqBuilder::from_dsn(dsn)?; + +let mut consumer = tmq.build()?; +``` + + + + + +Python programs use the following parameters: + +| Parameter | Type | Description | Remarks | +| :----------------------------: | :----: | -------------------------------------------------------- | ------------------------------------------- | +| `td_connect_ip` | string | Used in establishing a connection; same as `taos_connect` | | +| `td_connect_user` | string | Used in establishing a connection; same as `taos_connect` | | +| `td_connect_pass` | string | Used in establishing a connection; same as `taos_connect` | | +| `td_connect_port` | string | Used in establishing a connection; same as `taos_connect` | | +| `group_id` | string | Consumer group ID; consumers with the same ID are in the same group | **Required**. Maximum length: 192. | +| `client_id` | string | Client ID | Maximum length: 192. | +| `auto_offset_reset` | string | Initial offset for the consumer group | Specify `earliest`, `latest`, or `none`(default) | +| `enable_auto_commit` | string | Commit automatically | Specify `true` or `false`. | +| `auto_commit_interval_ms` | string | Interval for automatic commits, in milliseconds | +| `enable_heartbeat_background` | string | Backend heartbeat; if enabled, the consumer does not go offline even if it has not polled for a long time | Specify `true` or `false`. | +| `experimental_snapshot_enable` | string | Specify whether to consume messages from the WAL or from TSBS | Specify `true` or `false`. | +| `msg_with_table_name` | string | Specify whether to deserialize table names from messages | Specify `true` or `false`. 
+| `timeout` | int | Consumer pull timeout | | + + + + + +```js +// Create consumer groups on demand (group.id) and enable automatic commits (enable.auto.commit), +// an automatic commit interval (auto.commit.interval.ms), and a username (td.connect.user) and password (td.connect.pass) + +let consumer = taos.consumer({ + 'enable.auto.commit': 'true', + 'auto.commit.interval.ms','1000', + 'group.id': 'tg2', + 'td.connect.user': 'root', + 'td.connect.pass': 'taosdata', + 'auto.offset.reset','earliest', + 'msg.with.table.name': 'true', + 'td.connect.ip','127.0.0.1', + 'td.connect.port','6030' + }); +``` + + + + + +```csharp +using TDengineTMQ; + +// Create consumer groups on demand (GourpID) and enable automatic commits (EnableAutoCommit), +// an automatic commit interval (AutoCommitIntervalMs), and a username (TDConnectUser) and password (TDConnectPasswd) +var cfg = new ConsumerConfig + { + EnableAutoCommit = "true" + AutoCommitIntervalMs = "1000" + GourpId = "TDengine-TMQ-C#", + TDConnectUser = "root", + TDConnectPasswd = "taosdata", + AutoOffsetReset = "earliest" + MsgWithTableName = "true", + TDConnectIp = "127.0.0.1", + TDConnectPort = "6030" + }; + +var consumer = new ConsumerBuilder(cfg).Build(); + +``` + + + + + +A consumer group is automatically created when multiple consumers are configured with the same consumer group ID. + +## Subscribe to a Topic + +A single consumer can subscribe to multiple topics. + + + + +```c +// Create a list of subscribed topics +tmq_list_t* topicList = tmq_list_new(); +tmq_list_append(topicList, "topicName"); +// Enable subscription +tmq_subscribe(tmq, topicList); +tmq_list_destroy(topicList); + +``` + + + + +```java +List topics = new ArrayList<>(); +topics.add("tmq_topic"); +consumer.subscribe(topics); +``` + + + + +```go +consumer, err := tmq.NewConsumer(config) +if err != nil { + panic(err) +} +err = consumer.Subscribe([]string{"example_tmq_topic"}) +if err != nil { + panic(err) +} +``` + + + + +```rust +consumer.subscribe(["tmq_meters"]).await?; +``` + + + + + +```python +consumer = TaosConsumer('topic_ctb_column', group_id='vg2') +``` + + + + + +```js +// Create a list of subscribed topics +let topics = ['topic_test'] + +// Enable subscription +consumer.subscribe(topics); +``` + + + + + +```csharp +// Create a list of subscribed topics +List topics = new List(); +topics.add("tmq_topic"); +// Enable subscription +consumer.Subscribe(topics); +``` + + + + + +## Consume messages + +The following code demonstrates how to consume the messages in a queue. + + + + +```c +## Consume data +while (running) { + TAOS_RES* msg = tmq_consumer_poll(tmq, timeOut); + msg_process(msg); +} +``` + +The `while` loop obtains a message each time it calls `tmq_consumer_poll()`. This message is exactly the same as the result returned by a query, and the same deserialization API can be used on it. + + + + +```java +while(running){ + ConsumerRecords meters = consumer.poll(Duration.ofMillis(100)); + for (Meters meter : meters) { + processMsg(meter); + } +} +``` + + + + + +```go +for { + result, err := consumer.Poll(time.Second) + if err != nil { + panic(err) + } + fmt.Println(result) + consumer.Commit(context.Background(), result.Message) + consumer.FreeMessage(result.Message) +} +``` + + + + + +```rust +{ + let mut stream = consumer.stream(); + + while let Some((offset, message)) = stream.try_next().await? { + // get information from offset + + // the topic + let topic = offset.topic(); + // the vgroup id, like partition id in kafka. 
+ let vgroup_id = offset.vgroup_id(); + println!("* in vgroup id {vgroup_id} of topic {topic}\n"); + + if let Some(data) = message.into_data() { + while let Some(block) = data.fetch_raw_block().await? { + // one block for one table, get table name if needed + let name = block.table_name(); + let records: Vec = block.deserialize().try_collect()?; + println!( + "** table: {}, got {} records: {:#?}\n", + name.unwrap(), + records.len(), + records + ); + } + } + consumer.commit(offset).await?; + } +} +``` + + + + +```python +for msg in consumer: + for row in msg: + print(row) +``` + + + + + +```js +while(true){ + msg = consumer.consume(200); + // process message(consumeResult) + console.log(msg.topicPartition); + console.log(msg.block); + console.log(msg.fields) +} +``` + + + + + +```csharp +## Consume data +while (true) +{ + var consumerRes = consumer.Consume(100); + // process ConsumeResult + ProcessMsg(consumerRes); + consumer.Commit(consumerRes); +} +``` + + + + + +## Close the consumer + +After message consumption is finished, the consumer is unsubscribed. + + + + +```c +/* Unsubscribe */ +tmq_unsubscribe(tmq); + +/* Close consumer object */ +tmq_consumer_close(tmq); +``` + + + + +```java +/* Unsubscribe */ +consumer.unsubscribe(); + +/* Close consumer */ +consumer.close(); +``` + + + + + +```go +consumer.Close() +``` + + + + + +```rust +consumer.unsubscribe().await; +``` + + + + + +```py +# Unsubscribe +consumer.unsubscribe() +# Close consumer +consumer.close() +``` + + + + +```js +consumer.unsubscribe(); +consumer.close(); +``` + + + + + +```csharp +// Unsubscribe +consumer.Unsubscribe(); + +// Close consumer +consumer.Close(); +``` + + + + + +## Delete a Topic + +You can delete topics that are no longer useful. Note that you must unsubscribe all consumers from a topic before deleting it. + +```sql +/* Delete topic/ +DROP TOPIC topic_name; +``` + +## Check Status + +1. Query all existing topics. + +```sql +SHOW TOPICS; +``` + +2. Query the status and subscribed topics of all consumers. + +```sql +SHOW CONSUMERS; +``` + +3. Query the relationships between consumers and vgroups. + +```sql +SHOW SUBSCRIPTIONS; +``` + +## Examples + +The following section shows sample code in various languages. + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + diff --git a/docs/en/07-develop/08-cache.md b/docs/en/07-develop/08-cache.md index 8e86eff7414a02ad36a965eb092b8b9b65343301..4892c21c9ddb97b3f967053ee64be24f8cb78c85 100644 --- a/docs/en/07-develop/08-cache.md +++ b/docs/en/07-develop/08-cache.md @@ -1,52 +1,49 @@ --- -sidebar_label: Cache -title: Cache -description: "Caching System inside TDengine" +sidebar_label: Caching +title: Caching +description: "This document describes the caching component of TDengine." --- -To achieve the purpose of high performance data writing and querying, TDengine employs a lot of caching technologies in both server side and client side. +TDengine uses various kinds of caching techniques to efficiently write and query data. This document describes the caching component of TDengine. ## Write Cache -The cache management policy in TDengine is First-In-First-Out (FIFO). FIFO is also known as insert driven cache management policy and it is different from read driven cache management, which is more commonly known as Least-Recently-Used (LRU). FIFO simply stores the latest data in cache and flushes the oldest data in cache to disk, when the cache usage reaches a threshold. In IoT use cases, it is the current state i.e. 
the latest or most recent data that is important. The cache policy in TDengine, like much of the design and architecture of TDengine, is based on the nature of IoT data. +TDengine uses an insert-driven cache management policy, known as first in, first out (FIFO). This policy differs from read-driven "least recently used (LRU)" cache management. A FIFO policy stores the latest data in cache and flushes the oldest data from cache to disk when the cache usage reaches a threshold. In IoT use cases, the most recent data or the current state is most important. The cache policy in TDengine, like much of the design and architecture of TDengine, is based on the nature of IoT data. -The memory space used by each vnode as write cache is determined when creating a database. Parameter `vgroups` and `buffer` can be used to specify the number of vnode and the size of write cache for each vnode when creating the database. Then, the total size of write cache for this database is `vgroups * buffer`. +When you create a database, you can configure the size of the write cache on each vnode. The **vgroups** parameter determines the number of vgroups that process data in the database, and the **buffer** parameter determines the size of the write cache for each vnode. ```sql create database db0 vgroups 100 buffer 16MB ``` -The above statement creates a database of 100 vnodes while each vnode has a write cache of 16MB. - -Even though in theory it's always better to have a larger cache, the extra effect would be very minor once the size of cache grows beyond a threshold. So normally it's enough to use the default value of `buffer` parameter. +In theory, larger cache sizes are always better. However, at a certain point, it becomes impossible to improve performance by increasing cache size. In most scenarios, you can retain the default cache settings. ## Read Cache -When creating a database, it's also possible to specify whether to cache the latest data of each sub table, using parameter `cachelast`. There are 3 cases: -- 0: No cache for latest data -- 1: The last row of each table is cached, `last_row` function can benefit significantly from it -- 2: The latest non-NULL value of each column for each table is cached, `last` function can benefit very much when there is no `where`, `group by`, `order by` or `interval` clause -- 3: Bot hthe last row and the latest non-NULL value of each column for each table are cached, identical to the behavior of both 1 and 2 are set together - +When you create a database, you can configure whether the latest data from every subtable is cached. To do so, set the *cachelast* parameter as follows: +- 0: Caching is disabled. +- 1: The latest row of data in each subtable is cached. This option significantly improves the performance of the `LAST_ROW` function +- 2: The latest non-null value in each column of each subtable is cached. This option significantly improves the performance of the `LAST` function in normal situations, such as WHERE, ORDER BY, GROUP BY, and INTERVAL statements. +- 3: Rows and columns are both cached. This option is equivalent to simultaneously enabling options 1 and 2. -## Meta Cache +## Metadata Cache -To process data writing and querying efficiently, each vnode caches the metadata that's already retrieved. Parameters `pages` and `pagesize` are used to specify the size of metadata cache for each vnode. +To improve query and write performance, each vnode caches the metadata that it receives. 
When you create a database, you can configure the size of the metadata cache through the *pages* and *pagesize* parameters.
 
 ```sql
 create database db0 pages 128 pagesize 16kb
 ```
 
-The above statement will create a database db0 each of whose vnode is allocated a meta cache of `128 * 16 KB = 2 MB` .
+The preceding SQL statement creates 128 pages on each vnode in the `db0` database. Each page has a 16 KB metadata cache.
 
 ## File System Cache
 
-TDengine utilizes WAL to provide basic reliability. The essential of WAL is to append data in a disk file, so the file system cache also plays an important role in the writing performance. Parameter `wal` can be used to specify the policy of writing WAL, there are 2 cases:
-- 1: Write data to WAL without calling fsync, the data is actually written to the file system cache without flushing immediately, in this way you can get better write performance
-- 2: Write data to WAL and invoke fsync, the data is immediately flushed to disk, in this way you can get higher reliability
+TDengine implements data reliability by means of a write-ahead log (WAL). Writing data to the WAL is essentially writing data to the disk in an ordered, append-only manner. For this reason, the file system cache plays an important role in write performance. When you create a database, you can use the *wal* parameter to choose higher performance or higher reliability.
+- 1: This option writes to the WAL but does not enable fsync. New data written to the WAL is stored in the file system cache but not written to disk. This provides better performance.
+- 2: This option writes to the WAL and enables fsync. New data written to the WAL is immediately written to disk. This provides better data reliability.
 
 ## Client Cache
 
-To improve the overall efficiency of processing data, besides the above caches, the core library `libtaos.so` (also referred to as `taosc`) which all client programs depend on also has its own cache. `taosc` caches the metadata of the databases, super tables, tables that the invoking client has accessed, plus other critical metadata such as the cluster topology.
+In addition to the server-side caching discussed previously, the core client library `libtaos.so` also makes use of caching. TDengine Client caches the metadata of all databases, supertables, and subtables that it has accessed, as well as the cluster topology.
 
-When multiple client programs are accessing a TDengine cluster, if one of the clients modifies some metadata, the cache may become invalid in other clients. If this case happens, the client programs need to "reset query cache" to invalidate the whole cache so that `taosc` is enforced to repull the metadata it needs to rebuild the cache.
+If a client modifies certain metadata while multiple clients are simultaneously accessing a TDengine cluster, the metadata caches on each client may fail or become out of sync. If this occurs, run the `reset query cache` command on the affected clients to force them to obtain fresh metadata and reset their caches.
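+
+For example, the command can be issued from the TDengine CLI on the affected client:
+
+```sql
+reset query cache;
+```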
diff --git a/docs/en/07-develop/09-udf.md b/docs/en/07-develop/09-udf.md index 49bc95bd91a4c31d42d2b21ef05d69225f1bd963..172a11f34115b1ff5887b894a401e22ee7d72af6 100644 --- a/docs/en/07-develop/09-udf.md +++ b/docs/en/07-develop/09-udf.md @@ -1,240 +1,245 @@ --- sidebar_label: UDF -title: User Defined Functions(UDF) -description: "Scalar functions and aggregate functions developed by users can be utilized by the query framework to expand query capability" +title: User-Defined Functions (UDF) +description: "You can define your own scalar and aggregate functions to expand the query capabilities of TDengine." --- -In some use cases, built-in functions are not adequate for the query capability required by application programs. With UDF, the functions developed by users can be utilized by the query framework to meet business and application requirements. UDF normally takes one column of data as input, but can also support the result of a sub-query as input. +The built-in functions of TDengine may not be sufficient for the use cases of every application. In this case, you can define custom functions for use in TDengine queries. These are known as user-defined functions (UDF). A user-defined function takes one column of data or the result of a subquery as its input. -From version 2.2.0.0, UDF written in C/C++ are supported by TDengine. +TDengine supports user-defined functions written in C or C++. This document describes the usage of user-defined functions. +User-defined functions can be scalar functions or aggregate functions. Scalar functions, such as `abs`, `sin`, and `concat`, output a value for every row of data. Aggregate functions, such as `avg` and `max` output one value for multiple rows of data. -## Types of UDF +When you create a user-defined function, you must implement standard interface functions: +- For scalar functions, implement the `scalarfn` interface function. +- For aggregate functions, implement the `aggfn_start`, `aggfn`, and `aggfn_finish` interface functions. +- To initialize your function, implement the `udf_init` function. To terminate your function, implement the `udf_destroy` function. -Two kinds of functions can be implemented by UDF: scalar functions and aggregate functions. +There are strict naming conventions for these interface functions. The names of the start, finish, init, and destroy interfaces must be _start, _finish, _init, and _destroy, respectively. Replace `scalarfn`, `aggfn`, and `udf` with the name of your user-defined function. -Scalar functions return multiple rows and aggregate functions return either 0 or 1 row. - -In the case of a scalar function you only have to implement the "normal" function template. - -In the case of an aggregate function, in addition to the "normal" function, you also need to implement the "merge" and "finalize" function templates even if the implementation is empty. This will become clear in the sections below. - -### Scalar Function - -As mentioned earlier, a scalar UDF only has to implement the "normal" function template. The function template below can be used to define your own scalar function. - -`void udfNormalFunc(char* data, short itype, short ibytes, int numOfRows, long long* ts, char* dataOutput, char* interBuf, char* tsOutput, int* numOfOutput, short otype, short obytes, SUdfInit* buf)` - -`udfNormalFunc` is the place holder for a function name. A function implemented based on the above template can be used to perform scalar computation on data rows. 
The parameters are fixed to control the data exchange between UDF and TDengine. - -- Definitions of the parameters: - - - data:input data - - itype:the type of input data, for details please refer to [type definition in column_meta](/reference/rest-api/), for example 4 represents INT - - iBytes:the number of bytes consumed by each value in the input data - - oType:the type of output data, similar to iType - - oBytes:the number of bytes consumed by each value in the output data - - numOfRows:the number of rows in the input data - - ts: the column of timestamp corresponding to the input data - - dataOutput:the buffer for output data, total size is `oBytes * numberOfRows` - - interBuf:the buffer for an intermediate result. Its size is specified by the `BUFSIZE` parameter when creating a UDF. It's normally used when the intermediate result is not same as the final result. This buffer is allocated and freed by TDengine. - - tsOutput:the column of timestamps corresponding to the output data; it can be used to output timestamp together with the output data if it's not NULL - - numOfOutput:the number of rows in output data - - buf:for the state exchange between UDF and TDengine +## Implementing a Scalar Function +The implementation of a scalar function is described as follows: +```c +#include "taos.h" +#include "taoserror.h" +#include "taosudf.h" + +// initialization function. if no initialization, we can skip definition of it. The initialization function shall be concatenation of the udf name and _init suffix +// @return error number defined in taoserror.h +int32_t scalarfn_init() { + // initialization. + return TSDB_CODE_SUCCESS; +} + +// scalar function main computation function +// @param inputDataBlock, input data block composed of multiple columns with each column defined by SUdfColumn +// @param resultColumn, output column +// @return error number defined in taoserror.h +int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn* resultColumn) { + // read data from inputDataBlock and process, then output to resultColumn. + return TSDB_CODE_SUCCESS; +} + +// cleanup function. if no cleanup related processing, we can skip definition of it. The destroy function shall be concatenation of the udf name and _destroy suffix. +// @return error number defined in taoserror.h +int32_t scalarfn_destroy() { + // clean up + return TSDB_CODE_SUCCESS; +} +``` +Replace `scalarfn` with the name of your function. - [add_one.c](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/add_one.c) is one example of a very simple UDF implementation, i.e. one instance of the above `udfNormalFunc` template. It adds one to each value of a passed in column, which can be filtered using the `where` clause, and outputs the result. +## Implementing an Aggregate Function -### Aggregate Function +The implementation of an aggregate function is described as follows: +```c +#include "taos.h" +#include "taoserror.h" +#include "taosudf.h" + +// Initialization function. if no initialization, we can skip definition of it. The initialization function shall be concatenation of the udf name and _init suffix +// @return error number defined in taoserror.h +int32_t aggfn_init() { + // initialization. + return TSDB_CODE_SUCCESS; +} + +// aggregate start function. The intermediate value or the state(@interBuf) is initialized in this function. 
The function name shall be concatenation of udf name and _start suffix
+// @param interBuf intermediate value to initialize
+// @return error number defined in taoserror.h
+int32_t aggfn_start(SUdfInterBuf* interBuf) {
+    // initialize intermediate value in interBuf
+    return TSDB_CODE_SUCCESS;
+}
+
+// aggregate reduce function. This function aggregates the old state(@interBuf) and one data block(@inputBlock) and outputs a new state(@newInterBuf).
+// @param inputBlock input data block
+// @param interBuf old state
+// @param newInterBuf new state
+// @return error number defined in taoserror.h
+int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf) {
+    // read from inputBlock and interBuf and output to newInterBuf
+    return TSDB_CODE_SUCCESS;
+}
+
+// aggregate function finish function. This function transforms the intermediate value(@interBuf) into the final output(@result). The function name must be concatenation of aggfn and _finish suffix.
+// @interBuf : intermediate value
+// @result: final result
+// @return error number defined in taoserror.h
+int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result) {
+    // read data from interBuf and process, then output to result
+    return TSDB_CODE_SUCCESS;
+}
+
+// cleanup function. if no cleanup related processing, we can skip definition of it. The destroy function shall be concatenation of the udf name and _destroy suffix.
+// @return error number defined in taoserror.h
+int32_t aggfn_destroy() {
+    // clean up
+    return TSDB_CODE_SUCCESS;
+}
+```
+Replace `aggfn` with the name of your function.
 
-For aggregate UDF, as mentioned earlier you must implement a "normal" function template (described above) and also implement the "merge" and "finalize" templates.
+## Interface Functions
 
-#### Merge Function Template
+There are strict naming conventions for interface functions. The names of the start, finish, init, and destroy interfaces must be _start, _finish, _init, and _destroy, respectively. Replace `scalarfn`, `aggfn`, and `udf` with the name of your user-defined function.
 
-The function template below can be used to define your own merge function for an aggregate UDF.
+Interface functions return a value that indicates whether the operation was successful. If an operation fails, the interface function returns an error code. Otherwise, it returns TSDB_CODE_SUCCESS. The error codes are defined in `taoserror.h` and in the common API error codes in `taos.h`. For example, TSDB_CODE_UDF_INVALID_INPUT indicates invalid input. TSDB_CODE_OUT_OF_MEMORY indicates insufficient memory.
 
-`void udfMergeFunc(char* data, int32_t numOfRows, char* dataOutput, int32_t* numOfOutput, SUdfInit* buf)`
+For information about the parameters for interface functions, see the data structures described later in this document.
 
-`udfMergeFunc` is the place holder for a function name. The function implemented with the above template is used to aggregate intermediate results and can only be used in the aggregate query for STable.
+### Interfaces for Scalar Functions
 
-Definitions of the parameters:
 
+ `int32_t scalarfn(SUdfDataBlock* inputDataBlock, SUdfColumn *resultColumn)`
+
+ Replace `scalarfn` with the name of your function. This function performs scalar calculations on data blocks. You can configure a value through the parameters in the `resultColumn` structure.
-- data:array of output data, if interBuf is used it's an array of interBuf -- numOfRows:number of rows in `data` -- dataOutput:the buffer for output data, the size is same as that of the final result; If the result is not final, it can be put in the interBuf, i.e. `data`. -- numOfOutput:number of rows in the output data -- buf:for the state exchange between UDF and TDengine +The parameters in the function are defined as follows: + - inputDataBlock: The data block to input. + - resultColumn: The column to output. The column to output. -#### Finalize Function Template +### Interfaces for Aggregate Functions -The function template below can be used to finalize the result of your own UDF, normally used when interBuf is used. +`int32_t aggfn_start(SUdfInterBuf *interBuf)` -`void udfFinalizeFunc(char* dataOutput, char* interBuf, int* numOfOutput, SUdfInit* buf)` +`int32_t aggfn(SUdfDataBlock* inputBlock, SUdfInterBuf *interBuf, SUdfInterBuf *newInterBuf)` -`udfFinalizeFunc` is the place holder of function name, definitions of the parameter are as below: +`int32_t aggfn_finish(SUdfInterBuf* interBuf, SUdfInterBuf *result)` -- dataOutput:buffer for output data -- interBuf:buffer for intermediate result, can be used as input for next processing step -- numOfOutput:number of output data, can only be 0 or 1 for aggregate function -- buf:for state exchange between UDF and TDengine +Replace `aggfn` with the name of your function. In the function, aggfn_start is called to generate a result buffer. Data is then divided between multiple blocks, and aggfn is called on each block to update the result. Finally, aggfn_finish is called to generate final results from the intermediate results. The final result contains only one or zero data points. -### Example abs_max.c +The parameters in the function are defined as follows: + - interBuf: The intermediate result buffer. + - inputBlock: The data block to input. + - newInterBuf: The new intermediate result buffer. + - result: The final result. -[abs_max.c](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/abs_max.c) is an example of a user defined aggregate function to get the maximum from the absolute values of a column. -The internal processing happens as follows. The results of the select statement are divided into multiple row blocks and `udfNormalFunc`, i.e. `abs_max` in this case, is performed on each row block to generate the intermediate results for each sub table. Then `udfMergeFunc`, i.e. `abs_max_merge` in this case, is performed on the intermediate result of sub tables to aggregate and generate the final or intermediate result of STable. The intermediate result of STable is finally processed by `udfFinalizeFunc`, i.e. `abs_max_finalize` in this example, to generate the final result, which contains either 0 or 1 row. +### Initializing and Terminating User-Defined Functions +`int32_t udf_init()` -Other typical aggregation functions such as covariance, can also be implemented using aggregate UDF. +`int32_t udf_destroy()` -## UDF Naming Conventions +Replace `udf`with the name of your function. udf_init initializes the function. udf_destroy terminates the function. If it is not necessary to initialize your function, udf_init is not required. If it is not necessary to terminate your function, udf_destroy is not required. -The naming convention for the 3 kinds of function templates required by UDF is as follows: - - udfNormalFunc, udfMergeFunc, and udfFinalizeFunc are required to have same prefix, i.e. the actual name of udfNormalFunc. 
The udfNormalFunc doesn't need a suffix following the function name. - - udfMergeFunc should be udfNormalFunc followed by `_merge` - - udfFinalizeFunc should be udfNormalFunc followed by `_finalize`. - -The naming convention is part of TDengine's UDF framework. TDengine follows this convention to invoke the corresponding actual functions. -Depending on whether you are creating a scalar UDF or aggregate UDF, the functions that you need to implement are different. +## Data Structure of User-Defined Functions +```c +typedef struct SUdfColumnMeta { + int16_t type; + int32_t bytes; + uint8_t precision; + uint8_t scale; +} SUdfColumnMeta; + +typedef struct SUdfColumnData { + int32_t numOfRows; + int32_t rowsAlloc; + union { + struct { + int32_t nullBitmapLen; + char *nullBitmap; + int32_t dataLen; + char *data; + } fixLenCol; + + struct { + int32_t varOffsetsLen; + int32_t *varOffsets; + int32_t payloadLen; + char *payload; + int32_t payloadAllocLen; + } varLenCol; + }; +} SUdfColumnData; + +typedef struct SUdfColumn { + SUdfColumnMeta colMeta; + bool hasNull; + SUdfColumnData colData; +} SUdfColumn; + +typedef struct SUdfDataBlock { + int32_t numOfRows; + int32_t numOfCols; + SUdfColumn **udfCols; +} SUdfDataBlock; + +typedef struct SUdfInterBuf { + int32_t bufLen; + char* buf; + int8_t numOfResult; //zero or one +} SUdfInterBuf; +``` +The data structure is described as follows: -- Scalar function:udfNormalFunc is required. -- Aggregate function:udfNormalFunc, udfMergeFunc (if query on STable) and udfFinalizeFunc are required. +- The SUdfDataBlock block includes the number of rows (numOfRows) and number of columns (numCols). udfCols[i] (0 <= i <= numCols-1) indicates that each column is of type SUdfColumn. +- SUdfColumn includes the definition of the data type of the column (colMeta) and the data in the column (colData). +- The member definitions of SUdfColumnMeta are the same as the data type definitions in `taos.h`. +- The data in SUdfColumnData can become longer. varLenCol indicates variable-length data, and fixLenCol indicates fixed-length data. +- SUdfInterBuf defines the intermediate structure `buffer` and the number of results in the buffer `numOfResult`. -For clarity, assuming we want to implement a UDF named "foo": -- If the function is a scalar function, we only need to implement the "normal" function template and it should be named simply `foo`. -- If the function is an aggregate function, we need to implement `foo`, `foo_merge`, and `foo_finalize`. Note that for aggregate UDF, even though one of the three functions is not necessary, there must be an empty implementation. +Additional functions are defined in `taosudf.h` to make it easier to work with these structures. ## Compile UDF -The source code of UDF in C can't be utilized by TDengine directly. UDF can only be loaded into TDengine after compiling to dynamically linked library (DLL). +To use your user-defined function in TDengine, first compile it to a dynamically linked library (DLL). -For example, the example UDF `add_one.c` mentioned earlier, can be compiled into DLL using the command below, in a Linux Shell. +For example, the sample UDF `add_one.c` can be compiled into a DLL as follows: ```bash gcc -g -O0 -fPIC -shared add_one.c -o add_one.so ``` -The generated DLL file `add_one.so` can be used later when creating a UDF. It's recommended to use GCC not older than 7.5. - -## Create and Use UDF - -When a UDF is created in a TDengine instance, it is available across the databases in that instance. 
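+
+An aggregate function is compiled in the same way. For example, the sample file `l2norm.c` referenced later in this document could be built as follows (the file and output names are illustrative):
+
+```bash
+gcc -g -O0 -fPIC -shared l2norm.c -o l2norm.so
+```
+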
- -### Create UDF - -SQL command can be executed on the host where the generated UDF DLL resides to load the UDF DLL into TDengine. This operation cannot be done through REST interface or web console. Once created, any client of the current TDengine can use these UDF functions in their SQL commands. UDF are stored in the management node of TDengine. The UDFs loaded in TDengine would be still available after TDengine is restarted. - -When creating UDF, the type of UDF, i.e. a scalar function or aggregate function must be specified. If the specified type is wrong, the SQL statements using the function would fail with errors. The input type and output type don't need to be the same in UDF, but the input data type and output data type must be consistent with the UDF definition. - -- Create Scalar Function - -```sql -CREATE FUNCTION userDefinedFunctionName AS "/absolute/path/to/userDefinedFunctionName.so" OUTPUTTYPE [BUFSIZE B]; -``` - -- userDefinedFunctionName:The function name to be used in SQL statement which must be consistent with the function name defined by `udfNormalFunc` and is also the name of the compiled DLL (.so file). -- path:The absolute path of the DLL file including the name of the shared object file (.so). The path must be quoted with single or double quotes. -- outputtype:The output data type, the value is the literal string of the supported TDengine data type. -- B:the size of intermediate buffer, in bytes; it is an optional parameter and the range is [0,512]. - -For example, below SQL statement can be used to create a UDF from `add_one.so`. - -```sql -CREATE FUNCTION add_one AS "/home/taos/udf_example/add_one.so" OUTPUTTYPE INT; -``` - -- Create Aggregate Function - -```sql -CREATE AGGREGATE FUNCTION userDefinedFunctionName AS "/absolute/path/to/userDefinedFunctionName.so" OUTPUTTYPE [ BUFSIZE B ]; -``` - -- userDefinedFunctionName:the function name to be used in SQL statement which must be consistent with the function name defined by `udfNormalFunc` and is also the name of the compiled DLL (.so file). -- path:the absolute path of the DLL file including the name of the shared object file (.so). The path needs to be quoted by single or double quotes. -- OUTPUTTYPE:the output data type, the value is the literal string of the type -- B:the size of intermediate buffer, in bytes; it's an optional parameter and the range is [0,512] - -For details about how to use intermediate result, please refer to example program [demo.c](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/demo.c). - -For example, below SQL statement can be used to create a UDF from `demo.so`. - -```sql -CREATE AGGREGATE FUNCTION demo AS "/home/taos/udf_example/demo.so" OUTPUTTYPE DOUBLE bufsize 14; -``` - -### Manage UDF - -- Delete UDF - -``` -DROP FUNCTION ids(X); -``` - -- ids(X):same as that in `CREATE FUNCTION` statement - -```sql -DROP FUNCTION add_one; -``` - -- Show Available UDF +The generated DLL file `add_one.so` can now be used to implement your function. Note: GCC 7.5 or later is required. -```sql -SHOW FUNCTIONS; -``` - -### Use UDF - -The function name specified when creating UDF can be used directly in SQL statements, just like builtin functions. - -```sql -SELECT X(c) FROM table/STable; -``` - -The above SQL statement invokes function X for column c. - -## Restrictions for UDF - -In current version there are some restrictions for UDF +## Manage and Use User-Defined Functions +After compiling your function into a DLL, you add it to TDengine. 
For more information, see [User-Defined Functions](../12-taos-sql/26-udf.md).
 
-1. Only Linux is supported when creating and invoking UDF for both client side and server side
-2. UDF can't be mixed with builtin functions
-3. Only one UDF can be used in a SQL statement
-4. Only a single column is supported as input for UDF
-5. Once created successfully, UDF is persisted in MNode of TDengineUDF
-6. UDF can't be created through REST interface
-7. The function name used when creating UDF in SQL must be consistent with the function name defined in the DLL, i.e. the name defined by `udfNormalFunc`
-8. The name of a UDF should not conflict with any of TDengine's built-in functions
 
-## Examples
+## Sample Code
 
-### Scalar function example [add_one](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/add_one.c)
+### Sample scalar function: [bit_and](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/bit_and.c)
 
+The bit_and function implements a bitwise AND operation across multiple columns. If there is only one column, the column is returned. The bit_and function ignores null values.
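+
+Once the library is built and registered with TDengine, the function can be called like a built-in scalar function. The following is a sketch only; the library path, table name, and column names are assumptions for your environment:
+
+```sql
+CREATE FUNCTION bit_and AS '/home/taos/udf_example/bit_and.so' OUTPUTTYPE INT;
+SELECT bit_and(c1, c2) FROM t1;
+```
+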
-add_one.c +bit_and.c ```c -{{#include tests/script/sh/add_one.c}} +{{#include tests/script/sh/bit_and.c}} ```
-### Aggregate function example [abs_max](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/abs_max.c) - -
-abs_max.c - -```c -{{#include tests/script/sh/abs_max.c}} -``` - -
+### Sample aggregate function: [l2norm](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/l2norm.c) -### Example for using intermediate result [demo](https://github.com/taosdata/TDengine/blob/develop/tests/script/sh/demo.c) +The l2norm function finds the second-order norm for all data in the input column. This squares the values, takes a cumulative sum, and finds the square root.
-demo.c +l2norm.c ```c -{{#include tests/script/sh/demo.c}} +{{#include tests/script/sh/l2norm.c}} ```
diff --git a/docs/en/07-develop/_sub_c.mdx b/docs/en/07-develop/_sub_c.mdx index da492a0269f064d8cdf9dfb80969894131d94015..b0667268e9978533e84e68ea3fe5f285538df762 100644 --- a/docs/en/07-develop/_sub_c.mdx +++ b/docs/en/07-develop/_sub_c.mdx @@ -1,3 +1,3 @@ ```c -{{#include docs/examples/c/subscribe_demo.c}} -``` \ No newline at end of file +{{#include docs/examples/c/tmq_example.c}} +``` diff --git a/docs/en/07-develop/_sub_python.mdx b/docs/en/07-develop/_sub_python.mdx index 490b76fca6deb61e61dc59c2096b30742a7d25f7..1309da5b416799492a6b85aae4b775e227c0ad6e 100644 --- a/docs/en/07-develop/_sub_python.mdx +++ b/docs/en/07-develop/_sub_python.mdx @@ -1,3 +1,3 @@ ```py -{{#include docs/examples/python/subscribe_demo.py}} -``` \ No newline at end of file +{{#include docs/examples/python/tmq_example.py}} +``` diff --git a/docs/en/07-develop/index.md b/docs/en/07-develop/index.md index e3f55f290753f79ac1708337082ce90bb050b21f..1ef5e23f72f707f7a9decce6ea0bfed8fd642c0c 100644 --- a/docs/en/07-develop/index.md +++ b/docs/en/07-develop/index.md @@ -2,13 +2,12 @@ title: Developer Guide --- -To develop an application to process time-series data using TDengine, we recommend taking the following steps: - -1. Choose the method to connect to TDengine. No matter what programming language you use, you can always use the REST interface to access TDengine, but you can also use connectors unique to each programming language. -2. Design the data model based on your own use cases. Learn the [concepts](/concept/) of TDengine including "one table for one data collection point" and the "super table" (STable) concept; learn about static labels, collected metrics, and subtables. Depending on the characteristics of your data and your requirements, you may decide to create one or more databases, and you should design the STable schema to fit your data. +Before creating an application to process time-series data with TDengine, consider the following: +1. Choose the method to connect to TDengine. TDengine offers a REST API that can be used with any programming language. It also has connectors for a variety of languages. +2. Design the data model based on your own use cases. Consider the main [concepts](/concept/) of TDengine, including "one table per data collection point" and the supertable. Learn about static labels, collected metrics, and subtables. Depending on the characteristics of your data and your requirements, you decide to create one or more databases and design a supertable schema that fit your data. 3. Decide how you will insert data. TDengine supports writing using standard SQL, but also supports schemaless writing, so that data can be written directly without creating tables manually. 4. Based on business requirements, find out what SQL query statements need to be written. You may be able to repurpose any existing SQL. -5. If you want to run real-time analysis based on time series data, including various dashboards, it is recommended that you use the TDengine continuous query feature instead of deploying complex streaming processing systems such as Spark or Flink. +5. If you want to run real-time analysis based on time series data, including various dashboards, use the TDengine stream processing component instead of deploying complex systems such as Spark or Flink. 6. 
If your application has modules that need to consume inserted data, and they need to be notified when new data is inserted, it is recommended that you use the data subscription function provided by TDengine without the need to deploy Kafka. 7. In many use cases (such as fleet management), the application needs to obtain the latest status of each data collection point. It is recommended that you use the cache function of TDengine instead of deploying Redis separately. 8. If you find that the SQL functions of TDengine cannot meet your requirements, then you can use user-defined functions to solve the problem.