# R2DBC 支持 ## R2DBC 支持 Spring 集成通过使用通过[R2DBC](https://r2dbc.io/)驱动程序对数据库的反应性访问,为接收和发送消息提供了通道适配器。 你需要在项目中包含此依赖项: Maven ``` org.springframework.integration spring-integration-r2dbc 5.5.9 ``` Gradle ``` compile "org.springframework.integration:spring-integration-r2dbc:5.5.9" ``` ### R2DBC 入站通道适配器 `R2dbcMessageSource`是一个基于`R2dbcEntityOperations`的 pollable`MessageSource`实现,并生成带有`Flux`或`Mono`的消息,作为根据`expectSingleResult`选项从数据库中获取数据的有效负载。可以静态地提供对`SELECT`的查询,也可以基于对每个`receive()`调用求值的 SPEL 表达式。`R2dbcMessageSource.SelectCreator`作为求值上下文的根对象存在,以允许使用`StatementMapper.SelectSpec`fluent API。默认情况下,此通道适配器将从 SELECT 映射到`LinkedCaseInsensitiveMap`实例中的记录。可以根据`this.r2dbcEntityOperations.getConverter()`定制提供`payloadType`选项,该选项由`EntityRowMapper`在下面使用。`updateSql`是可选的,用于在数据库中 Mark Read 记录,以便跳过后续的轮询。`UPDATE`操作可以提供一个`BiFunction`,根据`SELECT`结果中的记录,将值绑定到`UPDATE`中。 此通道适配器的典型配置可能如下所示: ``` @Bean @InboundChannelAdapter("fromR2dbcChannel") public R2dbcMessageSource r2dbcMessageSourceSelectMany() { R2dbcMessageSource r2dbcMessageSource = new R2dbcMessageSource(this.r2dbcEntityTemplate, "SELECT * FROM person WHERE name='Name'"); r2dbcMessageSource.setPayloadType(Person.class); r2dbcMessageSource.setUpdateSql("UPDATE Person SET name='SomeOtherName' WHERE id = :id"); r2dbcMessageSource.setBindFunction( (DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId()));} return r2dbcMessageSource; } ``` 对于 Java DSL,此通道适配器的配置如下所示: ``` @Bean IntegrationFlow r2dbcDslFlow(R2dbcEntityTemplate r2dbcEntityTemplate) { return IntegrationFlows .from(R2dbc.inboundChannelAdapter(r2dbcEntityTemplate, (selectCreator) -> selectCreator.createSelect("person") .withProjection("*") .withCriteria(Criteria.where("id").is(1))) .expectSingleResult(true) .payloadType(Person.class) .updateSql("UPDATE Person SET id='2' where id = :id") .bindFunction((DatabaseClient.GenericExecuteSpec bindSpec, Person o) -> bindSpec.bind("id", o.getId())), e -> e.poller(p -> p.fixedDelay(100))) .>handle((p, h) -> p, e -> e.async(true)) .channel(MessageChannels.flux()) .get(); } ``` ### R2DBC 出站通道适配器 `R2dbcMessageHandler`是一个`ReactiveMessageHandler`实现,用于在数据库中使用提供的`R2dbcEntityOperations`执行`INSERT`(默认)、`UPDATE`或`DELETE`查询。`R2dbcMessageHandler.Type`可以静态配置,也可以通过针对请求消息的 SPEL 表达式配置。要执行的查询可以基于`tableName`、`values`和`criteria`表达式选项,或者(如果不提供`tableName`)将整个消息有效负载作为`org.springframework.data.relational.core.mapping.Table`实体来执行 SQL。将包`org.springframework.data.relational.core.query`注册为导入到 SPEL 评估上下文中,以直接访问`Criteria`Fluent API,该 API 用于`UPDATE`和`DELETE`查询。`valuesExpression`在`INSERT`和`UPDATE`中使用,并且必须对`Map`进行求值,以便对列值对在目标表中执行针对请求消息的更改。 此通道适配器的典型配置可能如下所示: ``` @Bean @ServiceActivator(inputChannel = "toR2dbcChannel") public R2dbcMessageHandler r2dbcMessageHandler(R2dbcEntityTemplate r2dbcEntityTemplate) { R2dbcMessageHandler messageHandler = new R2dbcMessageHandler(r2dbcEntityTemplate) messageHandler.setValuesExpression(new FunctionExpression>(Message::getPayload)); messageHandler.setQueryType(R2dbcMessageHandler.Type.UPDATE); messageHandler.setCriteriaExpression( EXPRESSION_PARSER.parseExpression("T(Criteria).where('id).is(headers.personId))); return messageHandler; } ``` 对于 Java DSL,此通道适配器的配置如下所示: ``` .handle(R2dbc.outboundChannelAdapter(r2dbcEntityTemplate) .queryType(R2dbcMessageHandler.Type.UPDATE) .tableNameExpression("payload.class.simpleName") .criteria((message) -> Criteria.where("id").is(message.getHeaders().get("personId"))) .values("{age:36}")) ```