# R2DBC Support

Spring Integration provides channel adapters for receiving and sending messages by using reactive access to databases via [R2DBC](https://r2dbc.io/) drivers.

Add the following dependency to your project:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
implementation "org.springframework.integration:spring-integration-r2dbc:5.5.9"
```

### R2DBC Inbound Channel Adapter

The `R2dbcMessageSource` is a pollable `MessageSource` implementation based on `R2dbcEntityOperations`. It produces messages whose payload is a `Flux` or a `Mono` of the data fetched from the database, according to the `expectSingleResult` option (a `Mono` when it is `true`, a `Flux` otherwise).
The `SELECT` query can be provided statically or as a SpEL expression that is evaluated on every `receive()` call.
The `R2dbcMessageSource.SelectCreator` is the root object of the evaluation context, which allows use of the `StatementMapper.SelectSpec` fluent API.
By default, this channel adapter maps each record from the select into a `LinkedCaseInsensitiveMap` instance.
This can be customized with the `payloadType` option, which is used underneath by the `EntityRowMapper` based on `this.r2dbcEntityOperations.getConverter()`.
The `updateSql` option is optional and is used to mark read records in the database so that subsequent polls skip them.
The `UPDATE` operation can be supplied with a `BiFunction<DatabaseClient.GenericExecuteSpec, ?, DatabaseClient.GenericExecuteSpec>` to bind values into the `UPDATE` based on records in the `SELECT` result.

A typical configuration for this channel adapter might look like this:

```
@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate,
            "SELECT * FROM person WHERE name='Name'");
    r2dbcMessageSource.setPayloadType(Person.class);
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    r2dbcMessageSource.setBindFunction(
            (DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));
    return r2dbcMessageSource;
}
```

With the Java DSL, the configuration for this channel adapter looks like this:

```
@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlows
        .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
            (selectCreator) ->
                    selectCreator.createSelect("person")
                        .withProjection("*")
                        .withCriteria(Criteria.where("id").is(1)))
                    .expectSingleResult(true)
                    .payloadType(Person.class)
                    .updateSql("UPDATE Person SET id='2' where id = :id")
                    .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                            bindSpec.bind("id", o.getId())),
            e -> e.poller(p -> p.fixedDelay(100)))
        .<Mono<?>>handle((p, h) -> p, e -> e.async(true))
        .channel(MessageChannels.flux())
        .get();
}
```
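
To make the `updateSql` marking behaviour concrete, here is a minimal plain-JDK simulation of two consecutive polls. It is only a sketch: `MarkAsReadPollSketch`, `Row`, and `UpdateSpec` are hypothetical stand-ins invented for illustration, not Spring Integration or Spring Data classes, and the in-memory list merely plays the role of the `person` table. The bind function has the same shape as the `BiFunction` described above: it receives the statement spec and one selected record, and binds that record's `id`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.BiFunction;

// Plain-JDK simulation of "SELECT, then mark as read".
// None of these types belong to Spring Integration; they are illustrative stand-ins.
final class MarkAsReadPollSketch {

    // Hypothetical row type standing in for a mapped payload such as Person.
    static final class Row {
        final int id;
        String name;

        Row(int id, String name) {
            this.id = id;
            this.name = name;
        }
    }

    // Hypothetical stand-in for a bindable UPDATE statement: it only records named bindings.
    static final class UpdateSpec {
        final Map<String, Object> bindings = new HashMap<>();

        UpdateSpec bind(String name, Object value) {
            bindings.put(name, value);
            return this;
        }
    }

    // In-memory stand-in for the "person" table.
    final List<Row> table = new ArrayList<>();

    // One poll: select rows WHERE name = 'Name', then apply
    // "UPDATE ... SET name = 'SomeOtherName' WHERE id = :id" for every selected row.
    List<Integer> poll(BiFunction<UpdateSpec, Row, UpdateSpec> bindFunction) {
        List<Row> selected = new ArrayList<>();
        for (Row row : table) {
            if ("Name".equals(row.name)) {
                selected.add(row);
            }
        }
        List<Integer> selectedIds = new ArrayList<>();
        for (Row row : selected) {
            selectedIds.add(row.id);
            // The bind function supplies the :id parameter from the selected row.
            Object id = bindFunction.apply(new UpdateSpec(), row).bindings.get("id");
            for (Row candidate : table) {
                if (Integer.valueOf(candidate.id).equals(id)) {
                    candidate.name = "SomeOtherName"; // the row no longer matches the SELECT
                }
            }
        }
        return selectedIds;
    }

    public static void main(String[] args) {
        MarkAsReadPollSketch db = new MarkAsReadPollSketch();
        db.table.add(new Row(1, "Name"));
        db.table.add(new Row(2, "Other"));
        db.table.add(new Row(3, "Name"));

        BiFunction<UpdateSpec, Row, UpdateSpec> bind = (spec, row) -> spec.bind("id", row.id);

        System.out.println(db.poll(bind)); // first poll selects ids 1 and 3
        System.out.println(db.poll(bind)); // second poll selects nothing
    }
}
```

In this simulation the second poll returns an empty list because the update changed the column that the select filters on, which is the effect the `updateSql` option is meant to achieve against a real database.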

### R2DBC Outbound Channel Adapter

The `R2dbcMessageHandler` is a `ReactiveMessageHandler` implementation that performs an `INSERT` (default), `UPDATE`, or `DELETE` query in the database using a provided `R2dbcEntityOperations`.
The `R2dbcMessageHandler.Type` can be configured statically or through a SpEL expression evaluated against the request message.
The query to execute can be based on the `tableName`, `values`, and `criteria` expression options. If `tableName` is not provided, the whole message payload is treated as an `org.springframework.data.relational.core.mapping.Table` entity to perform SQL against.
The package `org.springframework.data.relational.core.query` is registered as an import in the SpEL evaluation context for direct access to the `Criteria` fluent API, which is used for `UPDATE` and `DELETE` queries.
The `valuesExpression` is used for `INSERT` and `UPDATE`. It is evaluated against the request message and must produce a `Map` of column-value pairs to apply to the target table.

A typical configuration for this channel adapter might look like this:

```
@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate);
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    messageHandler.setCriteriaExpression(
        EXPRESSION_PARSER.parseExpression("T(Criteria).where('id').is(headers.personId)"));
    return messageHandler;
}
```

With the Java DSL, the configuration for this channel adapter looks like this:

```
.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))
```
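
As a rough illustration of how the options described for the outbound adapter fit together, the following plain-JDK sketch shows which query type consumes which option: `INSERT` uses only the values map, `UPDATE` uses the values map plus a criteria, and `DELETE` uses only a criteria. Everything here is an assumption made for illustration: `OutboundQuerySketch`, its `Type` enum, and the `describe` method are hypothetical, the criteria is passed as a ready-made string, and the generated text is not how Spring Data R2DBC actually renders SQL.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.StringJoiner;

// Plain-JDK illustration only; not the R2dbcMessageHandler implementation.
final class OutboundQuerySketch {

    // Hypothetical mirror of the three query types named in the text.
    enum Type { INSERT, UPDATE, DELETE }

    // Builds a human-readable statement from the table name, column-value map, and criteria.
    static String describe(Type type, String table, Map<String, Object> values, String criteria) {
        switch (type) {
            case INSERT: {
                // INSERT needs only the column-value pairs.
                StringJoiner columns = new StringJoiner(", ");
                StringJoiner params = new StringJoiner(", ");
                for (String column : values.keySet()) {
                    columns.add(column);
                    params.add(":" + column);
                }
                return "INSERT INTO " + table + " (" + columns + ") VALUES (" + params + ")";
            }
            case UPDATE: {
                // UPDATE needs both the column-value pairs and a criteria.
                StringJoiner assignments = new StringJoiner(", ");
                for (String column : values.keySet()) {
                    assignments.add(column + " = :" + column);
                }
                return "UPDATE " + table + " SET " + assignments + " WHERE " + criteria;
            }
            case DELETE:
                // DELETE needs only a criteria.
                return "DELETE FROM " + table + " WHERE " + criteria;
            default:
                throw new IllegalArgumentException("Unsupported type: " + type);
        }
    }

    public static void main(String[] args) {
        // Insertion-ordered map so the column order in the output is predictable.
        Map<String, Object> values = new LinkedHashMap<>();
        values.put("name", "Bob"); // hypothetical sample data
        values.put("age", 36);

        System.out.println(describe(Type.INSERT, "person", values, null));
        System.out.println(describe(Type.UPDATE, "person", values, "id = :personId"));
        System.out.println(describe(Type.DELETE, "person", null, "id = :personId"));
    }
}
```

With the real adapter, the map would come from evaluating `valuesExpression` against the request message, and the criteria would be a `Criteria` object rather than a string.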