---
title: Stream Processing
sidebar_label: Stream Processing
description: This document describes the SQL statements related to the stream processing component of TDengine.
---

Raw time-series data is often cleaned and preprocessed before being permanently stored in a database. Stream processing components like Kafka, Flink, and Spark are often deployed alongside a time-series database to handle these operations, increasing system complexity and maintenance costs.

Because stream processing is built into TDengine, you are no longer reliant on middleware. TDengine offers a unified platform for writing, preprocessing, permanent storage, complex analysis, and real-time computation and alerting. Additionally, you can use SQL to perform all these tasks.

## Create a Stream

```sql
CREATE STREAM [IF NOT EXISTS] stream_name [stream_options] INTO stb_name [SUBTABLE(expression)] AS subquery
stream_options: {
 TRIGGER        [AT_ONCE | WINDOW_CLOSE | MAX_DELAY time]
 WATERMARK      time
 IGNORE EXPIRED [0|1]
 DELETE_MARK    time
 FILL_HISTORY   [0|1]
}

```

The subquery is a subset of standard SELECT query syntax:

```sql
subquery: SELECT [DISTINCT] select_list
    from_clause
    [WHERE condition]
    [PARTITION BY tag_list]
    [window_clause]
```

Session windows, state windows, and sliding windows are supported. When you configure a session or state window for a supertable, you must use PARTITION BY TBNAME.

The SUBTABLE clause defines the naming rules for automatically created subtables. For details, see Partitions of Stream below.

```sql
window_clause: {
    SESSION(ts_col, tol_val)
  | STATE_WINDOW(col)
  | INTERVAL(interval_val [, interval_offset]) [SLIDING (sliding_val)]
}
```

`SESSION` indicates a session window, and `tol_val` indicates the maximum range of the time interval. If the time interval between two consecutive rows is within the range specified by `tol_val`, they belong to the same session window; otherwise a new session window is started automatically.
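
For instance, the following sketch (the stream and target table names are illustrative) uses a session window with a 10-second tolerance. Because `meters` is a supertable, PARTITION BY TBNAME is required, as noted above.

```sql
-- Per-device session windows: rows no more than 10 seconds apart fall into the same window.
CREATE STREAM meters_session_s INTO meters_session AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname SESSION(ts, 10s);
```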

For example, the following SQL statement creates a stream and automatically creates a supertable named `avg_vol`. The stream uses a 1-minute time window that slides forward in 30-second intervals to calculate the average voltage of the `meters` supertable.

```sql
CREATE STREAM avg_vol_s INTO avg_vol AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m) SLIDING(30s);
```

## Partitions of Stream

A stream can process data in multiple partitions. Partition rules are defined by the PARTITION BY clause in the stream definition. Each partition has its own timeline and windows, is processed separately, and is written into a different subtable of the target supertable.

If a stream is created without a PARTITION BY clause, all data is written into one subtable.

If a stream is created with a PARTITION BY clause but without a SUBTABLE clause, each partition is given a random name.

If a stream is created with both a PARTITION BY clause and a SUBTABLE clause, the name of the subtable for each partition is calculated according to the SUBTABLE clause. For example:

```sql
CREATE STREAM avg_vol_s INTO avg_vol SUBTABLE(CONCAT('new-', tname)) AS SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
```

In the PARTITION BY clause, `tbname`, which represents the name of each subtable of the source supertable, is given the alias `tname`, and `tname` is then used in the SUBTABLE clause. As a result, each automatically created subtable is named by concatenating 'new-' with the name of the corresponding source subtable. Other expressions are also allowed in the SUBTABLE clause, but the output type must be varchar.
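
Other naming expressions follow the same pattern. The following sketch (the stream and target table names are illustrative) appends a fixed suffix to each source subtable name instead:

```sql
-- Subtables of avg_vol2 are named <source subtable name>_1m_avg.
CREATE STREAM avg_vol_s2 INTO avg_vol2 SUBTABLE(CONCAT(tname, '_1m_avg')) AS
SELECT _wstart, count(*), avg(voltage) FROM meters PARTITION BY tbname tname INTERVAL(1m);
```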

If the output length exceeds the TDengine limit of 192 characters, the name is truncated. If the generated name is already used by another table, the creation of the new subtable and any writes to it fail.

## Filling history data

Normally, a stream does not process data that was already written to the source table, or that is being written while the stream is created. Adding FILL_HISTORY 1 as a stream option allows the stream to process data written both before and during its creation. For example:

```sql
create stream if not exists s1 fill_history 1 into st1  as select count(*) from t1 interval(10s)
```

By combining the FILL_HISTORY option with a WHERE clause, a stream can process data within a specific time range. For example, the following stream processes only data after a past point in time (in this case, 2020-01-30):

```sql
create stream if not exists s1 fill_history 1 into st1  as select count(*) from t1 where ts > '2020-01-30' interval(10s)
```

As another example, the following stream processes only data starting from a time in the past and ending at a time in the future:

```sql
create stream if not exists s1 fill_history 1 into st1  as select count(*) from t1 where ts > '2020-01-30' and ts < '2023-01-01' interval(10s)
```

If a stream is completely outdated and you no longer want it to monitor or process data, you can drop it manually. Its output data is still retained.
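
For example, the history-filling stream created above can be dropped once it is no longer needed (see Delete a Stream below); the data it has already written to `st1` is kept:

```sql
DROP STREAM IF EXISTS s1;
```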


## Delete a Stream

```sql
DROP STREAM [IF EXISTS] stream_name
```

This statement deletes the stream processing service only. The data generated by the stream is retained.

## View Streams

```sql
SHOW STREAMS;
```

## Trigger Stream Processing

When you create a stream, you can use the TRIGGER parameter to specify triggering conditions for it.

For non-windowed processing, triggering occurs in real time. For windowed processing, there are three triggering methods; the default is AT_ONCE:

1. AT_ONCE: triggers on write

2. WINDOW_CLOSE: triggers when the window closes. This is determined by the event time. You can use WINDOW_CLOSE together with `watermark`. For more information, see Stream Processing Strategy for Out-of-Order Data.

3. MAX_DELAY: triggers when the window closes. If the window has not closed but the time elapsed exceeds MAX_DELAY, stream processing is also triggered.

Because the window closing is determined by the event time, a delay or termination of an event stream will prevent the event time from being updated. This may result in an inability to obtain the latest results.

For this reason, MAX_DELAY is provided as a way to ensure that processing occurs even if the window does not close.

MAX_DELAY also triggers when the window closes. Additionally, if a write occurs but the processing is not triggered before MAX_DELAY expires, processing is also triggered. 
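
For example, the following sketch (the stream and table names are illustrative) pushes results when each 1-minute window closes, and also at most 5 seconds after new data is written if the window has not closed yet:

```sql
-- Results are pushed on window close, or within 5 seconds of a write while the window is still open.
CREATE STREAM avg_vol_delay TRIGGER MAX_DELAY 5s INTO avg_vol_delay_stb AS
SELECT _wstart, avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m);
```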

## Stream Processing Strategy for Out-of-Order Data

When you create a stream, you can specify a watermark in the `stream_options` parameter.

The watermark is used to specify the tolerance for out-of-order data. The default value is 0.

T = latest event time - watermark

Each time a batch of data arrives at the system, the window closing time is updated using the preceding formula, and all windows whose closing time is less than T are closed. If the triggering method is WINDOW_CLOSE or MAX_DELAY, the aggregate result for each closed window is pushed.
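
For example, the following sketch (the stream and table names are illustrative) tolerates data that arrives up to 15 seconds late and pushes results only when a window closes:

```sql
-- A window closes once the latest event time exceeds its end time by the 15-second watermark.
CREATE STREAM avg_vol_wm TRIGGER WINDOW_CLOSE WATERMARK 15s INTO avg_vol_wm_stb AS
SELECT _wstart, avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m);
```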

## Stream Processing Strategy for Expired Data

The data in expired windows is tagged as expired. TDengine stream processing provides two methods for handling such data:

1. Drop the data. This is the default and is often the only handling method in most stream processing engines.

2. Recalculate the data. In this method, all data in the window is reobtained from the database and recalculated. The latest results are then returned.

In both of these methods, configuring the watermark is essential for obtaining accurate results (if expired data is dropped) and avoiding repeated triggers that affect system performance (if expired data is recalculated).
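
The IGNORE EXPIRED stream option selects between these two methods. The following is a sketch (the stream and table names are illustrative), assuming IGNORE EXPIRED 1 drops expired data and IGNORE EXPIRED 0 recalculates the affected windows:

```sql
-- IGNORE EXPIRED 0: recalculate windows that receive expired data (assumption; 1 would drop it).
CREATE STREAM avg_vol_exp TRIGGER WINDOW_CLOSE WATERMARK 15s IGNORE EXPIRED 0 INTO avg_vol_exp_stb AS
SELECT _wstart, avg(voltage) FROM meters PARTITION BY tbname INTERVAL(1m);
```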

## Supported functions

All [scalar functions](../function/#scalar-functions) are available in stream processing. All [System information functions](../function/#system-information-functions) are **not** allowed in stream processing. All [Aggregate functions](../function/#aggregate-functions) and [Selection functions](../function/#selection-functions) are available in stream processing, except the following:
  - [leastsquares](../function/#leastsquares)
  - [percentile](../function/#percentile)
  - [top](../function/#top)
  - [bottom](../function/#bottom)
  - [elapsed](../function/#elapsed)
  - [interp](../function/#interp)
  - [derivative](../function/#derivative)
  - [irate](../function/#irate)
  - [twa](../function/#twa)
  - [histogram](../function/#histogram)
  - [diff](../function/#diff)
  - [statecount](../function/#statecount)
  - [stateduration](../function/#stateduration)
  - [csum](../function/#csum)
  - [mavg](../function/#mavg)
  - [sample](../function/#sample)
  - [tail](../function/#tail)
  - [unique](../function/#unique)
  - [mode](../function/#mode)