# FTP/FTPS
## FTP/FTPS 适配器
Spring 集成为使用 FTP 和 FTPS 的文件传输操作提供了支持。
文件传输协议是一种简单的网络协议,它允许你在互联网上的两台计算机之间传输文件。FTPS 代表“FTP over SSL”。
你需要在项目中包含此依赖项:
Maven
```
org.springframework.integration
spring-integration-ftp
5.5.9
```
Gradle
```
compile "org.springframework.integration:spring-integration-ftp:5.5.9"
```
当涉及到 FTP 通信时,有两个参与者:客户机和服务器。要使用 FTP 或 FTPS 传输文件,你可以使用一个客户端,该客户端启动到运行 FTP 服务器的远程计算机的连接。在建立连接之后,客户端可以选择发送或接收文件的副本。
Spring 集成通过提供三个客户端端点来支持通过 FTP 或 FTPS 发送和接收文件:入站通道适配器、出站通道适配器和出站网关。它还为定义这些客户机组件提供了方便的基于名称空间的配置选项。
要使用 FTP 名称空间,请将以下内容添加到 XML 文件的头中:
```
xmlns:int-ftp="http://www.springframework.org/schema/integration/ftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/ftp
https://www.springframework.org/schema/integration/ftp/spring-integration-ftp.xsd"
```
### ftp会话工厂
Spring 集成提供了你可以用来创建 FTP(或 FTPS)会话的工厂。
#### 默认工厂
| |从版本 3.0 开始,默认情况下会话不再缓存。
参见[FTP Session Caching](#ftp-session-caching)。|
|---|-------------------------------------------------------------------------------------------------------------------------|
在配置 FTP 适配器之前,你必须配置一个 FTP会话工厂。你可以使用常规的 Bean 定义来配置 FTP会话工厂,其中实现类是`o.s.i.ftp.session.DefaultFtpSessionFactory`。下面的示例展示了一个基本配置:
```
```
对于 FTPS 连接,你可以使用`o.s.i.ftp.session.DefaultFtpsSessionFactory`代替。
下面的示例展示了一个完整的配置:
```
```
| |如果你遇到连接问题,并且希望跟踪会话创建以及查看轮询的会话,则可以通过将记录器设置为`TRACE`级别(例如,`log4j.category.org.springframework.integration.file=TRACE`)来启用会话跟踪。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
现在你只需要将这些工厂注入到适配器中即可。适配器使用的协议(FTP 或 FTPS)取决于已注入适配器的会话工厂的类型。
| |为 FTP 或 FTPS会话工厂提供值的一种更实用的方法是使用 Spring 的属性占位符支持(参见[https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#beans-factory-placeholderconfigurer](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#beans-factory-placeholderconfigurer))。|
|---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
### 高级配置
`DefaultFtpSessionFactory`提供了对底层客户机 API 的抽象,它(自 Spring Integration2.0 以来)是[Apache Commons 网](https://commons.apache.org/net/)。这将使你免于了解`org.apache.commons.net.ftp.FTPClient`的低级配置细节。在会话工厂上公开了几个常见的属性(自版本 4.0 以来,现在包括`connectTimeout`、`defaultTimeout`和`dataTimeout`)。然而,有时需要访问较低级别的`FTPClient`配置以实现更高级的配置(例如设置活动模式的端口范围)。为此,`AbstractFtpSessionFactory`(所有 FTP会话工厂的基类)以以下清单中所示的两种后处理方法的形式公开钩子:
```
/**
* Will handle additional initialization after client.connect() method was invoked,
* but before any action on the client has been taken
*/
protected void postProcessClientAfterConnect(T t) throws IOException {
// NOOP
}
/**
* Will handle additional initialization before client.connect() method was invoked.
*/
protected void postProcessClientBeforeConnect(T client) throws IOException {
// NOOP
}
```
正如你所看到的,这两种方法都没有默认的实现。然而,通过扩展`DefaultFtpSessionFactory`,你可以覆盖这些方法,以提供更高级的`FTPClient`配置,如下例所示:
```
public class AdvancedFtpSessionFactory extends DefaultFtpSessionFactory {
protected void postProcessClientBeforeConnect(FTPClient ftpClient) throws IOException {
ftpClient.setActivePortRange(4000, 5000);
}
}
```
#### FTPS 和共享 SSLSession
当通过 SSL 或 TLS 使用 FTP 时,一些服务器需要在控制和数据连接上使用相同的`SSLSession`。这是为了防止“窃取”数据连接。有关更多信息,请参见[https://scarybeastsecurity.blogspot.cz/2009/02/vsftpd-210-released.html](https://scarybeastsecurity.blogspot.cz/2009/02/vsftpd-210-released.html)。
目前,Apache FTPSclient 不支持此功能。见[NET-408](https://issues.apache.org/jira/browse/NET-408)。
下面的解决方案由[堆栈溢出](https://stackoverflow.com/questions/32398754/how-to-connect-to-ftps-server-with-data-connection-using-same-tls-session)提供,它在`sun.security.ssl.SSLSessionContextImpl`上使用反射,因此它可能无法在其他 JVM 上工作。堆栈溢出答案是在 2015 年提交的, Spring 集成团队最近在 JDK1.8.0\_112 上测试了该解决方案。
下面的示例展示了如何创建 FTPS会话:
```
@Bean
public DefaultFtpsSessionFactory sf() {
DefaultFtpsSessionFactory sf = new DefaultFtpsSessionFactory() {
@Override
protected FTPSClient createClientInstance() {
return new SharedSSLFTPSClient();
}
};
sf.setHost("...");
sf.setPort(21);
sf.setUsername("...");
sf.setPassword("...");
sf.setNeedClientAuth(true);
return sf;
}
private static final class SharedSSLFTPSClient extends FTPSClient {
@Override
protected void _prepareDataSocket_(final Socket socket) throws IOException {
if (socket instanceof SSLSocket) {
// Control socket is SSL
final SSLSession session = ((SSLSocket) _socket_).getSession();
final SSLSessionContext context = session.getSessionContext();
context.setSessionCacheSize(0); // you might want to limit the cache
try {
final Field sessionHostPortCache = context.getClass()
.getDeclaredField("sessionHostPortCache");
sessionHostPortCache.setAccessible(true);
final Object cache = sessionHostPortCache.get(context);
final Method method = cache.getClass().getDeclaredMethod("put", Object.class,
Object.class);
method.setAccessible(true);
String key = String.format("%s:%s", socket.getInetAddress().getHostName(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
key = String.format("%s:%s", socket.getInetAddress().getHostAddress(),
String.valueOf(socket.getPort())).toLowerCase(Locale.ROOT);
method.invoke(cache, key, session);
}
catch (NoSuchFieldException e) {
// Not running in expected JRE
logger.warn("No field sessionHostPortCache in SSLSessionContext", e);
}
catch (Exception e) {
// Not running in expected JRE
logger.warn(e.getMessage());
}
}
}
}
```
### 委托会话工厂
版本 4.2 引入了`DelegatingSessionFactory`,它允许在运行时选择实际的会话工厂。在调用 FTP 端点之前,在工厂上调用`setThreadKey()`将一个键与当前线程关联。然后使用该键查找要使用的实际会话工厂。使用后,你可以通过调用`clearThreadKey()`清除密钥。
我们添加了方便的方法,这样你就可以从消息流中轻松地使用委托工厂。
下面的示例展示了如何声明一个委托会话工厂:
```
```
| |当你使用会话缓存(参见[FTP Session Caching](#ftp-session-caching))时,每个委托都应该被缓存。
你不能缓存`DelegatingSessionFactory`本身。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
从版本 5.0.7 开始,`DelegatingSessionFactory`可以与`RotatingServerAdvice`一起用于轮询多个服务器;请参见[入站通道适配器:轮询多个服务器和目录](#ftp-rotating-server-advice)。
### FTP 入站通道适配器
FTP 入站通道适配器是一种特殊的侦听器,它连接到 FTP 服务器并侦听远程目录事件(例如,创建的新文件),此时它将启动文件传输。下面的示例展示了如何配置`inbound-channel-adapter`:
```
```
如前面的配置所示,你可以通过使用`inbound-channel-adapter`元素来配置 FTP 入站通道适配器,同时还提供各种属性的值,例如`local-directory`,`filename-pattern`(这是基于简单的模式匹配,而不是正则表达式),以及对`session-factory`的引用。
默认情况下,传输的文件带有与原始文件相同的名称。如果要重写此行为,可以设置`local-filename-generator-expression`属性,该属性允许你提供一个 SPEL 表达式来生成本地文件的名称。与出站网关和适配器(SPEL 求值上下文的根对象是`Message`)不同,此入站适配器在求值时还没有消息,因为这是它最终以传输的文件作为有效负载生成的消息。因此,SPEL 求值上下文的根对象是远程文件的原始名称(a`String`)。
入站通道适配器首先检索本地目录的`File`对象,然后根据 Poller 配置发出每个文件。从版本 5.0 开始,你现在可以在需要新的文件检索时限制从 FTP 服务器获取的文件的数量。当目标文件非常大时,或者当你在具有持久性文件列表过滤器的集群系统中运行时,这可能是有益的,将在后面讨论。为此,请使用`max-fetch-size`。负值(默认值)表示没有限制,并检索所有匹配的文件。有关更多信息,请参见[入站通道适配器:控制远程文件获取](#ftp-max-fetch)。从版本 5.0 开始,你还可以通过设置`scanner`属性,为`inbound-channel-adapter`提供一个自定义的`DirectoryScanner`实现。
从 Spring Integration3.0 开始,你可以指定`preserve-timestamp`属性(它的默认值是`false`)。当`true`时,本地文件的修改时间戳被设置为从服务器检索到的值。否则,它被设置为当前时间。
从版本 4.2 开始,你可以指定`remote-directory-expression`而不是`remote-directory`,让你动态地确定每个轮询上的目录——例如,`remote-directory-expression="@myBean.determineRemoteDir()"`。
从版本 4.3 开始,你可以省略`remote-directory`和`remote-directory-expression`属性。它们默认为`null`。在这种情况下,根据 FTP 协议,客户机工作目录被用作默认的远程目录。
有时,基于`filename-pattern`属性指定的简单模式的文件过滤可能还不够。如果是这种情况,可以使用`filename-regex`属性来指定正则表达式(例如`filename-regex=".*\.test$"`)。此外,如果需要完全控制,则可以使用`filter`属性并提供对`o.s.i.file.filters.FileListFilter`的任何自定义实现的引用,这是用于过滤文件列表的策略接口。此筛选器确定要检索哪些远程文件。还可以通过使用`CompositeFileListFilter`将基于模式的过滤器与其他过滤器(例如`AcceptOnceFileListFilter`,以避免同步先前已获取的文件)结合在一起。
`AcceptOnceFileListFilter`将其状态存储在内存中。如果你希望该状态在系统重新启动后仍然有效,请考虑使用`FtpPersistentAcceptOnceFileListFilter`代替。这个过滤器将接受的文件名存储在`MetadataStore`策略的实例中(参见[元数据存储](./meta-data-store.html#metadata-store))。此筛选器匹配文件名和远程修改时间。
从版本 4.0 开始,这个过滤器需要`ConcurrentMetadataStore`。当与共享数据存储一起使用时(例如`Redis`与`RedisMetadataStore`),它允许跨多个应用程序或服务器实例共享筛选键。
从版本 5.0 开始,在内存中`FtpPersistentAcceptOnceFileListFilter`的`SimpleMetadataStore`默认应用于`FtpInboundFileSynchronizer`。在 XML 配置中,`regex`或`pattern`选项以及在 Java DSL 中,`FtpInboundChannelAdapterSpec`选项也应用了该过滤器。任何其他用例都可以用`CompositeFileListFilter`(或`ChainFileListFilter`)来管理。
前面的讨论涉及在检索文件之前对文件进行过滤。检索完文件后,将对文件系统上的文件应用一个额外的过滤器。默认情况下,这是一个`AcceptOnceFileListFilter`,正如前面讨论的,它在内存中保留状态,并且不考虑文件的修改时间。除非你的应用程序在处理后删除文件,否则在应用程序重新启动后,适配器将在默认情况下重新处理磁盘上的文件。
另外,如果将`filter`配置为使用`FtpPersistentAcceptOnceFileListFilter`并更改远程文件的时间戳(导致重新获取它),则默认的本地筛选器不允许处理此新文件。
有关此过滤器的详细信息,以及如何使用它,请参见[远程持久文件列表过滤器](./file.html#remote-persistent-flf)。
你可以使用`local-filter`属性来配置本地文件系统过滤器的行为。从版本 4.3.8 开始,默认情况下配置了`FileSystemPersistentAcceptOnceFileListFilter`。该过滤器将接受的文件名和修改的时间戳存储在`MetadataStore`策略的实例中(参见[元数据存储](./meta-data-store.html#metadata-store)),并检测本地文件修改时间的更改。默认的`MetadataStore`是`SimpleMetadataStore`,它在内存中存储状态。
从版本 4.1.5 开始,这些过滤器有一个新的属性(`flushOnUpdate`),这会导致它们在每次更新时刷新元数据存储(如果存储实现`Flushable`)。
| |此外,如果使用分布式`MetadataStore`(例如[Redis](./redis.html#redis-metadata-store)或[GemFire](./gemfire.html#gemfire-metadata-store)),则可以使用同一个适配器或应用程序的多个实例,并确保每个文件只处理一次。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
实际的本地筛选器是`CompositeFileListFilter`,其中包含提供的筛选器和模式筛选器,该筛选器防止正在下载过程中的处理文件(基于`temporary-file-suffix`)。文件是用这个后缀下载的(默认值是`.writing`),当传输完成时,文件被重命名为它的最终名称,使它对过滤器“可见”。
如果缺省的“/”不适用于你的特定环境,`remote-file-separator`属性允许你配置一个文件分隔符来使用。
有关这些属性的更多详细信息,请参见[schema](https://github.com/spring-projects/spring-integration/tree/main/spring-integration-core/src/main/resources/org/springframework/integration/config)。
你还应该理解 FTP 入站通道适配器是一个轮询消费者。因此,你必须配置一个 Poller(通过使用全局默认值或局部子元素)。一旦文件被传输,将生成一条以`java.io.File`作为有效负载的消息,并将其发送到由`channel`属性标识的通道。
#### 更多关于文件过滤和不完整文件的信息
有时,刚刚出现在监视(远程)目录中的文件是不完整的。通常,这样的文件是使用临时扩展名编写的(例如`somefile.txt.writing`),然后在编写过程完成后重新命名。在大多数情况下,你只对完整的文件感兴趣,并且只想筛选完整的文件。要处理这些场景,可以使用`filename-pattern`、`filename-regex`和`filter`属性提供的过滤支持。下面的示例使用了自定义过滤器实现:
```
remote-directory="some/remote/path"
```
#### 入站 FTP 适配器的 Poller 配置注意事项
入站 FTP 适配器的工作包括两个任务:
1. 与远程服务器通信,以便将文件从远程目录传输到本地目录。
2. 对于每个传输的文件,生成一条以该文件为有效负载的消息,并将其发送到由“通道”属性标识的通道。这就是为什么他们被称为“通道适配器”,而不仅仅是“适配器”。这种适配器的主要工作是生成要发送到消息通道的消息。从本质上讲,第二个任务以这样一种方式获得优先权,即如果你的本地目录已经有一个或多个文件,它首先从这些文件生成消息。只有当所有的本地文件都被处理后,它才会启动远程通信来检索更多的文件。
此外,在 Poller 上配置触发器时,你应该密切关注`max-messages-per-poll`属性。对于所有`SourcePollingChannelAdapter`实例(包括 FTP),它的默认值是`1`。这意味着,一旦一个文件被处理,它就会等待由触发器配置决定的下一个执行时间。如果你碰巧在`local-directory`中有一个或多个文件,那么它将在启动与远程 FTP 服务器的通信之前处理这些文件。此外,如果`max-messages-per-poll`被设置为`1`(默认值),则它一次只处理一个文件,其间隔由触发器定义,本质上是“One-poll===one-file”。
对于典型的文件传输用例,你很可能想要相反的行为:为每个投票处理所有可以处理的文件,然后等待下一个投票。如果是这种情况,则将`max-messages-per-poll`设置为-1。然后,在每个轮询中,适配器尝试尽可能多地生成消息。换句话说,它处理本地目录中的所有内容,然后连接到远程目录,将那里可用的所有内容转移到本地处理。只有这样,轮询操作才算完成,Poller 才会等待下一次执行时间。
你也可以将“max-messages-per-poll”值设置为正值,该值指示从每个 poll 文件创建的消息的向上限制。例如,值`10`意味着,在每个轮询中,它尝试处理的文件不超过 10 个。
#### 从故障中恢复
理解适配器的体系结构是很重要的。有一个文件同步器来获取文件,还有一个`FileReadingMessageSource`为每个同步文件发出一条消息。正如前面所讨论的,涉及两个过滤器。`filter`属性(和模式)引用远程文件列表,以避免获取已经获取的文件。`local-filter`由`FileReadingMessageSource`用来确定哪些文件将作为消息发送。
同步器列出远程文件并查看其过滤器。然后文件被转移。如果在文件传输过程中发生 IO 错误,那么已经添加到过滤器中的所有文件都将被删除,以便在下一次投票时有资格重新获取它们。这仅在过滤器实现`ReversibleFileListFilter`(例如`AcceptOnceFileListFilter`)时才适用。
如果在同步文件后,处理文件的下游流出现错误,则不会自动回滚过滤器,因此默认情况下不会重新处理失败的文件。
如果你希望在失败后重新处理此类文件,则可以使用类似于以下的配置,以便于从过滤器中删除失败的文件:
```
```
前面的配置适用于任何`ResettableFileListFilter`。
从版本 5.0 开始,入站通道适配器可以在本地构建与生成的本地文件名对应的子目录。这也可以是一个远程子路径。为了能够根据层次结构支持递归地读取本地目录以进行修改,你现在可以使用基于`Files.walk()`算法的新的`RecursiveDirectoryScanner`提供一个内部`FileReadingMessageSource`。参见[`AbstractInboundFileSynchronizingMessageSource.setScanner()`](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/file/remote/synchronizer/abstractinboundfilesynchronizingmessagesource.html#setscanner)了解更多信息。此外,你现在可以通过使用`setUseWatchService()`选项将`AbstractInboundFileSynchronizingMessageSource`切换到基于`WatchService`的`DirectoryScanner`。它还被配置为所有`WatchEventType`实例,以对本地目录中的任何修改做出反应。前面显示的再处理示例基于`FileReadingMessageSource.WatchServiceDirectoryScanner`的内置功能,当从本地目录中删除文件(`StandardWatchEventKinds.ENTRY_DELETE`)时执行`ResettableFileListFilter.remove()`。有关更多信息,请参见[`WatchServiceDirectoryScanner`](./file.html#watch-service-directory-scanner)。
#### 使用 Java 配置进行配置
Spring 以下引导应用程序展示了如何使用 Java 配置配置入站适配器的示例:
```
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory(sf);
}
@Bean
public FtpInboundFileSynchronizer ftpInboundFileSynchronizer() {
FtpInboundFileSynchronizer fileSynchronizer = new FtpInboundFileSynchronizer(ftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new FtpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}
@Bean
@InboundChannelAdapter(channel = "ftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource ftpMessageSource() {
FtpInboundFileSynchronizingMessageSource source =
new FtpInboundFileSynchronizingMessageSource(ftpInboundFileSynchronizer());
source.setLocalDirectory(new File("ftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter());
source.setMaxFetchSize(1);
return source;
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
return new MessageHandler() {
@Override
public void handleMessage(Message> message) throws MessagingException {
System.out.println(message.getPayload());
}
};
}
}
```
#### 使用 Java DSL 进行配置
Spring 以下引导应用程序展示了如何使用 Java DSL 配置入站适配器的示例:
```
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public IntegrationFlow ftpInboundFlow() {
return IntegrationFlows
.from(Ftp.inboundAdapter(this.ftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilename(f -> f.toUpperCase() + ".a")
.localDirectory(new File("d:\\ftp_files")),
e -> e.id("ftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}
```
#### 处理不完整数据
见[处理不完整的数据](./file.html#file-incomplete)。
提供`FtpSystemMarkerFilePresentFileListFilter`以过滤远程系统上没有相应标记文件的远程文件。有关配置信息,请参见[Javadoc](https://docs.spring.io/spring-integration/api/org/springframework/integration/ftp/filters/FtpSystemMarkerFilePresentFileListFilter.html)(并浏览到父类)。
### FTP 流媒体入站通道适配器
版本 4.3 引入了流入站通道适配器。此适配器生成的消息的有效负载类型为`InputStream`,这样就可以在不将文件写入本地文件系统的情况下获取文件。由于会话保持打开,所以当文件已被消费时,消费应用程序负责关闭会话。会话在`closeableResource`报头(`IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE`)中提供。标准框架组件,例如`FileSplitter`和`StreamTransformer`,会自动关闭会话。有关这些组件的更多信息,请参见[文件拆分器](./file.html#file-splitter)和[流变压器](./transformer.html#stream-transformer)。下面的示例展示了如何配置`inbound-streaming-channel-adapter`:
```
```
只有`filename-pattern`、`filename-regex`、`filter`或`filter-expression`中的一个是允许的。
| |从版本 5.0 开始,默认情况下,`FtpStreamingMessageSource`适配器将防止基于内存`FtpPersistentAcceptOnceFileListFilter`的`SimpleMetadataStore`的远程文件的重复。
默认情况下,该过滤器还将与文件名模式(或 regex)一起应用。
如果你需要允许重复,你可以使用`AcceptAllFileListFilter`。
任何其他用例都可以由`CompositeFileListFilter`(或`ChainFileListFilter`)处理。
Java 配置([稍后在文档中](#ftp-streaming-java))显示了一种在处理后删除远程文件以避免重复的技术。|
|---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
有关`FtpPersistentAcceptOnceFileListFilter`及其使用方式的更多信息,请参见[远程持久文件列表过滤器](./file.html#remote-persistent-flf)。
使用`max-fetch-size`属性来限制在需要进行获取时在每个轮询中获取的文件数量。将其设置为`1`,并在集群环境中运行时使用持久过滤器。有关更多信息,请参见[入站通道适配器:控制远程文件获取](#ftp-max-fetch)。
适配器将远程目录和文件名分别放入`FileHeaders.REMOTE_DIRECTORY`和`FileHeaders.REMOTE_FILE`标题中。从版本 5.0 开始,`FileHeaders.REMOTE_FILE_INFO`头提供了额外的远程文件信息(默认情况下用 JSON 表示)。如果将`FtpStreamingMessageSource`上的`fileInfoJson`属性设置为`false`,则头包含一个`FtpFileInfo`对象。可以使用`FtpFileInfo.getFileInfo()`方法访问底层 Apache 网库提供的`FTPFile`对象。当你使用 XML 配置时,`fileInfoJson`属性是不可用的,但是你可以通过将`FtpStreamingMessageSource`注入到你的一个配置类中来设置它。另见[远程文件信息](#ftp-remote-file-info)。
从版本 5.1 开始,`comparator`的通用类型是`FTPFile`。以前,它是`AbstractFileInfo`。这是因为排序现在是在处理的较早阶段执行的,在过滤和应用`maxFetch`之前。
#### 使用 Java 配置进行配置
Spring 以下引导应用程序展示了如何使用 Java 配置配置入站适配器的示例:
```
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource ftpMessageSource() {
FtpStreamingMessageSource messageSource = new FtpStreamingMessageSource(template());
messageSource.setRemoteDirectory("ftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}
@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}
@Bean
public FtpRemoteFileTemplate template() {
return new FtpRemoteFileTemplate(ftpSessionFactory());
}
@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}
@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}
}
```
请注意,在本例中,Transformer 下游的消息处理程序有一个建议,该建议在处理后删除远程文件。
### 入站通道适配器:轮询多个服务器和目录
从版本 5.0.7 开始,`RotatingServerAdvice`是可用的;当配置为 Poller 建议时,入站适配器可以轮询多个服务器和目录。配置建议,并将其正常添加到 Poller 的建议链中。a`DelegatingSessionFactory`用于选择服务器,有关更多信息,请参见[Delegating Session Factory](#ftp-dsf)。通知配置由`RotationPolicy.KeyDirectory`对象列表组成。
例子
```
@Bean
public RotatingServerAdvice advice() {
List keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}
```
此建议将轮询服务器`foo`上的目录`one`,直到没有新文件存在,然后移动到目录`bar`,然后是服务器`baz`上的目录`two`,等等。
可以使用`公平`构造函数 arg 修改此默认行为:
fair
```
@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}
```
在这种情况下,无论上一次投票是否返回了文件,通知都将移动到下一个服务器/目录。
或者,你可以提供自己的`RotationPolicy`,以便根据需要重新配置消息源:
政策
```
public interface RotationPolicy {
void beforeReceive(MessageSource> source);
void afterReceive(boolean messageReceived, MessageSource> source);
}
```
and
习惯
```
@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}
```
`local-filename-generator-expression`属性(同步器上的`localFilenameGeneratorExpression`)现在可以包含`#remoteDirectory`变量。这允许将从不同目录检索到的文件下载到本地类似的目录中:
```
@Bean
public IntegrationFlow flow() {
return IntegrationFlows.from(Ftp.inboundAdapter(sf())
.filter(new FtpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}
```
| |使用此建议时,不要在 poller 上配置`TaskExecutor`;有关更多信息,请参见[消息源的条件 Poller](./polling-consumer.html#conditional-pollers)。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
### 入站通道适配器:控制远程文件获取
在配置入站通道适配器时,应该考虑两个属性。`max-messages-per-poll`,与所有 Poller 一样,可以用来限制在每个轮询上发出的消息的数量(如果超过配置的值已经准备好)。`max-fetch-size`(自版本 5.0 起)可以限制一次从远程服务器检索到的文件的数量。
以下场景假定起始状态是一个空的本地目录:
* `max-messages-per-poll=2`和`max-fetch-size=1`:适配器获取一个文件,发出它,获取下一个文件,发出它,然后睡眠,直到下一个投票。
* `max-messages-per-poll=2`和`max-fetch-size=2`):适配器获取这两个文件,然后发射每个文件。
* `max-messages-per-poll=2`和`max-fetch-size=4`:适配器获取最多四个文件(如果可用)并发出前两个文件(如果至少有两个)。接下来的两个文件将在下一次投票时发出。
* `max-messages-per-poll=2`和`max-fetch-size`未指定:适配器获取所有远程文件并发出前两个文件(如果至少有两个)。后续的文件将在后续的轮询中发出(一次两个)。当所有文件都被消耗时,将再次尝试远程获取,以获取任何新的文件。
| |当部署一个应用程序的多个实例时,我们建议使用一个小的`max-fetch-size`,以避免一个实例“抓取”所有文件并使其他实例无法使用。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
`max-fetch-size`的另一种用法是,如果你想停止获取远程文件,而是继续处理已经获取的文件。在`MessageSource`上设置`maxFetchSize`属性(以编程方式使用 JMX,或使用[控制总线](./control-bus.html#control-bus))可以有效地阻止适配器获取更多文件,但可以让 Poller 继续为先前已获取的文件发送消息。如果属性更改时 poller 处于活动状态,则更改将在下一次投票时生效。
从版本 5.1 开始,同步器可以提供`Comparator`。这在限制使用`maxFetchSize`获取的文件数量时很有用。
### FTP 出站通道适配器
FTP 出站通道适配器依赖于一个`MessageHandler`实现,该实现连接到 FTP 服务器,并为它在传入消息的有效负载中接收的每个文件发起 FTP 传输。它还支持一个文件的几种表示,因此你不仅限于`java.io.File`类型的有效负载。FTP 出站通道适配器支持以下有效负载:
* `java.io.File`:实际的文件对象
* `byte[]`:表示文件内容的字节数组
* `java.lang.String`:表示文件内容的文本
* `java.io.InputStream`:要传输到远程文件的数据流
* `org.springframework.core.io.Resource`:用于将数据传输到远程文件的资源
下面的示例展示了如何配置`outbound-channel-adapter`:
```
```
前面的配置显示了如何通过使用`outbound-channel-adapter`元素来配置 FTP 出站通道适配器,同时还提供各种属性的值,例如`filename-generator`(`o.s.i.file.FileNameGenerator`策略接口的实现)、对`session-factory`的引用以及其他属性。你还可以看到一些`*expression`属性的示例,这些属性允许你使用 SPEL 来配置设置,例如`remote-directory-expression`、`temporary-remote-directory-expression`和`remote-filename-generator-expression`(上一个示例中显示的`filename-generator`的 SPEL 替代)。与允许使用 SPEL 的任何组件一样,可以通过“有效负载”和“头”变量访问有效负载和消息头。有关可用属性的更多详细信息,请参见[schema](https://github.com/spring-projects/spring-integration/tree/main/spring-integration-core/src/main/resources/org/springframework/integration/config)。
| |默认情况下,如果没有指定文件名生成器, Spring 集成使用`o.s.i.file.DefaultFileNameGenerator`,`DefaultFileNameGenerator`根据`file_name`报头的值(如果存在的话)确定文件名,或者,如果消息的有效负载已经是`java.io.File`,它使用了该文件的原始名称。|
|---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| |定义某些值(例如`remote-directory`)可能与平台或 FTP 服务器有关。
例如,正如[https://forum.spring.io/showthread.php?p=333478&posted=1#post333478](https://forum.spring.io/showthread.php?p=333478&posted=1#post333478)上报告的那样,在某些平台上,必须在目录定义的末尾添加斜杠(例如,`remote-directory="/thing1/thing2/"`而不是`remote-directory="/thing1/thing2"`)。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
从版本 4.1 开始,你可以在传输文件时指定`mode`。默认情况下,现有文件将被覆盖。模式由`FileExistsMode`枚举定义,其中包括以下值:
* `REPLACE`(默认)
* `REPLACE_IF_MODIFIED`
* `APPEND`
* `APPEND_NO_FLUSH`
* `IGNORE`
* `FAIL`
`IGNORE`和`FAIL`不传输文件。`FAIL`会导致抛出异常,而`IGNORE`会默默地忽略传输(尽管会产生`DEBUG`日志条目)。
版本 5.2 引入了`chmod`属性,你可以使用该属性在上传后更改远程文件权限。你可以使用常规的 UNIX 八进制格式(例如,`600`仅允许文件所有者进行读写)。当使用 Java 配置适配器时,可以使用`setChmodOctal("600")`或`setChmod(0600)`。仅当你的 FTP 服务器支持`SITE CHMOD`子命令时才适用。
#### 避免部分写入的文件
处理文件传输时出现的一个常见问题是处理部分文件的可能性。也就是说,一个文件可能会在其传输真正完成之前出现在文件系统中。
Spring 为了处理这个问题,集成 FTP 适配器使用一种常见的算法:文件以临时名称传输,然后在文件完全传输后重新命名。
默认情况下,正在传输过程中的每个文件都会在文件系统中出现一个附加的后缀,默认情况下,后缀是`.writing`。你可以通过设置`temporary-file-suffix`属性来更改此后缀。
但是,在某些情况下,你可能不想使用这种技术(例如,如果服务器不允许重命名文件)。对于这样的情况,你可以通过将`use-temporary-file-name`设置为`false`来禁用此功能(默认值为`true`)。当此属性`false`时,文件将以其最终名称写入,并且使用该应用程序的应用程序需要一些其他机制来检测该文件是否已完全上传,然后才能访问它。
#### 使用 Java 配置进行配置
Spring 以下引导应用程序展示了如何使用 Java 配置配置出站适配器的示例:
```
@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory(sf);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
FtpMessageHandler handler = new FtpMessageHandler(ftpSessionFactory());
handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
handler.setFileNameGenerator(new FileNameGenerator() {
@Override
public String generateFileName(Message> message) {
return "handlerContent.test";
}
});
return handler;
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);
}
}
```
#### 使用 Java DSL 进行配置
Spring 以下引导应用程序展示了如何使用 Java DSL 配置出站适配器的示例:
```
@SpringBootApplication
@IntegrationComponentScan
public class FtpJavaApplication {
public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToFtp(new File("/foo/bar.txt"));
}
@Bean
public SessionFactory ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory(sf);
}
@Bean
public IntegrationFlow ftpOutboundFlow() {
return IntegrationFlows.from("toFtpChannel")
.handle(Ftp.outboundAdapter(ftpSessionFactory(), FileExistsMode.FAIL)
.useTemporaryFileName(false)
.fileNameExpression("headers['" + FileHeaders.FILENAME + "']")
.remoteDirectory(this.ftpServer.getTargetFtpDirectory().getName())
).get();
}
@MessagingGateway
public interface MyGateway {
@Gateway(requestChannel = "toFtpChannel")
void sendToFtp(File file);
}
}
```
### FTP 出站网关
FTP 出站网关提供了一组有限的命令来与远程 FTP 或 FTPS 服务器进行交互。支持的命令是:
* `ls`(列表文件)
* `nlst`(列出文件名)
* `get`(检索文件)
* `mget`(检索文件)
* `rm`(删除文件)
* `mv`(移动/重命名文件)
* `put`(发送文件)
* `mput`(发送多个文件)
#### 使用`ls`命令
`ls`列出了远程文件并支持以下选项:
* `-1`:检索文件名列表。默认值是检索`FileInfo`对象的列表。
* `-a`:包含所有文件(包括以“.”开头的文件)
* `-f`:不要对列表进行排序
* `-dirs`:包括目录(默认情况下不包括目录)
* `-links`:包括符号链接(默认情况下不包括这些链接)
* `-R`:递归列出远程目录
此外,以与`inbound-channel-adapter`相同的方式提供了文件名过滤。见[FTP 入站通道适配器](#ftp-inbound)。
由`ls`操作产生的消息有效负载是文件名列表或`FileInfo`对象列表。这些对象提供诸如修改时间、权限和其他详细信息等信息。
`ls`命令所作用的远程目录在`file_remoteDirectory`报头中提供。
当使用递归选项(`-R`)时,`fileName`包括任意子目录元素,表示文件的相对路径(相对于远程目录)。如果包含`-dirs`选项,那么每个递归目录也将作为列表中的一个元素返回。在这种情况下,建议你不要使用`-1`选项,因为你将无法区分文件和目录,这可以通过`FileInfo`对象来实现。
从版本 4.3 开始,`FtpSession`对`list()`和`listNames()`方法支持`null`。因此,你可以省略`expression`属性。为了方便起见,Java 配置有两个不具有`expression`参数的构造函数。或`LS`、`NLST`、`PUT`和`MPUT`命令,根据 FTP 协议,`null`被视为客户机工作目录。所有其他命令都必须提供`expression`,以根据请求消息计算远程路径。在扩展`DefaultFtpSessionFactory`并实现`postProcessClientAfterConnect()`回调时,可以使用`FTPClient.changeWorkingDirectory()`函数设置工作目录。
#### 使用`nlst`命令
版本 5 引入了对`nlst`命令的支持。
`nlst`列出了远程文件名,并且只支持一个选项:
* `-f`:不对列表进行排序
由`nlst`操作产生的消息有效负载是一个文件名列表。
`nlst`命令所作用的远程目录在`file_remoteDirectory`报头中提供。
与[`ls`命令](#ftp-using-ls)的`-1`选项不同,该选项使用`LIST`命令,`nlst`命令向目标 FTP 服务器发送`NLST`命令。当服务器不支持`LIST`(例如由于安全限制)时,此命令很有用。`nlst`操作的结果是没有其他详细信息的名称。因此,框架不能确定一个实体是否是一个目录,例如执行筛选或递归列表。
#### 使用`get`命令
`get`检索远程文件。它支持以下选项:
* `-P`:保留远程文件的时间戳。
* `-stream`:以流的形式检索远程文件。
* `-D`:成功传输后删除远程文件。如果忽略传输,则不会删除远程文件,因为`FileExistsMode`是`IGNORE`,并且本地文件已经存在。
`file_remoteDirectory`头提供远程目录名,`file_remoteFile`头提供文件名。
由`get`操作产生的消息有效负载是一个`File`对象,该对象表示检索到的文件,或者当你使用`-stream`选项时是一个`InputStream`对象。`-stream`选项允许将文件作为流检索。对于文本文件,一个常见的用例是将此操作与[文件拆分器](./file.html#file-splitter)或[流变压器](./transformer.html#stream-transformer)结合起来。当以流的形式使用远程文件时,你要负责在流被使用后关闭`Session`。为了方便起见,`Session`在`closeableResource`标头中提供了`closeableResource`标头,你可以在`IntegrationMessageHeaderAccessor`上使用方便方法访问它。下面的示例展示了如何使用方便方法:
```
Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
closeable.close();
}
```
框架组件如[文件拆分器](./file.html#file-splitter)和[流变压器](./transformer.html#stream-transformer)在数据传输后自动关闭会话。
下面的示例展示了如何将文件作为流使用:
```
```
| |如果在自定义组件中使用输入流,则必须关闭`Session`。
可以在自定义代码中关闭,也可以通过将消息的副本路由到`service-activator`并使用 SPEL 来关闭,如下例所示:|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
```
```
#### 使用`mget`命令
`mget`基于模式检索多个远程文件,并支持以下选项:
* `-P`:保留远程文件的时间戳。
* `-R`:递归地检索整个目录树。
* `-x`:如果没有与模式匹配的文件,则抛出异常(否则将返回空列表)。
* `-D`:成功传输后删除每个远程文件。如果忽略传输,则不会删除远程文件,因为`FileExistsMode`是`IGNORE`,并且本地文件已经存在。
由`mget`操作产生的消息负载是`List`对象(即`List`的`List`对象,每个对象代表一个检索到的文件)。
| |从版本 5.0 开始,如果`FileExistsMode`是`IGNORE`,则输出消息的有效负载不再包含由于文件已经存在而未被获取的文件。
以前,该列表包含所有文件,包括已经存在的文件。|
|---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
用于确定远程路径的表达式应该产生一个以``**结尾的结果-例如,`somedir/`** 将在`somedir`下获取完整的树。
从版本 5.0 开始,递归的`mget`与新的`FileExistsMode.REPLACE_IF_MODIFIED`模式相结合,可以用于在本地定期同步整个远程目录树。无论`-P`(保留时间戳)选项如何,此模式都会用远程时间戳替换本地文件上一次修改的时间戳。
| |Using recursion (`-R`)
The pattern is ignored, and `*` is assumed.
By default, the entire remote tree is retrieved.
However, files in the tree can be filtered, by providing a `FileListFilter`.
Directories in the tree can also be filtered this way.
A `FileListFilter` can be provided by reference, by `filename-pattern`, or by `filename-regex` attributes.
For example, `filename-regex="(subDir|.*1.TXT)“` retrieves all files ending with `1.TXT` in the remote directory and the `subdir` child directory.
However, the next example shows an alternative, which version 5.0 made available.
If a subdirectory is filtered, no additional traversal of that subdirectory is performed.
The `-dirs` option is not allowed (the recursive `mget` uses the recursive `ls` to obtain the directory tree, so the directories themselves cannot be included in the list).
Typically, you would use the `#remoteDirectory` variable in the `local-directory-expression` 使远程目录结构在本地保留。|
|---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
持久文件列表过滤器现在有一个布尔属性`forRecursion`。将此属性设置为`true`,还将设置`alwaysAcceptDirectories`,这意味着出站网关上的递归操作(`ls`和`mget`)现在每次都将遍历完整目录树。这是为了解决未检测到目录树中深层更改的问题。此外,`forRecursion=true`会导致文件的完整路径被用作元数据存储键;这解决了一个问题,即如果同名文件在不同的目录中多次出现,则过滤器无法正常工作。重要提示:这意味着,对于顶层目录下的文件,将找不到持久性元数据存储中的现有密钥。由于这个原因,默认情况下,该属性是`false`;这可能会在将来的版本中发生变化。
从版本 5.0 开始,`FtpSimplePatternFileListFilter`和`FtpRegexPatternFileListFilter`可以通过将`alwaysAcceptDirectories`属性设置为`true`来配置为始终传递目录。这样做允许对简单模式进行递归,如下例所示:
```
```
一旦定义了前面示例中的过滤器,就可以通过在网关上设置`filter`属性来使用过滤器。
另请参见[出站网关部分成功(`mget`和`mput`)]。
#### 使用`put`命令
`put`commad 将文件发送到远程服务器。消息的有效负载可以是`java.io.File`、`byte[]`或`String`。使用`remote-filename-generator`(或表达式)为远程文件命名。其他可用的属性包括`remote-directory`,`temporary-remote-directory`,以及它们的`*-expression`等价物:`use-temporary-file-name`和`auto-create-directory`。有关更多信息,请参见[schema](https://github.com/spring-projects/spring-integration/tree/main/spring-integration-core/src/main/resources/org/springframework/integration/config)文档。
由`put`操作产生的消息有效负载是一个`String`,它表示服务器上传输后文件的完整路径。
5.2 版本引入了`chmod`属性,该属性在上传后更改远程文件权限。你可以使用常规的 UNIX 八进制格式(例如,`600`仅允许文件所有者进行读写)。当使用 Java 配置适配器时,可以使用`setChmod(0600)`。仅当你的 FTP 服务器支持`SITE CHMOD`子命令时才适用。
#### 使用`mput`命令
`mput`将多个文件发送到服务器,并且只支持一个选项:
* `-R`:递归。发送目录及其子目录中的所有文件(可能已过滤)。
消息有效负载必须是表示本地目录的`java.io.File`(或`String`)。从版本 5.1 开始,还支持`File`或`String`的集合。
此命令支持与[`put`命令](#ftp-put-command)相同的属性。此外,可以使用`mput-pattern`、`mput-regex`、`mput-filter`或`mput-filter-expression`中的一个对本地目录中的文件进行过滤。只要子目录本身通过筛选器,筛选器就可以使用递归。不通过筛选器的子目录不会被递归。
由`mput`操作产生的消息有效负载是`List`对象(即由传输产生的远程文件路径的`List`)。
另请参见[出站网关部分成功(`mget`和`mput`)]。
版本 5.2 引入了`chmod`属性,它允许你在上传后更改远程文件权限。你可以使用常规的 UNIX 八进制格式(例如,`600`仅允许文件所有者进行读写)。当使用 Java 配置适配器时,可以使用`setChmodOctal("600")`或`setChmod(0600)`。仅当你的 FTP 服务器支持`SITE CHMOD`子命令时才适用。
#### 使用`rm`命令
`rm`命令删除文件。
`rm`命令没有选项。
如果删除成功,则`rm`操作产生的消息有效负载为`Boolean.TRUE`,否则为`Boolean.FALSE`。`file_remoteDirectory`头提供远程目录,`file_remoteFile`头提供文件名。
#### 使用`mv`命令
`mv`命令移动文件。
`mv`命令没有选项。
`expression`属性定义了“from”路径,`rename-expression`属性定义了“to”路径。默认情况下,`rename-expression`是`headers['file_renameTo']`。这个表达式不能计算为空或空`String`。如果需要,将创建任何必要的远程目录。结果消息的有效负载是`Boolean.TRUE`。`file_remoteDirectory`头提供原始远程目录,`file_remoteFile`头提供文件名。新路径位于`file_renameTo`标头中。
从版本 5.5.6 开始,`remoteDirectoryExpression`可以在`mv`命令中使用,以方便使用。如果“from”文件不是完整的文件路径,则将`remoteDirectoryExpression`的结果用作远程目录。这同样适用于“to”文件,例如,如果任务只是重命名某个目录中的远程文件。
#### 有关 FTP 出站网关命令的其他信息
`get`和`mget`命令支持`local-filename-generator-expression`属性。它定义了一个 SPEL 表达式,以便在传输过程中生成本地文件的名称。求值上下文的根对象是请求消息。对于`mget`特别有用的`remoteFileName`变量也是可用的——例如,`local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.something"`。
`get`和`mget`命令支持`local-directory-expression`属性。它定义了一个 SPEL 表达式,以便在传输过程中生成本地目录的名称。求值上下文的根对象是请求消息 but。对于`mget`特别有用的`remoteDirectory`变量也是可用的——例如:`local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.something"`。这个属性与`local-directory`属性是互斥的。
对于所有命令,网关的“表达式”属性提供了命令执行的路径。对于`mget`命令,表达式可以计算为’**’,意思是检索所有文件,或’SomeDirectory/**’,依此类推。
下面的示例显示了为`ls`命令配置的网关:
```
```
发送到`toSplitter`通道的消息的有效负载是`String`对象的列表,每个对象都包含一个文件的名称。如果省略了`command-options`属性,则它持有`FileInfo`对象。它使用空格分隔的选项——例如,`command-options="-1 -dirs -links"`。
从版本 4.2 开始,`GET`、`MGET`、`PUT`和`MPUT`命令支持`FileExistsMode`属性(使用名称空间支持时`mode`)。这会影响当本地文件存在(`GET`和`MGET`)或远程文件存在(`PUT`和`MPUT`)时的行为。支持的模式有`REPLACE`、`APPEND`、`FAIL`和`IGNORE`。对于向后兼容性,`PUT`和`MPUT`操作的默认模式是`REPLACE`。对于`GET`和`MGET`操作,缺省值是`FAIL`。
从版本 5.0 开始,`setWorkingDirExpression()`(在 XML 中为`working-dir-expression`)选项在`FtpOutboundGateway`(在 XML 中为``)上提供。它允许你在运行时更改客户机工作目录。表达式根据请求消息进行求值。在每次网关操作之后,都会恢复以前的工作目录。
#### 使用 Java 配置进行配置
Spring 以下引导应用程序展示了如何使用 Java 配置出站网关的示例:
```
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory(sf);
}
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler handler() {
FtpOutboundGateway ftpOutboundGateway =
new FtpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'");
ftpOutboundGateway.setOutputChannelName("lsReplyChannel");
return ftpOutboundGateway;
}
}
```
#### 使用 Java DSL 进行配置
Spring 以下引导应用程序展示了如何使用 Java DSL 配置出站网关的示例:
```
@SpringBootApplication
public class FtpJavaApplication {
public static void main(String[] args) {
new SpringApplicationBuilder(FtpJavaApplication.class)
.web(false)
.run(args);
}
@Bean
public SessionFactory ftpSessionFactory() {
DefaultFtpSessionFactory sf = new DefaultFtpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
sf.setTestSession(true);
return new CachingSessionFactory(sf);
}
@Bean
public FtpOutboundGatewaySpec ftpOutboundGateway() {
return Ftp.outboundGateway(ftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
.regexFileNameFilter("(subFtpSource|.*1.txt)")
.localDirectoryExpression("'localDirectory/' + #remoteDirectory")
.localFilenameExpression("#remoteFileName.replaceFirst('ftpSource', 'localTarget')");
}
@Bean
public IntegrationFlow ftpMGetFlow(AbstractRemoteFileOutboundGateway ftpOutboundGateway) {
return f -> f
.handle(ftpOutboundGateway)
.channel(c -> c.queue("remoteFileOutputChannel"));
}
}
```
#### 出站网关部分成功(`mget`和`mput`)
在对多个文件执行操作时(通过使用`mget`和`mput`),在传输了一个或多个文件后的一段时间内可能会发生异常。在这种情况下(从版本 4.2 开始),将抛出一个`PartialSuccessException`。除了通常的`MessagingException`属性(`failedMessage`和`cause`)外,该异常还具有两个附加属性:
* `partialResults`:成功的转移结果。
* `derivedInput`:从请求消息中生成的文件列表(例如,要传输的本地文件为`mput`)。
这些属性允许你确定哪些文件已成功传输,哪些文件未成功传输。
在递归`mput`的情况下,`PartialSuccessException`可能嵌套了`PartialSuccessException`出现。
考虑以下目录结构:
```
root/
|- file1.txt
|- subdir/
| - file2.txt
| - file3.txt
|- zoo.txt
```
如果异常发生在`file3.txt`上,则网关抛出的`PartialSuccessException`具有`derivedInput`of`file1.txt`,`subdir`,以及`zoo.txt`和`partialResults`of`file1.txt`。其`cause`是另一个`PartialSuccessException`与`derivedInput`之`file2.txt`和`file3.txt`之`partialResults`之`file2.txt`之。
### ftp会话缓存
| |从 Spring Integration3.0 开始,默认情况下不再缓存会话。
端点不再支持`cache-sessions`属性。
如果你希望缓存会话,则必须使用`CachingSessionFactory`(如下一个示例所示)。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
在 3.0 之前的版本中,默认情况下会话是自动缓存的。有一个`cache-sessions`属性可用于禁用自动缓存,但该解决方案没有提供一种配置其他会话缓存属性的方法。例如,你无法限制创建的会话的数量。为了支持该需求和其他配置选项,添加了`CachingSessionFactory`。它提供`sessionCacheSize`和`sessionWaitTimeout`属性。`sessionCacheSize`属性控制工厂在其缓存中维护的活动会话的数量(缺省情况是无界的)。如果已经达到`sessionCacheSize`阈值,则试图获取另一个会话的任何尝试都会阻塞,直到其中一个缓存的会话变得可用,或者直到一个会话的等待时间过期(默认的等待时间是`Integer.MAX_VALUE`)。`sessionWaitTimeout`属性配置该值。
如果你希望你的会话被缓存,那么按照前面的描述配置你的默认会话工厂,然后将其包装在`CachingSessionFactory`的实例中,在该实例中你可以提供这些附加属性。下面的示例展示了如何做到这一点:
```
```
前面的示例显示了一个`CachingSessionFactory`,该`sessionCacheSize`设置为`10`,而`sessionWaitTimeout`设置为一秒(其值以毫秒为单位)。
从 Spring Integration3.0 开始,`CachingConnectionFactory`提供了一个`resetCache()`方法。当调用时,所有空闲会话都会立即关闭,而当正在使用的会话被返回到缓存时,它们会被关闭。新的届会请求在必要时设立新的届会。
从版本 5.1 开始,`CachingSessionFactory`有一个新的属性`testSession`。当为真时,会话将通过发送一个 Noop 命令来进行测试,以确保它仍然处于活动状态;如果不是,它将从缓存中删除;如果缓存中没有活动会话,则创建一个新的会话。
### 使用`RemoteFileTemplate`
从 Spring Integration3.0 开始,在`FtpSession`对象上提供了一个新的抽象。该模板提供了发送、检索(作为`InputStream`)、删除和重命名文件的方法。此外还提供了一种方法,允许调用者在会话上执行多个操作。在所有情况下,模板都能可靠地关闭会话。有关更多信息,请参见[Javadoc for`RemoteFileTemplate`](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/file/remote/remotefiletemplate.html)。FTP 有一个子类:`FtpRemoteFileTemplate`。
版本 4.1 增加了额外的方法,包括`getClientInstance()`,它提供对底层`FTPClient`的访问,从而使你能够访问低级 API。
并不是所有的 FTP 服务器都正确地实现了`STAT `命令。对于不存在的路径,有些返回正结果。当路径是一个文件并且它存在时,`NLST`命令可靠地返回该名称。但是,这不支持检查空目录是否存在,因为当路径是目录时,`NLST`总是返回一个空列表。由于模板不知道路径是否表示目录,因此当路径似乎不存在时(当使用`NLST`时),它必须执行额外的检查。这增加了开销,需要向服务器发出多个请求。从版本 4.1.9 开始,`FtpRemoteFileTemplate`提供了`FtpRemoteFileTemplate.ExistsMode`属性,该属性具有以下选项:
* `STAT`:执行`STAT`FTP 命令(`FTPClient.getStatus(path)`)检查路径是否存在。这是默认的,要求你的 FTP 服务器正确地支持`STAT`命令(带有路径)。
* `NLST`:执行`NLST`FTP 命令—`FTPClient.listName(path)`。如果你正在测试的路径是文件的完整路径,请使用此方法。它不适用于空目录。
* `NLST_AND_DIRS`:首先执行`NLST`命令,如果不返回文件,则返回使用`FTPClient.changeWorkingDirectory(path)`临时切换工作目录的技术。有关更多信息,请参见[`FtpSession.exists()`](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/ftp/会话/ftpsession.html#exists)。
由于我们知道`FileExistsMode.FAIL`情况总是只查找文件(而不是目录),因此我们可以安全地使用`NLST`模式来处理`FtpMessageHandler`和`FtpOutboundGateway`组件。
对于任何其他情况,`FtpRemoteFileTemplate`可以扩展为在重写的`exist()`方法中实现自定义逻辑。
从版本 5.0 开始,新的`RemoteFileOperations.invoke(OperationsCallback action)`方法可用。这个方法允许在相同的、以线程为界的`Session`的范围内调用几个`RemoteFileOperations`调用。当你需要将`RemoteFileTemplate`作为一个工作单元执行多个高级操作时,这是非常有用的。例如,`AbstractRemoteFileOutboundGateway`将其与`mput`命令实现一起使用,其中我们对提供的目录中的每个文件执行`put`操作,并递归地对其子目录执行该操作。有关更多信息,请参见[Javadoc](https://docs.spring.io/spring-integration/api/org/springframework/integration/file/remote/RemoteFileOperations.html#invoke)。
### 使用`MessageSessionCallback`
从 Spring Integration4.2 开始,你可以使用带有``(在 Java 中是`FtpOutboundGateway`)的`MessageSessionCallback`实现来执行带有`requestMessage`上下文的`Session`上的任何操作。它可以用于任何非标准或低级别的 FTP 操作,并允许从集成流定义和功能接口实现注入进行访问,如以下示例所示:
```
@Bean
@ServiceActivator(inputChannel = "ftpChannel")
public MessageHandler ftpOutboundGateway(SessionFactory sessionFactory) {
return new FtpOutboundGateway(sessionFactory,
(session, requestMessage) -> session.list(requestMessage.getPayload()));
}
```
另一个示例可能是对要发送或检索的文件数据进行预处理或后处理。
当使用 XML 配置时,``提供一个`session-callback`属性,让你指定`MessageSessionCallback` Bean 名称。
| |`session-callback`与`command`和`expression`属性是互斥的。
当使用 Java 进行配置时,在[`FtpOutboundGateway`](https://DOCS. Spring.io/ Spring-integration/api/org/boundspringframework/integration/ftp/gateway/ftpoutgateway.html)类中可以使用不同的构造函数。|
|---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
### Apache Mina FTP 服务器事件
在版本 5.2 中添加的`ApacheMinaFtplet`监听某些 Apache Mina FTP 服务器事件,并将其发布为`ApplicationEvent`s,该事件可以由任何`ApplicationListener` Bean、`@EventListener` Bean 方法或[事件入站通道适配器](./event.html#appevent-inbound)方法接收。
目前支持的活动有:
* `SessionOpenedEvent`-打开了一个客户端会话
* `DirectoryCreatedEvent`-创建了一个目录
* `FileWrittenEvent`-一个文件被写入到
* `PathMovedEvent`-重命名了一个文件或目录
* `PathRemovedEvent`-删除了一个文件或目录
* `SessionClosedEvent`-客户端已断开连接
每一个都是`ApacheMinaFtpEvent`的子类;你可以配置一个侦听器来接收所有的事件类型。每个事件的`source`属性是`FtpSession`,你可以从它获得诸如客户机地址之类的信息;在抽象事件上提供了一个方便的`getSession()`方法。
会话open/close 以外的事件具有另一个属性`FtpRequest`,该属性具有命令和参数等属性。
要用侦听器(必须是 Spring Bean)配置服务器,请将其添加到服务器工厂:
```
FtpServerFactory serverFactory = new FtpServerFactory();
...
ListenerFactory factory = new ListenerFactory();
...
serverFactory.addListener("default", factory.createListener());
serverFactory.setFtplets(new HashMap<>(Collections.singletonMap("springFtplet", apacheMinaFtpletBean)));
server = serverFactory.createServer();
server.start();
```
要使用 Spring 集成事件适配器来使用这些事件:
```
@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
ApplicationEventListeningMessageProducer producer =
new ApplicationEventListeningMessageProducer();
producer.setEventTypes(ApacheMinaFtpEvent.class);
producer.setOutputChannel(eventChannel());
return producer;
}
```
### 远程文件信息
从版本 5.2 开始,`FtpStreamingMessageSource`([FTP 流入站通道适配器](#ftp-streaming))、`FtpInboundFileSynchronizingMessageSource`([FTP 入站通道适配器](#ftp-inbound))和“read”---命令的`FtpOutboundGateway`([FTP 出站网关](#ftp-outbound-gateway))在消息中提供额外的头,以生成有关远程文件的信息:
* `FileHeaders.REMOTE_HOST_PORT`-在文件传输操作期间,远程会话已连接到的 host:port pair;
* `FileHeaders.REMOTE_DIRECTORY`-已执行操作的远程目录;
* `FileHeaders.REMOTE_FILE`-远程文件名;仅适用于单个文件操作。
由于`FtpInboundFileSynchronizingMessageSource`不会针对远程文件生成消息,而是使用本地副本,因此`AbstractInboundFileSynchronizer`在同步操作期间以 URI 样式(`protocol://host:port/remoteDirectory#remoteFileName`)在`MetadataStore`(可以在外部配置)中存储有关远程文件的信息。在对本地文件进行轮询时,将通过`FtpInboundFileSynchronizingMessageSource`检索此元数据。当本地文件被删除时,建议删除其元数据条目。`AbstractInboundFileSynchronizer`为此提供了一个`removeRemoteFileMetadata()`回调。此外还有一个`setMetadataStorePrefix()`要在元数据中使用的键。建议将该前缀与基于`MetadataStore`的`FileListFilter`实现中使用的前缀不同,当相同的`MetadataStore`实例在这些组件之间共享时,为了避免条目重写,因为 Filter 和`AbstractInboundFileSynchronizer`都对元数据项使用相同的本地文件名。