# R2DBC Support

Spring Integration provides channel adapters for receiving and sending messages through reactive database access via [R2DBC](https://r2dbc.io/) drivers.

You need to include this dependency in your project:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-r2dbc</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-r2dbc:5.5.9"
```

### R2DBC Inbound Channel Adapter

The `R2dbcMessageSource` is a pollable `MessageSource` implementation based on `R2dbcEntityOperations`. It produces messages with a `Flux` or `Mono` payload for the data fetched from the database, depending on the `expectSingleResult` option. The `SELECT` query can be provided statically or as a SpEL expression that is evaluated on every `receive()` call. The `R2dbcMessageSource.SelectCreator` is the root object of the evaluation context, which gives access to the `StatementMapper.SelectSpec` fluent API. By default, this channel adapter maps each selected record to a `LinkedCaseInsensitiveMap` instance. This can be customized with the `payloadType` option, which is used underneath by an `EntityRowMapper` based on `this.r2dbcEntityOperations.getConverter()`. The `updateSql` option is optional and is used to mark read records in the database so that subsequent polls skip them. The `UPDATE` operation can be supplied with a `BiFunction` that binds values into the `UPDATE` statement based on the records in the `SELECT` result.

A typical configuration for this channel adapter might look like this:

```
@Bean
@InboundChannelAdapter("fromR2dbcChannel")
public R2dbcMessageSource r2dbcMessageSourceSelectMany() {
    R2dbcMessageSource r2dbcMessageSource =
            new R2dbcMessageSource(this.r2dbcEntityTemplate,
                    "SELECT * FROM person WHERE name='Name'");
    // Map each row to a Person instead of the default LinkedCaseInsensitiveMap
    r2dbcMessageSource.setPayloadType(Person.class);
    // Mark the rows that were read so that later polls do not return them again
    r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id");
    // Bind the :id parameter from each selected Person
    r2dbcMessageSource.setBindFunction(
            (DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));
    return r2dbcMessageSource;
}
```

With the Java DSL, this channel adapter can be configured like this:

```
@Bean
IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) {
    return IntegrationFlows
            .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate,
                            (selectCreator) ->
                                    selectCreator.createSelect("person")
                                            .withProjection("*")
                                            .withCriteria(Criteria.where("id").is(1)))
                            .expectSingleResult(true)
                            .payloadType(Person.class)
                            .updateSql("UPDATE Person SET id='2' where id = :id")
                            .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) ->
                                    bindSpec.bind("id", o.getId())),
                    e -> e.poller(p -> p.fixedDelay(100)))
            .<Mono<?>>handle((p, h) -> p, e -> e.async(true))
            .channel(MessageChannels.flux())
            .get();
}
```

### R2DBC Outbound Channel Adapter

The `R2dbcMessageHandler` is a `ReactiveMessageHandler` implementation that performs an `INSERT` (default), `UPDATE` or `DELETE` query in the database using the provided `R2dbcEntityOperations`. The `R2dbcMessageHandler.Type` can be configured statically or via a SpEL expression evaluated against the request message. The query to execute can be based on the `tableName`, `values` and `criteria` expression options or, if `tableName` is not provided, the whole message payload is treated as an `org.springframework.data.relational.core.mapping.Table` entity to perform SQL against.
The package `org.springframework.data.relational.core.query` is registered as an import in the SpEL evaluation context, which gives direct access to the `Criteria` fluent API used for `UPDATE` and `DELETE` queries. The `valuesExpression` is used for `INSERT` and `UPDATE` queries and must evaluate, against the request message, to a `Map` of column-value pairs describing the change to make in the target table.

A typical configuration for this channel adapter might look like this:

```
@Bean
@ServiceActivator(inputChannel = "toR2dbcChannel")
public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) {
    R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate);
    // The message payload is expected to be the Map of column-value pairs
    messageHandler.setValuesExpression(new FunctionExpression<Message<?>>(Message::getPayload));
    messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE);
    // EXPRESSION_PARSER is assumed to be a SpelExpressionParser constant defined elsewhere
    messageHandler.setCriteriaExpression(
            EXPRESSION_PARSER.parseExpression("T(Criteria).where('id').is(headers.personId)"));
    return messageHandler;
}
```

With the Java DSL, this channel adapter can be configured like this:

```
.handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate)
        .queryType(R2dbcMessageHandler.Type.UPDATE)
        .tableNameExpression("payload.class.simpleName")
        .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId")))
        .values("{age:36}"))
```
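Both the `valuesExpression` and the DSL `values` option have to resolve to a `Map` of column-value pairs. As a minimal sketch, assuming a hypothetical `Person` payload with `id`, `name` and `age` properties, such a map could be built in plain Java before it is handed to the adapter. Neither the `Person` class nor the `PersonColumnValues` helper below is part of Spring Integration; both are illustrative only.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical payload type, used only for illustration.
final class Person {

    private final int id;
    private final String name;
    private final int age;

    Person(int id, String name, int age) {
        this.id = id;
        this.name = name;
        this.age = age;
    }

    int getId() {
        return id;
    }

    String getName() {
        return name;
    }

    int getAge() {
        return age;
    }
}

// Hypothetical helper, not part of Spring Integration.
final class PersonColumnValues {

    private PersonColumnValues() {
    }

    // Builds the column-value pairs for an UPDATE.
    // The id is left out on purpose: it identifies the row and belongs in the criteria.
    static Map<String, Object> forUpdate(Person person) {
        Map<String, Object> values = new LinkedHashMap<>();
        values.put("name", person.getName());
        values.put("age", person.getAge());
        return values;
    }
}
```

If the request payload is such a `Person`, the helper could be wrapped as `new FunctionExpression<Message<Person>>(m -> PersonColumnValues.forUpdate(m.getPayload()))` and passed to `setValuesExpression(...)`. Keeping the key column out of the map is a design choice of this sketch, so that the row is selected only through the criteria.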