gemfire.md 14.5 KB
Newer Older
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 81 82 83 84 85 86 87 88 89 90 91 92 93 94 95 96 97 98 99 100 101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118 119 120 121 122 123 124 125 126 127 128 129 130 131 132 133 134 135 136 137 138 139 140 141 142 143 144 145 146 147 148 149 150 151 152 153 154 155 156 157 158 159 160 161 162 163 164 165 166 167 168 169 170 171 172 173 174 175 176 177 178 179 180 181 182 183 184 185 186 187 188 189 190 191 192 193 194 195 196 197 198 199 200 201 202 203 204 205 206 207 208 209 210 211 212 213 214
# Pivotal Gemfire 和 Apache Geode 支持

## Pivotal Gemfire 和 Apache Geode 支持

Spring 集成为 Pivotal Gemfire 和 Apache Geode 提供了支持。

你需要在项目中包含此依赖项:

Maven

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-gemfire</artifactId>
    <version>5.5.9</version>
</dependency>
```

Gradle

```
compile "org.springframework.integration:spring-integration-gemfire:5.5.9"
```

Gemfire 是一个分布式数据管理平台,它提供了一个键值数据网格以及先进的分布式系统功能,例如事件处理,连续查询和远程功能执行。本指南假定你熟悉商业[Pivotal Gemfire](https://pivotal.io/pivotal-gemfire)或开源[Apache Geode](https://geode.apache.org)

Spring 集成通过实现用于条目和连续查询事件的入站适配器、用于将条目写到缓存的出站适配器、以及消息和元数据存储和`GemfireLockRegistry`实现来提供对 Gemfire 的支持。 Spring 集成利用了[Spring Data for Pivotal GemFire](https://projects.spring.io/spring-data-gemfire)项目,在其组件上提供了一个薄的包装器。

从版本 5.1 开始, Spring Integration Gemfire 模块默认使用[Spring Data for Apache Geode](https://github.com/spring-projects/spring-data-geode)传递依赖关系。要为 Pivotal Gemfire 切换到基于 Pivotal Gemfire Spring 的商业数据,从依赖项中排除`spring-data-geode`,并添加`spring-data-gemfire`,如下 Maven 片段所示:

```
<dependency>
    <groupId>org.springframework.integration</groupId>
    <artifactId>spring-integration-gemfire</artifactId>
    <exclusions>
        <exclusion>
            <groupId>org.springframework.data</groupId>
            <artifactId>spring-data-geode</artifactId>
        </exclusion>
    </exclusions>
</dependency>

<dependency>
    <groupId>org.springframework.data</groupId>
    <artifactId>spring-data-gemfire</artifactId>
</dependency>
```

要配置“INT-GFE”名称空间,在 XML 配置文件的头中包含以下元素:

```
xmlns:int-gfe="http://www.springframework.org/schema/integration/gemfire"
xsi:schemaLocation="http://www.springframework.org/schema/integration/gemfire
	https://www.springframework.org/schema/integration/gemfire/spring-integration-gemfire.xsd"
```

### 入站通道适配器

当 Gemfire`EntryEvent`触发时,入站通道适配器在通道上生成消息。每当相关区域中的条目`CREATED``UPDATED``DESTROYED``INVALIDATED`时,Gemfire 都会生成事件。入站通道适配器允许你筛选这些事件的一个子集。例如,你可能希望仅在响应正在创建的条目时生成消息。此外,如果希望消息有效负载包含事件属性(如新的 entry 值),则入站通道适配器可以计算 SPEL 表达式。下面的示例展示了如何使用 SPEL 语言(在`expression`属性中)配置入站通道适配器:

```
<gfe:cache/>
<gfe:replicated-region id="region"/>
<int-gfe:inbound-channel-adapter id="inputChannel" region="region"
    cache-events="CREATED" expression="newValue"/>
```

前面的配置使用 Spring Gemfire 的“GFE”命名空间创建了一个 Gemfire`Cache``Region``inbound-channel-adapter`元素需要对适配器监听事件的 Gemfire 区域进行引用。可选属性包括`cache-events`,它可以包含一个逗号分隔的事件类型列表,这些事件类型的消息是在输入通道上产生的。默认情况下,`CREATED``UPDATED`是启用的。如果不提供`channel`属性,则从`id`属性创建通道。这个适配器还支持`error-channel`。Gemfire[`EntryEvent`](https://geode.apache.org/releases/latest/javadoc/org/apache/geode/cache/entryevent.html)是`#root`评估的对象。下面的示例展示了一个替换键值的表达式:

```
expression="new something.MyEvent(key, oldValue, newValue)"
```

如果不提供`expression`属性,则消息有效负载就是 Gemfire`EntryEvent`本身。

|   |此适配器符合 Spring 集成约定。|
|---|--------------------------------------------------------|

