async-streams (C# 8.0)
----------------------

Async-streams are asynchronous variants of enumerables, where getting the next element may involve an asynchronous operation. They are types that implement `IAsyncEnumerable<T>`.

```csharp
// These interfaces ship as part of .NET Core 3.0
namespace System.Collections.Generic
{
    public interface IAsyncEnumerable<out T>
    {
        IAsyncEnumerator<T> GetAsyncEnumerator(System.Threading.CancellationToken token = default);
    }

    public interface IAsyncEnumerator<out T> : System.IAsyncDisposable
    {
        System.Threading.Tasks.ValueTask<bool> MoveNextAsync();
        T Current { get; }
    }
}
namespace System
{
    public interface IAsyncDisposable
    {
        System.Threading.Tasks.ValueTask DisposeAsync();
    }
}
```

When you have an async-stream, you can enumerate its items using an asynchronous `foreach` statement: `await foreach (var item in asyncStream) { ... }`.
An `await foreach` statement is just like a `foreach` statement, but it uses `IAsyncEnumerable` instead of `IEnumerable`, each iteration evaluates an `await MoveNextAsync()`, and disposal of the enumerator is asynchronous.

Similarly, if you have an async-disposable, you can use and dispose it with an asynchronous `using` statement: `await using (var resource = asyncDisposable) { ... }`.
An `await using` statement is just like a `using` statement, but it uses `IAsyncDisposable` instead of `IDisposable`, and `await DisposeAsync()` instead of `Dispose()`.
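
Here is a minimal sketch of the consuming side, written as a C# top-level program (C# 9 or later). `CountAsync` is a hypothetical async-iterator. It exists only to provide an async-stream to enumerate:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var items = new List<int>();

// Each iteration awaits MoveNextAsync() on the enumerator behind the scenes.
await foreach (var item in CountAsync(3))
{
    items.Add(item);
}

Console.WriteLine(string.Join(", ", items)); // prints "0, 1, 2"

// Hypothetical async-iterator standing in for any async-stream.
static async IAsyncEnumerable<int> CountAsync(int count)
{
    for (int i = 0; i < count; i++)
    {
        await Task.Delay(10); // stand-in for real asynchronous work
        yield return i;
    }
}
```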

The user can implement these interfaces manually, or let the compiler generate a state machine from a user-defined method (called an "async-iterator" method).
An async-iterator method is a method that:
1. is declared `async`,
2. returns an `IAsyncEnumerable<T>` or `IAsyncEnumerator<T>` type,
3. uses both `await` syntax (`await` expression, `await foreach` or `await using` statements) and `yield` statements (`yield return`, `yield break`).

For example:
```csharp
async IAsyncEnumerable<int> GetValuesFromServer()
{
    while (true)
    {
        IEnumerable<int> batch = await GetNextBatch();
        if (batch == null) yield break;

        foreach (int item in batch)
        {
            yield return item;
        }
    }
}
```
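
The example above needs a definition of `GetNextBatch` before it can run. The sketch below assumes a hypothetical in-memory stand-in that returns two batches and then `null`. The rest is the method from the example, consumed with `await foreach`:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// Hypothetical stand-in for the server: two batches, then null to signal the end.
var batches = new Queue<int[]>(new[] { new[] { 1, 2, 3 }, new[] { 4, 5 } });
int calls = 0;

async Task<IEnumerable<int>?> GetNextBatch()
{
    await Task.Delay(10); // simulate network latency
    calls++;
    return batches.Count > 0 ? batches.Dequeue() : null;
}

async IAsyncEnumerable<int> GetValuesFromServer()
{
    while (true)
    {
        IEnumerable<int>? batch = await GetNextBatch();
        if (batch == null) yield break;

        foreach (int item in batch)
        {
            yield return item;
        }
    }
}

var received = new List<int>();
await foreach (int value in GetValuesFromServer())
{
    received.Add(value);
}

Console.WriteLine(string.Join(", ", received)); // prints "1, 2, 3, 4, 5"
```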

Just like in iterator methods, there are several restrictions on where a yield statement can appear in async-iterator methods:
- It is a compile-time error for a `yield` statement (of either form) to appear in the `finally` clause of a `try` statement.
- It is a compile-time error for a `yield return` statement to appear anywhere in a `try` statement that contains any `catch` clauses.
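
The second rule rules out one obvious shape for error handling. A common workaround is to put the `try`/`catch` around the `await` only and keep the `yield return` outside it. Here is a sketch, using a hypothetical `FetchAsync` that fails on its third call:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

int attempts = 0;

// Hypothetical data source: returns 10, then 20, then throws.
async Task<int> FetchAsync()
{
    await Task.Yield();
    attempts++;
    if (attempts == 3) throw new InvalidOperationException("source unavailable");
    return attempts * 10;
}

async IAsyncEnumerable<int> ReadUntilFailure()
{
    while (true)
    {
        int value = 0;
        bool failed = false;
        try
        {
            value = await FetchAsync(); // an await inside try/catch is allowed
        }
        catch (InvalidOperationException)
        {
            failed = true;              // a yield return is not allowed anywhere in this try statement
        }

        if (failed) yield break;
        yield return value;             // the yield return sits outside the try/catch
    }
}

var values = new List<int>();
await foreach (int v in ReadUntilFailure())
{
    values.Add(v);
}

Console.WriteLine(string.Join(", ", values)); // prints "10, 20"
```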

### Detailed design for `await using` statement

An asynchronous `using` is lowered just like a regular `using`, except that `Dispose()` is replaced with `await DisposeAsync()`.

Note that pattern-based lookup for `DisposeAsync` binds to instance methods that can be invoked without arguments.
Extension methods do not contribute. The result of `DisposeAsync` must be awaitable.
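
The two halves of the program below should behave the same way, which illustrates the lowering. The sketch uses `MemoryStream` because `System.IO.Stream` implements `IAsyncDisposable` (since .NET Core 3.0). The second half is a hand-written approximation of what the compiler emits, not its literal output:

```csharp
using System;
using System.IO;
using System.Threading.Tasks;

// 1. The statement form.
var first = new MemoryStream();
await using (first)
{
    first.WriteByte(1);
}

// 2. Hand-written equivalent: the body goes in a try, and the finally
//    awaits DisposeAsync() where a regular using would call Dispose().
var second = new MemoryStream();
try
{
    second.WriteByte(1);
}
finally
{
    if (second != null)
    {
        await ((IAsyncDisposable)second).DisposeAsync();
    }
}

// A disposed MemoryStream reports CanRead == false.
Console.WriteLine($"{first.CanRead} {second.CanRead}"); // prints "False False"
```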

### Detailed design for `await foreach` statement

An `await foreach` is lowered just like a regular `foreach`, except that:
- `GetEnumerator()` is replaced with `GetAsyncEnumerator()` (the call itself is not awaited)
- `MoveNext()` is replaced with `await MoveNextAsync()`
- `Dispose()` is replaced with `await DisposeAsync()`

Note that pattern-based lookup for `GetAsyncEnumerator`, `MoveNextAsync` and `DisposeAsync` binds to instance methods that can be invoked without arguments.
Extension methods do not contribute. The result of `MoveNextAsync` and `DisposeAsync` must be awaitable.
Disposal for `await foreach` does not include a fallback to a runtime check for an interface implementation.

Asynchronous foreach loops are disallowed on collections of type dynamic,
as there is no asynchronous equivalent of the non-generic `IEnumerable` interface.

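The `await foreach` statement itself always passes `default` as the `CancellationToken` argument to `GetAsyncEnumerator`.
But wrapper types can pass non-default values (see the `.WithCancellation(CancellationToken)` extension method),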
thereby allowing consumers of async-streams to control cancellation.
A producer of async-streams can make use of the cancellation token by writing an
`IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken)` async-iterator method in a custom type.
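
A producer can also receive the consumer's token through an async-iterator parameter marked `[EnumeratorCancellation]` (`System.Runtime.CompilerServices.EnumeratorCancellationAttribute`). This route is described in the async-iterator design below, and the sketch uses it instead of a custom type. `Ticks` is a hypothetical infinite producer. The program needs C# 9 or later, because it relies on top-level statements and an attribute on a local-function parameter:

```csharp
using System;
using System.Collections.Generic;
using System.Runtime.CompilerServices;
using System.Threading;
using System.Threading.Tasks;

var seen = new List<int>();
bool canceled = false;
using var cts = new CancellationTokenSource();

try
{
    // WithCancellation wraps the stream, so await foreach passes cts.Token to GetAsyncEnumerator.
    await foreach (int tick in Ticks().WithCancellation(cts.Token))
    {
        seen.Add(tick);
        if (tick == 2) cts.Cancel(); // the consumer requests cancellation
    }
}
catch (OperationCanceledException)
{
    canceled = true;
}

Console.WriteLine($"{string.Join(", ", seen)} canceled={canceled}"); // prints "0, 1, 2 canceled=True"

// Hypothetical producer: the token given to GetAsyncEnumerator flows into `token`.
static async IAsyncEnumerable<int> Ticks([EnumeratorCancellation] CancellationToken token = default)
{
    for (int i = 0; ; i++)
    {
        token.ThrowIfCancellationRequested();
        await Task.Yield();
        yield return i;
    }
}
```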

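For `await foreach (V v in x) /* body */`, where `C` is the collection type of `x`, `E` the enumerator type and `T` the element type, the statement is lowered as: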
```csharp
E e = ((C)(x)).GetAsyncEnumerator(default);
try
{
    while (await e.MoveNextAsync())
    {
        V v = (V)(T)e.Current;  // or, with deconstruction: (D1 d1, ...) = (V)(T)e.Current;
        // body
    }
}
finally
{
    await e.DisposeAsync();
}
```
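
A small program can check that the statement and a hand-written version of this lowering agree. In this sketch `C` is `IAsyncEnumerable<int>`, `E` is `IAsyncEnumerator<int>`, and `V` and `T` are both `int`. `Source` is a hypothetical async-iterator:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

// 1. The statement form.
int sumA = 0;
await foreach (int item in Source())
{
    sumA += item;
}

// 2. The lowered form, written by hand.
int sumB = 0;
IAsyncEnumerator<int> e = Source().GetAsyncEnumerator(default);
try
{
    while (await e.MoveNextAsync())
    {
        int v = e.Current;
        sumB += v; // body
    }
}
finally
{
    await e.DisposeAsync();
}

Console.WriteLine($"{sumA} {sumB}"); // prints "6 6"

static async IAsyncEnumerable<int> Source()
{
    for (int i = 1; i <= 3; i++)
    {
        await Task.Yield();
        yield return i;
    }
}
```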

### Detailed design for async-iterator methods

An async-iterator method is replaced by a kick-off method, which initializes a state machine. Unlike the kick-off method for a regular async method, it does not start running the state machine.
The kick-off method is marked with `AsyncIteratorStateMachineAttribute`.

The state machine for an enumerable async-iterator method primarily implements `IAsyncEnumerable<T>` and `IAsyncEnumerator<T>`.
For an enumerator async-iterator, it only implements `IAsyncEnumerator<T>`.
It is similar to a state machine produced for an async method.
It contains builder and awaiter fields, used to run the state machine in the background (when an `await` is reached in the async-iterator).
It also captures parameter values (if any) or `this` (if needed).
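
User code can observe that the kick-off method only builds the state machine. In this sketch (a top-level program with a hypothetical `Numbers` method), nothing in the body runs until the first `MoveNextAsync()`:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

async IAsyncEnumerable<int> Numbers()
{
    log.Add("body started");
    await Task.Yield();
    yield return 42;
}

IAsyncEnumerable<int> stream = Numbers();              // kick-off: creates the state machine only
IAsyncEnumerator<int> e = stream.GetAsyncEnumerator(); // still no user code has run
int entriesBeforeMove = log.Count;                     // 0

bool hasValue = await e.MoveNextAsync();               // runs the body up to the first yield return
int value = e.Current;
await e.DisposeAsync();

Console.WriteLine($"{entriesBeforeMove} {hasValue} {value} {log.Count}"); // prints "0 True 42 1"
```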

But it contains additional state:
- a promise of a value-or-end,
- a current yielded value of type `T`,
- an `int` capturing the id of the thread that created it,
- a `bool` flag indicating "dispose mode",
- a `CancellationTokenSource` for combining tokens (in enumerables).

The central method of the state machine is `MoveNext()`. It is run by `MoveNextAsync()` and `DisposeAsync()`, or as a background continuation initiated by an `await` in the method.

The promise of a value-or-end is returned from `MoveNextAsync`. It can be fulfilled with either:
- `true` (when a value becomes available following background execution of the state machine),
- `false` (if the end is reached),
- an exception.
The promise is implemented as a `ManualResetValueTaskSourceCore<bool>` (a reusable and allocation-free way of producing and fulfilling `ValueTask<bool>` or `ValueTask` instances),
together with the interfaces the state machine implements around it: `IValueTaskSource<bool>` and `IValueTaskSource`.
See more details about those types at https://blogs.msdn.microsoft.com/dotnet/2018/11/07/understanding-the-whys-whats-and-whens-of-valuetask/
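
You can try the reuse pattern directly on `ManualResetValueTaskSourceCore<bool>` (namespace `System.Threading.Tasks.Sources`, .NET Core 3.0 and later). This sketch skips the `IValueTaskSource<bool>` wrapper. It only exercises the reset, fulfil and check cycle that the synchronous fast path of `MoveNextAsync` (shown below) relies on. The loop that pretends to produce two values and then reach the end is hypothetical:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks.Sources;

// A mutable struct: keep it in a local or a non-readonly field so calls mutate it in place.
var promise = new ManualResetValueTaskSourceCore<bool>();
var results = new List<bool>();
var versions = new List<short>();

for (int round = 0; round < 3; round++)
{
    promise.Reset();                 // same instance reused; Version changes on every Reset
    short version = promise.Version;
    versions.Add(version);

    promise.SetResult(round < 2);    // true twice (a value is available), then false (end reached)

    // Fast path: the promise is already fulfilled, so read the result synchronously.
    if (promise.GetStatus(version) == ValueTaskSourceStatus.Succeeded)
    {
        results.Add(promise.GetResult(version));
    }
}

Console.WriteLine(string.Join(", ", results)); // prints "True, True, False"
```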

Compared to the state machine for a regular async method, the `MoveNext()` for an async-iterator method adds logic:
- to support handling a `yield return` statement, which saves the current value and fulfills the promise with result `true`,
- to support handling a `yield break` statement, which sets the dispose mode on and jumps to the enclosing `finally` or exit,
- to dispatch execution to `finally` blocks (when disposing),
- to exit the method, which disposes the `CancellationTokenSource` (if any) and fulfills the promise with result `false`,
- to catch exceptions, which disposes the `CancellationTokenSource` (if any) and sets the exception into the promise.
(The handling of an `await` is unchanged.)

This is reflected in the implementation, which extends the lowering machinery for async methods to:
1. handle `yield return` and `yield break` statements (see methods `VisitYieldReturnStatement` and `VisitYieldBreakStatement` in `AsyncIteratorMethodToStateMachineRewriter`),
2. handle `try` statements (see methods `VisitTryStatement` and `VisitExtractedFinallyBlock` in `AsyncIteratorMethodToStateMachineRewriter`),
3. produce additional state and logic for the promise itself (see `AsyncIteratorRewriter`, which produces various other members: `MoveNextAsync`, `Current`, `DisposeAsync`,
and some members supporting the resettable `ValueTask` behavior, namely `GetResult`, `GetStatus`, `OnCompleted`).

```csharp
ValueTask<bool> MoveNextAsync()
{
    if (state == StateMachineStates.FinishedStateMachine)
    {
        return default(ValueTask<bool>);
    }
    valueOrEndPromise.Reset();
    var inst = this;
    builder.Start(ref inst);
    var version = valueOrEndPromise.Version;
    if (valueOrEndPromise.GetStatus(version) == ValueTaskSourceStatus.Succeeded)
    {
        return new ValueTask<bool>(valueOrEndPromise.GetResult(version));
    }
    return new ValueTask<bool>(this, version); // note this leverages the state machine's implementation of IValueTaskSource<bool>
}
```

```csharp
T Current => current;
```

The kick-off method and the initialization of the state machine for an async-iterator method follow those for regular iterator methods.
In particular, the synthesized `GetAsyncEnumerator()` method is like `GetEnumerator()`, except that it sets the initial state to `InitialState` (-3):
```csharp
IAsyncEnumerator<T> GetAsyncEnumerator(CancellationToken token)
{
    {StateMachineType} result;
    if (initialThreadId == /*managedThreadId*/ && state == StateMachineStates.FinishedStateMachine)
    {
        state = InitialState; // -3
        builder = AsyncIteratorMethodBuilder.Create();
        disposeMode = false;
        result = this;
    }
    else
    {
        result = new {StateMachineType}(InitialState);
    }
    /* copy each parameter proxy, or in the case of the parameter marked with [EnumeratorCancellation] combine it with `GetAsyncEnumerator`'s `token` parameter */
    return result;
}
```

For the parameter with `[EnumeratorCancellation]`, `GetAsyncEnumerator` initializes it by combining the two available tokens:
```csharp
if (this.parameterProxy.Equals(default))
{
    result.parameter = token;
}
else if (token.Equals(this.parameterProxy) || token.Equals(default))
{
    result.parameter = this.parameterProxy;
}
else
{
    result.combinedTokens = CancellationTokenSource.CreateLinkedTokenSource(this.parameterProxy, token);
    result.parameter = result.combinedTokens.Token;
}
```
For a discussion of the threadID check, see https://github.com/dotnet/corefx/issues/3481
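
The token-combining rules above can be mirrored in ordinary code. `CombineTokens` below is a hypothetical helper, not a compiler or BCL API. It returns the token to store in the `[EnumeratorCancellation]` parameter. When a linked source had to be created, it also returns that source so the caller can dispose it later:

```csharp
using System;
using System.Threading;

using var a = new CancellationTokenSource(); // stands in for the argument passed to the async-iterator
using var b = new CancellationTokenSource(); // stands in for the token passed to GetAsyncEnumerator

var (onlyArgument, _) = CombineTokens(a.Token, default);
var (onlyEnumerator, _) = CombineTokens(default, b.Token);
var (linkedToken, linkedSource) = CombineTokens(a.Token, b.Token);

b.Cancel();
bool argumentCanceled = onlyArgument.IsCancellationRequested; // false: not linked to b
bool combinedCanceled = linkedToken.IsCancellationRequested;  // true: linked to b
linkedSource?.Dispose();

Console.WriteLine($"{argumentCanceled} {combinedCanceled}"); // prints "False True"

static (CancellationToken Token, CancellationTokenSource? Linked) CombineTokens(
    CancellationToken parameterProxy, CancellationToken token)
{
    if (parameterProxy.Equals(default))
    {
        return (token, null);          // no argument token: use GetAsyncEnumerator's token as-is
    }
    if (token.Equals(parameterProxy) || token.Equals(default))
    {
        return (parameterProxy, null); // same token, or no enumerator token: keep the argument
    }
    // Two distinct tokens: the combined token is canceled when either one is.
    var linked = CancellationTokenSource.CreateLinkedTokenSource(parameterProxy, token);
    return (linked.Token, linked);
}
```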

Similarly, the kick-off method is much like those of regular iterator methods:
```csharp
{
    {StateMachineType} result = new {StateMachineType}(StateMachineStates.FinishedStateMachine); // -2
    /* save parameters into parameter proxies */
    return result;
}
```

#### Disposal

Iterator and async-iterator methods need disposal because their execution steps are controlled by the caller, which could choose to dispose the enumerator before getting all of its elements.
For example, `foreach (...) { if (...) break; }`.
In contrast, async methods continue running autonomously until they are done. They are never left suspended in the middle of execution from the caller's perspective, so they don't need to be disposed.

In summary, disposal of an async-iterator works based on four design elements:
- `yield return` (jumps to finally when resuming in dispose mode)
- `yield break` (enters dispose mode and jumps to enclosing finally)
- `finally` (after a `finally` we jump to the next enclosing one)
- `DisposeAsync` (enters dispose mode and resumes execution)

The caller of an async-iterator method should only call `DisposeAsync()` when the method completed or was suspended by a `yield return`.
`DisposeAsync` sets a flag on the state machine ("dispose mode") and (if the method wasn't completed) resumes the execution from the current state.
The state machine can resume execution from a given state (even those located within a `try`).
When the execution is resumed in dispose mode, it jumps straight to the enclosing `finally`.
`finally` blocks may involve pauses and resumes, but only for `await` expressions. As a result of the restrictions imposed on `yield return` (described above), dispose mode never runs into a `yield return`.
Once a `finally` block completes, the execution in dispose mode jumps to the next enclosing `finally`, or the end of the method once we reach the top-level.
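
User code can see the effect of dispose mode. When a consumer leaves `await foreach` early, the lowered loop calls `DisposeAsync()`. That call resumes the suspended method, skips the rest of the `try` block and runs the `finally` block, which may itself contain an `await`. Here is a sketch with a hypothetical `Produce` method:

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

async IAsyncEnumerable<int> Produce()
{
    try
    {
        log.Add("start");
        yield return 1;
        log.Add("after first"); // skipped: execution resumes in dispose mode
        yield return 2;
    }
    finally
    {
        await Task.Yield();     // an await inside finally is allowed
        log.Add("finally");
    }
}

await foreach (int x in Produce())
{
    log.Add($"got {x}");
    break;                      // leaving the loop triggers DisposeAsync on the enumerator
}

Console.WriteLine(string.Join(", ", log)); // prints "start, got 1, finally"
```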

Reaching a `yield break` also sets the dispose mode flag and jumps to the enclosing `finally` (or end of the method).
By the time we return control to the caller (completing the promise as `false` by reaching the end of the method), all disposal has been completed,
and the state machine is left in finished state. So `DisposeAsync()` has no work left to do.
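
Conversely, after a `yield break` the `finally` blocks have already run by the time `MoveNextAsync()` reports `false`. A later `DisposeAsync()` therefore finds the state machine finished. This sketch drives the enumerator by hand (`UpTo` is hypothetical):

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

var log = new List<string>();

async IAsyncEnumerable<int> UpTo(int limit)
{
    try
    {
        for (int i = 0; ; i++)
        {
            if (i == limit) yield break; // enters dispose mode and jumps to the finally below
            await Task.Yield();
            yield return i;
        }
    }
    finally
    {
        log.Add("finally");
    }
}

var values = new List<int>();
IAsyncEnumerator<int> e = UpTo(2).GetAsyncEnumerator();
while (await e.MoveNextAsync())
{
    values.Add(e.Current);
}

int finallyCountAtEnd = log.Count;        // 1: the finally ran before MoveNextAsync returned false
await e.DisposeAsync();                   // finished state: nothing left to do
int finallyCountAfterDispose = log.Count; // still 1

Console.WriteLine($"{string.Join(", ", values)} | {finallyCountAtEnd} {finallyCountAfterDispose}"); // prints "0, 1 | 1 1"
```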

Looking at disposal from the perspective of a given `finally` block, the code in that block can get executed:
- by normal execution (i.e. after the code in the `try` block),
- by raising an exception inside the `try` block (which will execute the necessary `finally` blocks and terminate the method in Finished state),
- by calling `DisposeAsync()` (which resumes execution in dispose mode and jumps to the enclosing finally),
- following a `yield break` (which enters dispose mode and jumps to the enclosing finally),
- in dispose mode, following a nested `finally`.

A `yield return` is lowered as:
```csharp
_current = expression;
_state = <next_state>;
goto <exprReturnTruelabel>; // which does _valueOrEndPromise.SetResult(true); return;

// resuming from state=<next_state> will dispatch execution to this label
<next_state_label>: ;
this.state = cachedState = NotStartedStateMachine;
if (disposeMode) /* jump to enclosing finally or exit */
```

A `yield break` is lowered as:
```csharp
disposeMode = true;
/* jump to enclosing finally or exit */
```

`DisposeAsync` is implemented as:
```csharp
ValueTask IAsyncDisposable.DisposeAsync()
{
    if (state >= StateMachineStates.NotStartedStateMachine /* -1 */)
    {
        throw new NotSupportedException();
    }
    if (state == StateMachineStates.FinishedStateMachine /* -2 */)
    {
        return default;
    }
    disposeMode = true;
    _valueOrEndPromise.Reset();
    var inst = this;
    _builder.Start(ref inst);
    return new ValueTask(this, _valueOrEndPromise.Version);  // note this leverages the state machine's implementation of IValueTaskSource
}
```

##### Regular versus extracted finally

When the `finally` clause contains no `await` expressions, a `try/finally` is lowered as:
```csharp
try
{
    ...
    finallyEntryLabel:
}
finally
{
    ...
}
if (disposeMode) /* jump to enclosing finally or exit */
```

When a `finally` contains `await` expressions, it is extracted before async rewriting (by `AsyncExceptionHandlerRewriter`). In those cases, we get:
```csharp
try
{
    ...
    goto finallyEntryLabel;
}
catch (Exception e)
{
    ... save exception ...
}
finallyEntryLabel:
{
    ... original code from finally and additional handling for exception ...
}
```

In both cases, we add an `if (disposeMode) /* jump to enclosing finally or exit */` after the block for `finally` logic.

#### State values and transitions

The enumerable starts with state -2.
Calling GetAsyncEnumerator sets the state to -3, or returns a fresh enumerator (also with state -3).

From there, MoveNext will either:
- reach the end of the method (-2, we're done and disposed)
- reach a `yield break` (state unchanged, dispose mode = true)
- reach a `yield return` (-N, decreasing from -4)
- reach an `await` (N, increasing from 0)

From suspended state N or -N, MoveNext will resume execution (-1).
But if the suspension was a `yield return` (-N), you could also call DisposeAsync, which resumes execution (-1) in dispose mode.

When in dispose mode, MoveNext continues to suspend (N) and resume (-1) until the end of the method is reached (-2).

The result of invoking `DisposeAsync` from states -1 or N is unspecified. This compiler generates `throw new NotSupportedException()` for those cases.

```
        DisposeAsync                              await
 +------------------------+             +------------------------> N
 |                        |             |                          |
 v   GetAsyncEnumerator   |             |        resuming          |
-2 --------------------> -3 --------> -1 <-------------------------+    Dispose mode = false
 ^                                   |  |                          |
 |         done and disposed         |  |      yield return        |
 +-----------------------------------+  +-----------------------> -N
 |        or exception thrown        |                             |
 |                                   |                             |
 |                             yield |                             |
 |                             break |           DisposeAsync      |
 |                                   |  +--------------------------+
 |                                   |  |
 |                                   |  |
 |         done and disposed         v  v    suspension (await)
 +----------------------------------- -1 ------------------------> N
                                        ^                          |    Dispose mode = true
                                        |         resuming         |
                                        +--------------------------+
```
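
The transitions above can be summarized as a small lookup. `Next` below is a hypothetical model written only to illustrate the documented state numbers; it is not how the generated code is structured. Its `n` argument selects which await state (0, 1, 2, ...) or `yield return` state (-4, -5, ...) is entered:

```csharp
using System;
using System.Collections.Generic;

var trace = new List<int>();
int state = -2; // a freshly created enumerable

void Step(string evt, int n = 0)
{
    state = Next(state, evt, n);
    trace.Add(state);
}

Step("GetAsyncEnumerator"); // -3
Step("MoveNextAsync");      // -1: running
Step("await");              // 0: suspended at the first await
Step("resume");             // -1
Step("yield return");       // -4: suspended at the first yield return
Step("DisposeAsync");       // -1: resumed in dispose mode
Step("end");                // -2: done and disposed

Console.WriteLine(string.Join(" ", trace)); // prints "-3 -1 0 -1 -4 -1 -2"

static int Next(int s, string evt, int n) => (s, evt) switch
{
    (-2, "GetAsyncEnumerator") => -3,
    (-2, "DisposeAsync") => -2,                           // already finished: nothing to do
    (>= -1, "DisposeAsync") => throw new NotSupportedException(), // running (-1) or awaiting (N)
    (-3 or <= -4, "DisposeAsync") => -1,                  // resume in dispose mode
    (-3 or <= -4, "MoveNextAsync") => -1,                 // start, or resume after a yield return
    (>= 0, "resume") => -1,                               // an awaited operation completed
    (-1, "await") => n,                                   // N = 0, 1, 2, ...
    (-1, "yield return") => -4 - n,                       // -N = -4, -5, -6, ...
    (-1, "yield break") => -1,                            // state unchanged; dispose mode is set
    (-1, "end") => -2,                                    // end of method, or an exception was thrown
    _ => throw new InvalidOperationException($"no transition from {s} on '{evt}'"),
};
```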