### 连续查询入站通道适配器

当 Gemfire 连续查询或`CqEvent`事件触发时,连续查询入站通道适配器在通道上生成消息。在版本`1.1`中, Spring Data 引入了连续查询支持,包括`ContinuousQueryListenerContainer`,这在 Gemfire 原生 API 上提供了一个很好的抽象。这个适配器需要对`ContinuousQueryListenerContainer`实例的引用,为给定的`query`创建一个侦听器,并执行查询。连续查询充当事件源,每当其结果集更改状态时,它都会触发事件源。

|   |Gemfire 查询是用 OQL 编写的,其作用域是整个缓存(而不仅仅是一个区域)。<br/>此外,连续查询需要一个远程(即在单独的进程或远程主机中运行)缓存服务器。<br/>有关实现连续查询的更多信息,请参见[Gemfire 文档](https://gemfire82.docs.pivotal.io/docs-gemfire/gemfire_nativeclient/continuous-querying/continuous-querying.html)。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

下面的配置创建了一个 Gemfire 客户端缓存(该实现需要一个远程缓存服务器,其地址被配置为池的子元素)、一个客户端区域和一个使用 Spring 数据的`ContinuousQueryListenerContainer`:

```
<gfe:client-cache id="client-cache" pool-name="client-pool"/>

<gfe:pool id="client-pool" subscription-enabled="true" >
    <!--configure server or locator here required to address the cache server -->
</gfe:pool>

<gfe:client-region id="test" cache-ref="client-cache" pool-name="client-pool"/>

<gfe:cq-listener-container id="queryListenerContainer" cache="client-cache"
    pool-name="client-pool"/>

<int-gfe:cq-inbound-channel-adapter id="inputChannel"
    cq-listener-container="queryListenerContainer"
    query="select * from /test"/>
```

连续查询入站通道适配器需要`cq-listener-container`属性,该属性必须包含对`ContinuousQueryListenerContainer`的引用。可选地,它接受`expression`属性,该属性使用 SPEL 转换`CqEvent`或根据需要提取单个属性。`cq-inbound-channel-adapter`提供了一个`query-events`属性,该属性包含一个以逗号分隔的事件类型列表,该事件类型的消息是在输入通道上产生的。可用的事件类型是`CREATED``UPDATED``DESTROYED``REGION_DESTROYED``REGION_INVALIDATED`。默认情况下,`CREATED``UPDATED`是启用的。其他可选属性包括`query-name`(它提供了一个可选的查询名称)、`expression`(它的工作原理如上一节所述)和`durable`(一个布尔值,指示查询是否持久——默认情况下它是 false)。如果不提供`channel`,则通道将从`id`属性创建。这个适配器还支持`error-channel`

|   |此适配器符合 Spring 集成约定。|
|---|--------------------------------------------------------|

### 出站通道适配器

出站通道适配器写入从消息有效负载映射的缓存项。在其最简单的形式中,它期望类型`java.util.Map`的有效负载,并将映射条目放入其配置的区域中。下面的示例展示了如何配置出站通道适配器:

```
<int-gfe:outbound-channel-adapter id="cacheChannel" region="region"/>
```

给定上述配置,如果有效负载不是`Map`,则会引发异常。此外,你可以配置出站通道适配器,以通过使用 SPEL 创建高速缓存条目的映射。下面的示例展示了如何做到这一点:

```
<int-gfe:outbound-channel-adapter id="cacheChannel" region="region">
    <int-gfe:cache-entries>
        <entry key="payload.toUpperCase()" value="payload.toLowerCase()"/>
        <entry key="'thing1'" value="'thing2'"/>
    </int-gfe:cache-entries>
</int-gfe:outbound-channel-adapter>
```

在前面的配置中,内部元素(`cache-entries`)在语义上等同于 Spring’map’元素。适配器将`key``value`属性解释为 SPEL 表达式,并将消息作为求值上下文。请注意,这可以包含任意的缓存条目(不仅仅是从消息派生的条目),并且文字值必须用单引号括起来。在前面的示例中,如果发送到`cacheChannel`的消息具有`String``Hello`的有效负载,则在缓存区域中写入两个条目(`[HELLO:hello, thing1:thing2]`)(要么创建要么更新)。此适配器还支持`order`属性,如果将其绑定到`PublishSubscribeChannel`,则该属性可能很有用。

### Gemfire 消息存储

正如 EIP 中所描述的,[消息存储](https://www.enterpriseintegrationpatterns.com/MessageStore.html)允许你持久化消息。在处理具有缓冲消息能力的组件(`QueueChannel``Aggregator``Resequencer`,以及其他)时,如果可靠性是一个问题,这可能是有用的。在 Spring 集成中,`MessageStore`策略接口还为[索赔检查](https://www.enterpriseintegrationpatterns.com/StoreInLibrary.html)模式提供了基础,这也在 EIP 中进行了描述。

Spring Integration 的 Gemfire 模块提供了`GemfireMessageStore`,这是`MessageStore`策略(主要由`QueueChannel``ClaimCheck`模式使用)和`MessageGroupStore`策略(主要由`Aggregator``Resequencer`模式使用)的一种实现。

下面的示例通过使用`spring-gemfire`命名空间(不要与`spring-integration-gemfire`命名空间混淆)来配置缓存和区域:

```
<bean id="gemfireMessageStore" class="o.s.i.gemfire.store.GemfireMessageStore">
    <constructor-arg ref="myRegion"/>
</bean>

<gfe:cache/>

<gfe:replicated-region id="myRegion"/>

<int:channel id="somePersistentQueueChannel">
    <int:queue message-store="gemfireMessageStore"/>
<int:channel>

<int:aggregator input-channel="inputChannel" output-channel="outputChannel"
    message-store="gemfireMessageStore"/>
```

通常,希望在客户机-服务器配置中在一个或多个远程缓存服务器中维护消息存储。在这种情况下,你应该配置一个客户机缓存、一个客户机区域和一个客户机池,并将该区域注入`MessageStore`。下面的示例展示了如何做到这一点:

```
<bean id="gemfireMessageStore"
    class="org.springframework.integration.gemfire.store.GemfireMessageStore">
    <constructor-arg ref="myRegion"/>
</bean>

<gfe:client-cache/>

<gfe:client-region id="myRegion" shortcut="PROXY" pool-name="messageStorePool"/>

<gfe:pool id="messageStorePool">
    <gfe:server host="localhost" port="40404" />
</gfe:pool>
```

请注意,`pool`元素被配置为缓存服务器的地址(你可以在此替换定位器)。该区域被配置为“代理”,因此不会在本地存储任何数据。该区域的`id`对应于缓存服务器中同名的区域。

从版本 4.3.12 开始,`GemfireMessageStore`支持键`prefix`选项,以允许在同一 Gemfire 区域上的存储实例之间进行区分。

### Gemfire Lock 注册表

从版本 4.0 开始,`GemfireLockRegistry`是可用的。某些组件(例如,聚合器和重排序程序)使用从`LockRegistry`实例获得的锁,以确保在任何给定的时间只有一个线程正在操作组。`DefaultLockRegistry`在单个组件中执行此功能。现在,你可以在这些组件上配置一个外部锁注册中心。当你使用与`GemfireLockRegistry`共享的`MessageGroupStore`时,它可以跨多个应用程序实例提供此功能,从而一次只有一个实例可以操作组。

|   |其中一个`GemfireLockRegistry`构造函数需要一个`Region`作为参数。<br/>它用于从`getDistributedLock()`方法获得一个`Lock`<br/>该操作需要`GLOBAL`用于`Region`的作用域。,<br/>另一个构造函数需要一个`Cache`,而`Region`是用`GLOBAL`作用域和名称`LockRegistry`创建的。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

### Gemfire 元数据存储

4.0 版本引入了一个新的基于 Gemfire 的`MetadataStore`[元数据存储](./meta-data-store.html#metadata-store))实现。你可以使用`GemfireMetadataStore`在应用程序重新启动期间维护元数据状态。这个新的`MetadataStore`实现可以与适配器一起使用,例如:

* [馈入入通道适配器](./feed.html#feed-inbound-channel-adapter)

* [读取文件](./file.html#file-reading)

* [FTP 入站通道适配器](./ftp.html#ftp-inbound)

* [SFTP 入站通道适配器](./sftp.html#sftp-inbound)

要让这些适配器使用新的`GemfireMetadataStore`,请声明一个 Spring  Bean,其 Bean 名称为`metadataStore`。提要入站通道适配器自动拾取并使用声明的`GemfireMetadataStore`

|   |`GemfireMetadataStore`还实现了`ConcurrentMetadataStore`,从而可以在多个应用程序实例之间可靠地共享它,只有一个实例可以存储或修改键值。<br/>这些方法根据该区域的范围和数据策略提供不同级别的并发保证。<br/>它们在对等缓存和客户机-服务器缓存中实现,但在对等区域中不允许。具有`NORMAL``EMPTY`数据策略的。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

|   |从版本 5.0 开始,`GemfireMetadataStore`还实现了`ListenableMetadataStore`,它允许你通过向存储提供`MetadataStoreListener`实例来侦听缓存事件,如下例所示:|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|

```
GemfireMetadataStore metadataStore = new GemfireMetadataStore(cache);
metadataStore.addListener(new MetadataStoreListenerAdapter() {

    @Override
    public void onAdd(String key, String value) {
         ...
    }

});
```