# SFTP 适配器 ## SFTP 适配器 Spring 集成为 SFTP 上的文件传输操作提供了支持。 安全文件传输协议(英语:Secure File Transfer Protocol,SFTP)是一种网络协议,允许你通过任何可靠的流在互联网上的两台计算机之间传输文件。 SFTP 协议需要一个安全的通道(如 SSH),并且在 SFTP会话中对客户端身份的可见性。 Spring 集成通过提供三个客户端端点来支持通过 SFTP 发送和接收文件:入站通道适配器、出站通道适配器和出站网关。它还提供了方便的名称空间配置来定义这些客户机组件。 你需要在项目中包含此依赖项: Maven ``` org.springframework.integration spring-integration-sftp 5.5.9 ``` Gradle ``` compile "org.springframework.integration:spring-integration-sftp:5.5.9" ``` 要在 XML 配置中包含 SFTP 命名空间,请在根元素上包含以下属性: ``` xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp" xsi:schemaLocation="http://www.springframework.org/schema/integration/sftp https://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd" ``` ### sftp会话工厂 | |从版本 3.0 开始,默认情况下会话不再缓存。
参见[SFTP Session Caching](#sftp-session-caching)。| |---|-------------------------------------------------------------------------------------------------------------------| 在配置 SFTP 适配器之前,你必须配置一个 SFTP会话工厂。你可以使用常规的 Bean 定义来配置 SFTP会话工厂,如下例所示: ``` ``` 每当适配器从其`SessionFactory`请求会话对象时,就会创建一个新的 SFTP会话。在这种情况下,SFTP会话工厂依赖于[JSch](http://www.jcraft.com/jsch)库来提供 SFTP 功能。 然而, Spring 集成还支持对 SFTP 会话的缓存。有关更多信息,请参见[SFTP Session Caching](#sftp-session-caching)。 | |JSCH 支持在与服务器的连接上进行多个通道(操作),
默认情况下, Spring 集成会话工厂为每个通道使用单独的物理连接,
自 Spring 集成 3.0 以来,你可以配置会话工厂(使用布尔构造函数 arg-default`false`)来使用到服务器的单个连接,并在该单个连接上创建多个`JSch`通道。,

当使用此功能时,你必须将会话工厂包装在缓存会话工厂中,作为[稍后描述](#sftp-session-caching),以便当一个操作完成时连接不是物理关闭的。

如果缓存被重置,会话仅在关闭最后一个通道时断开连接。

如果在新操作获得会话时发现断开连接,则刷新该连接。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| | |如果你遇到连接问题,并且希望跟踪会话创建并查看轮询的会话,则可以通过将记录器设置为`TRACE`级别(例如,`log4j.category.org.springframework.integration.sftp=TRACE`)来启用跟踪。
参见[SFTP/JSCH 日志记录](#sftp-jsch-logging)。| |---|--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 现在,你需要做的就是将这个 SFTP会话工厂注入到适配器中。 | |为 SFTP会话工厂提供值的一种更实用的方法是使用 Spring 的[属性占位符支持](https://docs.spring.io/spring/docs/current/spring-framework-reference/core.html#beans-factory-placeholderconfigurer)。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| #### 配置属性 下面的列表描述了[`DefaultSftpSessionFactory`](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/sftp/会话/defaultsftpsessionfactory.html)所公开的所有属性。 `isSharedSession`(构造函数参数)::当`true`时,使用单个连接,并且`JSch Channels`是多路复用的。它的默认值为`false`。 `clientVersion`::允许你设置客户机版本属性。它的默认值取决于底层的 JSCH 版本,但它看起来会是:*SSH-2.0-JSCH-0.1.45* `enableDaemonThread`::如果`true`,所有线程都是守护进程线程。如果设置为`false`,则使用普通的非守护进程线程。此属性设置在底层[session](https://epaul.github.io/jsch-documentation/javadoc/com/jcraft/jsch/Session.html)上。在这里,此属性默认为`false`。 `host`:你想要连接的主机的 URL。必须的。 `hostKeyAlias`:设置主机键的别名,在将主机键与已知的主机列表进行比较时使用该别名。 `knownHostsResource`::指定用于主密钥存储库的文件资源。该文件的格式与 OpenSSH 的`known_hosts`文件相同,并且是必需的,如果`allowUnknownKeys`为 false,则必须预先填充该文件。 `password`:对远程主机进行身份验证的密码。如果没有提供密码,则需要`privateKey`属性。如果你设置`userInfo`,则不允许这样做。密码是从那个对象获得的。 `port`:应在其上建立 SFTP 连接的端口。如果没有指定,该值默认为`22`。如果指定,此属性必须是正数。 `privateKey`::允许你设置一个[resource](https://docs.spring.io/spring/docs/current/javadoc-api/org/springframework/core/io/Resource.html),该位置表示用于对远程主机进行身份验证的私钥的位置。如果不提供`privateKey`,则需要`password`属性。 `privateKeyPassphrase`:私钥的密码。如果设置`userInfo`,则不允许设置`privateKeyPassphrase`。密码是从那个物体上取得的。可选的。 `proxy`::允许指定基于 JSCH 的[proxy](https://epaul.github.com/jsch-documentation/javadoc/com/jcraft/jsch/Proxy.html)。如果设置了,代理对象将用于通过代理创建到远程主机的连接。有关配置代理的方便方法,请参见[Proxy Factory Bean](#sftp-proxy-factory-bean)。 `serverAliveCountMax`::指定服务器激活消息的数量,这些消息在断开连接之前不需要服务器的任何回复就可以发送。如果未设置,此属性默认为`1`。 `serverAliveInterval`:设置发送服务器激活消息之前的超时间隔(以毫秒为单位),以防服务器未接收到任何消息。 `sessionConfig`::通过使用`Properties`,你可以在底层 JSCH会话上设置额外的配置设置。 `socketFactory`::让你传入一个[`SocketFactory`](https://epaul.github.com/jsch-documentation/javadoc/com/jcraft/jsch/socketfactory.html)。套接字工厂用于创建到目标主机的套接字。当使用代理时,套接字工厂将传递给代理。默认情况下,使用的是普通的 TCP 套接字。 `timeout`::超时属性用作套接字超时参数,以及默认的连接超时。默认值为`0`,这意味着不会发生超时。 `user`:远程用户使用。必须的。 `allowUnknownKeys`::设置为`true`,以允许使用未知(或更改)键连接到主机。它的默认值是“false”。只有在不提供`userInfo`的情况下,才应用它。如果`false`,则需要一个预先填充的`knownHosts`文件。 `userInfo`:设置一个在身份验证期间使用的自定义`UserInfo`。特别地,当接收到未知(或更改的)主机键时,将调用`promptYesNo()`。另见[`allowUnknownKeys`]。当你提供一个`UserInfo`时,`password`和私钥`passphrase`是从它获得的,并且你不能设置离散的`password`和`privateKeyPassphrase`属性。 ### 代理工厂 Bean `Jsch`提供了一种通过 HTTP 或 SOCKS 代理连接到服务器的机制。要使用此功能,请配置`Proxy`,并提供对`DefaultSftpSessionFactory`的引用,如前面讨论的那样。由`Jsch`提供三种实现方式:`HTTP`、`SOCKS4`和`SOCKS5`。 Spring Integration4.3 引入了`FactoryBean`,通过允许属性注入来简化这些代理的配置,如下例所示: ``` ... ... ``` ### 委托会话工厂 版本 4.2 引入了`DelegatingSessionFactory`,它允许在运行时选择实际的会话工厂。在调用 SFTP 端点之前,你可以在工厂上调用`setThreadKey()`将一个键与当前线程关联。然后使用该键查找要使用的实际会话工厂。使用后,你可以通过调用`clearThreadKey()`清除密钥。 我们添加了方便的方法,这样你就可以更容易地从消息流中实现这一点,如下例所示: ``` ``` | |当使用会话缓存(参见[SFTP Session Caching](#sftp-session-caching))时,每个委托都应该被缓存。
你不能缓存`DelegatingSessionFactory`本身。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 从版本 5.0.7 开始,`DelegatingSessionFactory`可以与`RotatingServerAdvice`一起用于轮询多个服务器;请参见[入站通道适配器:轮询多个服务器和目录](#sftp-rotating-server-advice)。 ### SFTP会话缓存 | |从 Spring Integration Version3.0 开始,默认情况下不再缓存会话。
端点不再支持`cache-sessions`属性。
如果你希望缓存会话,则必须使用`CachingSessionFactory`(参见下一个示例)。| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 在 3.0 之前的版本中,默认情况下会话是自动缓存的。有一个`cache-sessions`属性可用于禁用自动缓存,但该解决方案没有提供一种配置其他会话-缓存属性的方法。例如,你无法限制创建的会话的数量。为了支持该需求和其他配置选项,我们添加了`CachingSessionFactory`。它提供`sessionCacheSize`和`sessionWaitTimeout`属性。顾名思义,`sessionCacheSize`属性控制工厂在其缓存中维护的活动会话的数量(默认情况是无界的)。如果已经达到`sessionCacheSize`阈值,则试图获取另一个会话块,直到其中一个缓存的会话变得可用,或者直到一个会话的等待时间过期(默认的等待时间是`Integer.MAX_VALUE`)。`sessionWaitTimeout`属性允许配置等待时间。 如果你希望你的会话被缓存,那么配置你的默认会话工厂(作为[前面描述的](#sftp-session-factory)),然后将其包装到`CachingSessionFactory`的实例中,你可以在其中提供这些附加属性。下面的示例展示了如何做到这一点: ``` ``` 前面的示例创建一个`CachingSessionFactory`,其`sessionCacheSize`设置为`10`,其`sessionWaitTimeout`设置为一秒(1000 毫秒)。 从 Spring 集成版本 3.0 开始,`CachingConnectionFactory`提供了一个`resetCache()`方法。当调用时,所有空闲会话都会立即关闭,而当正在使用的会话被返回到缓存时,它们会被关闭。当使用`isSharedSession=true`时,只有当最后一个通道关闭时,通道才关闭,共享会话才关闭。新的届会请求在必要时设立新的届会。 从版本 5.1 开始,`CachingSessionFactory`有一个新的属性`testSession`。当为真时,会话将通过执行`stat(getHome())`命令来进行测试,以确保它仍然处于活动状态;如果不是,则将从缓存中删除它;如果缓存中没有活动会话,则将创建一个新的会话。 ### 使用`RemoteFileTemplate` Spring 集成版本 3.0 在`SftpSession`对象上提供了一个新的抽象。该模板提供了发送、检索(作为`InputStream`)、删除和重命名文件的方法。此外,我们提供了一个`execute`方法来让调用方在会话上运行多个操作。在所有情况下,模板都会可靠地关闭会话。有关更多信息,请参见[Javadoc for`RemoteFileTemplate`](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/file/file/remotefiletemplate.html)SFTP 有一个子类:[`SftpRemoteFileTemplate`](https://DOCS. Spring.template/ Spring-integration/api/org/springframework/integration/sftepramework/sftefilefilemplate/sp/会话/sf 我们在版本 4.1 中添加了其他方法,包括`getClientInstance()`。它提供了对底层`ChannelSftp`的访问,从而能够访问底层 API。 5.0 版引入了`RemoteFileOperations.invoke(OperationsCallback action)`方法。这个方法允许在同一个线程有界的`Session`的范围内调用几个`RemoteFileOperations`调用。当你需要将`RemoteFileTemplate`作为一个工作单元执行多个高级操作时,这是非常有用的。例如,`AbstractRemoteFileOutboundGateway`将其与`mput`命令实现一起使用,其中我们对提供的目录中的每个文件执行`put`操作,并递归地对其子目录执行该操作。有关更多信息,请参见[Javadoc](https://docs.spring.io/spring-integration/api/org/springframework/integration/file/remote/RemoteFileTemplate.html#invoke-org.springframework.integration.file.remote.OperationsCallback-)。 ### SFTP 入站通道适配器 SFTP 入站通道适配器是一种特殊的侦听器,它连接到服务器并侦听远程目录事件(例如正在创建的新文件),此时它将启动文件传输。下面的示例展示了如何配置 SFTP 入站通道适配器: ``` ``` 前面的配置示例展示了如何为各种属性提供值,包括以下内容: * `local-directory`:文件要传输到的位置 * `remote-directory`:文件将从其中传输的远程源目录 * `session-factory`:对我们之前配置的 Bean 的引用 默认情况下,传输的文件带有与原始文件相同的名称。如果要重写此行为,可以设置`local-filename-generator-expression`属性,该属性允许你提供一个 SPEL 表达式来生成本地文件的名称。与出站网关和适配器(SPEL 求值上下文的根对象是`Message`)不同,此入站适配器在求值时还没有消息,因为这是它最终以传输的文件作为有效负载生成的消息。因此,SPEL 求值上下文的根对象是远程文件的原始名称(a`String`)。 入站通道适配器首先将文件检索到本地目录,然后根据 Poller 配置发出每个文件。从版本 5.0 开始,当需要新的文件检索时,可以限制从 SFTP 服务器获取的文件的数量。当目标文件很大时,或者在具有持久文件列表过滤器的集群系统中运行时,这可能是有益的,本文将在后面讨论。为此,请使用`max-fetch-size`。负值(默认值)意味着没有限制,所有匹配的文件都会被检索到。有关更多信息,请参见[入站通道适配器:控制远程文件获取](#sftp-max-fetch)。从版本 5.0 开始,你还可以通过设置`scanner`属性,为`inbound-channel-adapter`提供一个自定义的`DirectoryScanner`实现。 从 Spring Integration3.0 开始,你可以指定`preserve-timestamp`属性(默认值是`false`)。当`true`时,本地文件的修改时间戳被设置为从服务器检索到的值。否则,它被设置为当前时间。 从版本 4.2 开始,你可以指定`remote-directory-expression`而不是`remote-directory`,这使你可以动态地确定每个轮询上的目录——例如,`remote-directory-expression="@myBean.determineRemoteDir()"`。 有时,基于通过`filename-pattern`属性指定的简单模式的文件过滤可能还不够。如果是这种情况,可以使用`filename-regex`属性来指定正则表达式(例如,`filename-regex=".*\.test$"`)。如果需要完整的控制,可以使用`filter`属性提供对`org.springframework.integration.file.filters.FileListFilter`的自定义实现的引用,这是用于过滤文件列表的策略接口。此筛选器确定要检索哪些远程文件。你还可以使用`CompositeFileListFilter`将基于模式的过滤器与其他过滤器(例如`AcceptOnceFileListFilter`,以避免同步先前已获取的文件)结合起来。 `AcceptOnceFileListFilter`将其状态存储在内存中。如果你希望该状态在系统重新启动后仍然有效,请考虑使用`SftpPersistentAcceptOnceFileListFilter`代替。这个过滤器将接受的文件名存储在`MetadataStore`策略的实例中(参见[元数据存储](./meta-data-store.html#metadata-store))。此筛选器匹配文件名和远程修改时间。 从版本 4.0 开始,这个过滤器需要`ConcurrentMetadataStore`。当与共享数据存储一起使用时(例如`Redis`与`RedisMetadataStore`),这使得过滤器键可以在多个应用程序或服务器实例之间共享。 从版本 5.0 开始,对于`SftpInboundFileSynchronizer`,默认情况下会应用带有内存`SimpleMetadataStore`的`SftpPersistentAcceptOnceFileListFilter`。这个筛选器也将应用于 XML 配置中的`regex`或`pattern`选项,以及 Java DSL 中的`SftpInboundChannelAdapterSpec`选项。你可以通过使用`CompositeFileListFilter`(或`ChainFileListFilter`)来处理任何其他用例。 上面的讨论涉及在检索文件之前对文件进行过滤。检索完文件后,将对文件系统上的文件应用一个额外的过滤器。默认情况下,这是一个`AcceptOnceFileListFilter`,正如本节讨论的那样,它在内存中保留状态,并且不考虑文件的修改时间。除非你的应用程序在处理后删除文件,否则在应用程序重新启动后,适配器默认会重新处理磁盘上的文件。 另外,如果将`filter`配置为使用`SftpPersistentAcceptOnceFileListFilter`并更改远程文件的时间戳(导致重新获取它),则默认的本地过滤器不允许处理此新文件。 有关此过滤器的详细信息,以及如何使用它,请参见[远程持久文件列表过滤器](./file.html#remote-persistent-flf)。 你可以使用`local-filter`属性来配置本地文件系统过滤器的行为。从版本 4.3.8 开始,默认情况下配置了`FileSystemPersistentAcceptOnceFileListFilter`。该过滤器将接受的文件名和修改的时间戳存储在`MetadataStore`策略的实例中(参见[元数据存储](./meta-data-store.html#metadata-store)),并检测本地文件修改时间的更改。默认的`MetadataStore`是在内存中存储状态的`SimpleMetadataStore`。 从版本 4.1.5 开始,这些过滤器有一个新的属性`flushOnUpdate`,这会导致它们在每次更新时刷新元数据存储(如果存储实现`Flushable`)。 | |此外,如果使用分布式`MetadataStore`(例如[Redis 元数据存储](./redis.html#redis-metadata-store)或[Gemfire 元数据存储](./gemfire.html#gemfire-metadata-store)),则可以使用同一个适配器或应用程序的多个实例,并确保只有一个实例处理一个文件。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 实际的本地过滤器是一个`CompositeFileListFilter`,其中包含提供的过滤器和一个模式过滤器,该过滤器防止正在下载过程中的文件被处理(基于`temporary-file-suffix`)。文件是用这个后缀下载的(默认值是`.writing`),当传输完成时,文件被重命名为它们的最终名称,使它们对过滤器“可见”。 有关这些属性的更多详细信息,请参见[schema](https://github.com/spring-projects/spring-integration/tree/main/spring-integration-core/src/main/resources/org/springframework/integration/config)。 SFTP 入站通道适配器是一个轮询消费者。因此,你必须配置一个 Poller(一个全局默认值或一个局部元素)。一旦文件被传输到本地目录,将生成一条以`java.io.File`为有效负载类型的消息,并将其发送到由`channel`属性标识的通道。 #### 更多关于文件过滤和大文件的信息 有时,刚刚出现在监视(远程)目录中的文件是不完整的。通常,这样的文件是以某种临时扩展名编写的(例如在名为`something.txt.writing`的文件上使用`.writing`),然后在编写过程完成后重新命名。在大多数情况下,开发人员只对完整的文件感兴趣,并且只想过滤那些文件。要处理这些场景,可以使用`filename-pattern`、`filename-regex`和`filter`属性提供的过滤支持。如果需要自定义过滤器实现,可以通过设置`filter`属性在适配器中包含引用。下面的示例展示了如何做到这一点: ``` ``` #### 从故障中恢复 你应该了解适配器的体系结构。文件同步器获取文件,`FileReadingMessageSource`为每个同步文件发出一条消息。由于[前面讨论过](#sftp-inbound),涉及两个过滤器。`filter`属性(和模式)引用远程文件列表,以避免获取已经被获取的文件。`FileReadingMessageSource`使用`local-filter`来确定哪些文件将作为消息发送。 同步器列出远程文件并查看其过滤器。然后文件被转移。如果在文件传输过程中发生 IO 错误,那么已经添加到过滤器中的所有文件都将被删除,以便在下一次投票时有资格重新获取它们。这仅在过滤器实现`ReversibleFileListFilter`(例如`AcceptOnceFileListFilter`)时才适用。 如果在同步文件后,处理文件的下游流出现错误,则不会自动回滚过滤器,因此默认情况下不会重新处理失败的文件。 如果你希望在失败后重新处理此类文件,则可以使用类似于以下的配置,以便于从筛选器中删除失败的文件: ``` ``` 前面的配置适用于任何`ResettableFileListFilter`。 从版本 5.0 开始,入站通道适配器可以根据生成的本地文件名在本地构建子目录。这也可以是一个远程子路径。为了能够根据层次结构支持递归地读取本地目录以进行修改,你现在可以基于`Files.walk()`算法提供一个内部`FileReadingMessageSource`和一个新的`RecursiveDirectoryScanner`。参见[`AbstractInboundFileSynchronizingMessageSource.setScanner()`](https://DOCS. Spring.io/ Spring-integration/api/org/springframework/integration/file/remote/synchronizer/abstractinboundfilesynchronizingmessagesource.html#setscanner-org.springframework.integration.file.directoryscanner)了解更多信息。此外,你现在可以通过使用`setUseWatchService()`选项将`AbstractInboundFileSynchronizingMessageSource`切换到基于`WatchService`的`DirectoryScanner`。它还被配置为所有`WatchEventType`实例,以便对本地目录中的任何修改做出反应。前面显示的再处理示例基于`FileReadingMessageSource.WatchServiceDirectoryScanner`的内置功能,当从本地目录中删除文件(`StandardWatchEventKinds.ENTRY_DELETE`)时,它使用`ResettableFileListFilter.remove()`。有关更多信息,请参见[`WatchServiceDirectoryScanner`](./file.html#watch-service-directory-scanner)。 #### 使用 Java 配置进行配置 Spring 以下引导应用程序展示了如何使用 Java 配置入站适配器的示例: ``` @SpringBootApplication public class SftpJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(SftpJavaApplication.class) .web(false) .run(args); } @Bean public SessionFactory sftpSessionFactory() { DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true); factory.setHost("localhost"); factory.setPort(port); factory.setUser("foo"); factory.setPassword("foo"); factory.setAllowUnknownKeys(true); factory.setTestSession(true); return new CachingSessionFactory(factory); } @Bean public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() { SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory()); fileSynchronizer.setDeleteRemoteFiles(false); fileSynchronizer.setRemoteDirectory("foo"); fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml")); return fileSynchronizer; } @Bean @InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000")) public MessageSource sftpMessageSource() { SftpInboundFileSynchronizingMessageSource source = new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer()); source.setLocalDirectory(new File("sftp-inbound")); source.setAutoCreateLocalDirectory(true); source.setLocalFilter(new AcceptOnceFileListFilter()); source.setMaxFetchSize(1); return source; } @Bean @ServiceActivator(inputChannel = "sftpChannel") public MessageHandler handler() { return new MessageHandler() { @Override public void handleMessage(Message message) throws MessagingException { System.out.println(message.getPayload()); } }; } } ``` #### 使用 Java DSL 进行配置 Spring 以下引导应用程序展示了如何使用 Java DSL 配置入站适配器的示例: ``` @SpringBootApplication public class SftpJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(SftpJavaApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow sftpInboundFlow() { return IntegrationFlows .from(Sftp.inboundAdapter(this.sftpSessionFactory) .preserveTimestamp(true) .remoteDirectory("foo") .regexFilter(".*\\.txt$") .localFilenameExpression("#this.toUpperCase() + '.a'") .localDirectory(new File("sftp-inbound")), e -> e.id("sftpInboundAdapter") .autoStartup(true) .poller(Pollers.fixedDelay(5000))) .handle(m -> System.out.println(m.getPayload())) .get(); } } ``` #### 处理不完整数据 见[处理不完整的数据](./file.html#file-incomplete)。 `SftpSystemMarkerFilePresentFileListFilter`用于过滤远程系统上没有相应标记文件的远程文件。有关配置信息,请参见[Javadoc](https://docs.spring.io/spring-integration/api/org/springframework/integration/sftp/filters/SftpSystemMarkerFilePresentFileListFilter.html)。 ### SFTP 流媒体入站通道适配器 版本 4.3 引入了流入站通道适配器。这个适配器生成的消息的有效负载类型为`InputStream`,这样你就可以在不写到本地文件系统的情况下获取文件。由于会话保持打开,所以当文件已被消费时,消费应用程序负责关闭会话。会话在`closeableResource`报头(`IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE`)中提供。标准框架组件,例如`FileSplitter`和`StreamTransformer`,会自动关闭会话。有关这些组件的更多信息,请参见[文件拆分器](./file.html#file-splitter)和[流变压器](./transformer.html#stream-transformer)。下面的示例展示了如何配置 SFTP 流入站通道适配器: ``` ``` 你只能使用`filename-pattern`、`filename-regex`、`filter`或`filter-expression`中的一个。 | |从版本 5.0 开始,默认情况下,`SftpStreamingMessageSource`适配器通过使用基于内存`SimpleMetadataStore`的`SftpPersistentAcceptOnceFileListFilter`来防止远程文件的重复。
默认情况下,该过滤器还与文件名模式(或正则表达式)一起应用。,如果你需要允许重复,
,你可以使用`AcceptAllFileListFilter`。
你可以通过使用`CompositeFileListFilter`(或`ChainFileListFilter`)来处理任何其他用例。
Java 配置[稍后显示](#sftp-streaming-java-config)显示了一种在处理后删除远程文件的技术,从而避免重复。| |---|------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 有关`SftpPersistentAcceptOnceFileListFilter`及其使用方式的更多信息,请参见[远程持久文件列表过滤器](./file.html#remote-persistent-flf)。 你可以使用`max-fetch-size`属性来限制在需要获取时在每个轮询中获取的文件数量。将其设置为`1`,并在集群环境中运行时使用持久过滤器。有关更多信息,请参见[入站通道适配器:控制远程文件获取](#sftp-max-fetch)。 适配器将远程目录和文件名分别放入头文件中(`FileHeaders.REMOTE_DIRECTORY`和`FileHeaders.REMOTE_FILE`)。从版本 5.0 开始,`FileHeaders.REMOTE_FILE_INFO`头提供了额外的远程文件信息(在 JSON 中)。如果将`SftpStreamingMessageSource`上的`fileInfoJson`属性设置为`false`,则头包含一个`SftpFileInfo`对象。你可以使用`SftpFileInfo.getFileInfo()`方法访问底层 JSCH 库提供的`LsEntry`对象。当你使用 XML 配置时,`fileInfoJson`属性是不可用的,但是你可以通过将`SftpStreamingMessageSource`注入到你的一个配置类中来设置它。另见[远程文件信息](#sftp-remote-file-info)。 从版本 5.1 开始,`comparator`的通用类型是`LsEntry`。在此之前,它是`AbstractFileInfo`。这是因为排序现在是在处理的较早阶段执行的,在过滤和应用`maxFetch`之前。 #### 使用 Java 配置进行配置 Spring 以下引导应用程序展示了如何使用 Java 配置入站适配器的示例: ``` @SpringBootApplication public class SftpJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(SftpJavaApplication.class) .web(false) .run(args); } @Bean @InboundChannelAdapter(channel = "stream") public MessageSource ftpMessageSource() { SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template()); messageSource.setRemoteDirectory("sftpSource/"); messageSource.setFilter(new AcceptAllFileListFilter<>()); messageSource.setMaxFetchSize(1); return messageSource; } @Bean @Transformer(inputChannel = "stream", outputChannel = "data") public org.springframework.integration.transformer.Transformer transformer() { return new StreamTransformer("UTF-8"); } @Bean public SftpRemoteFileTemplate template() { return new SftpRemoteFileTemplate(sftpSessionFactory()); } @ServiceActivator(inputChannel = "data", adviceChain = "after") @Bean public MessageHandler handle() { return System.out::println; } @Bean public ExpressionEvaluatingRequestHandlerAdvice after() { ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice(); advice.setOnSuccessExpression( "@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])"); advice.setPropagateEvaluationFailures(true); return advice; } } ``` 请注意,在本例中,Transformer 下游的消息处理程序有一个建议,该建议在处理后删除远程文件。 ### 入站通道适配器:轮询多个服务器和目录 从版本 5.0.7 开始,`RotatingServerAdvice`是可用的;当配置为 Poller 建议时,入站适配器可以轮询多个服务器和目录。配置建议,并将其正常添加到 Poller 的建议链中。a`DelegatingSessionFactory`用于选择服务器,有关更多信息,请参见[Delegating Session Factory](./ftp.html#ftp-dsf)。通知配置由`RotationPolicy.KeyDirectory`对象列表组成。 例子 ``` @Bean public RotatingServerAdvice advice() { List keyDirectories = new ArrayList<>(); keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo")); keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar")); keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz")); keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux")); keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz")); keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz")); return new RotatingServerAdvice(delegatingSf(), keyDirectories); } ``` 此建议将轮询服务器`foo`上的目录`one`,直到没有新文件存在,然后移动到目录`bar`,然后在服务器`baz`上的目录`two`,等等。 可以使用`公平`构造函数 arg 修改此默认行为: fair ``` @Bean public RotatingServerAdvice advice() { ... return new RotatingServerAdvice(delegatingSf(), keyDirectories, true); } ``` 在这种情况下,无论上一次投票是否返回了文件,通知都将移动到下一个服务器/目录。 或者,你可以提供自己的`RotationPolicy`,以便根据需要重新配置消息源: 政策 ``` public interface RotationPolicy { void beforeReceive(MessageSource source); void afterReceive(boolean messageReceived, MessageSource source); } ``` and 习惯 ``` @Bean public RotatingServerAdvice advice() { return new RotatingServerAdvice(myRotationPolicy()); } ``` `local-filename-generator-expression`属性(同步器上的`localFilenameGeneratorExpression`)现在可以包含`#remoteDirectory`变量。这允许将从不同目录检索到的文件下载到本地类似的目录中: ``` @Bean public IntegrationFlow flow() { return IntegrationFlows.from(Sftp.inboundAdapter(sf()) .filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate")) .localDirectory(new File(tmpDir)) .localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root") .remoteDirectory("."), e -> e.poller(Pollers.fixedDelay(1).advice(advice()))) .channel(MessageChannels.queue("files")) .get(); } ``` | |使用此建议时,不要在 Poller 上配置`TaskExecutor`;有关更多信息,请参见[消息源的条件 Poller](./polling-consumer.html#conditional-pollers)。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ### 入站通道适配器:控制远程文件获取 在配置入站通道适配器时,应该考虑两个属性。`max-messages-per-poll`,与所有的 Poller 一样,可以用来限制在每个轮询上发出的消息的数量(如果超过配置的值已经准备好)。`max-fetch-size`(自版本 5.0 起)可以限制一次从远程服务器检索到的文件的数量。 以下场景假定起始状态是一个空的本地目录: * `max-messages-per-poll=2`和`max-fetch-size=1`:适配器获取一个文件,发出它,获取下一个文件,并发出它。然后它会一直睡到下一次投票。 * `max-messages-per-poll=2`和`max-fetch-size=2`):适配器获取这两个文件,然后发射每个文件。 * `max-messages-per-poll=2`和`max-fetch-size=4`:适配器获取最多 4 个文件(如果可用)并发出前两个文件(如果至少有两个)。接下来的两个文件将在下一次投票时发出。 * `max-messages-per-poll=2`和`max-fetch-size`未指定:适配器获取所有远程文件并发出前两个文件(如果至少有两个)。后续的文件将在后续的轮询中发出(一次两个)。当所有的文件都用完后,将再次尝试远程获取,以获取任何新的文件。 | |当你部署一个应用程序的多个实例时,我们建议设置一个小的`max-fetch-size`,以避免一个实例“抓取”所有文件并使其他实例无法使用。| |---|---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| `max-fetch-size`的另一种用法是,当你想要停止获取远程文件,但要继续处理已经获取的文件时。在`MessageSource`上设置`maxFetchSize`属性(通过编程方式,通过 JMX 或通过[控制总线](./control-bus.html#control-bus))可以有效地阻止适配器获取更多文件,但可以让 Poller 继续为以前已获取的文件发送消息。如果属性更改时 poller 处于活动状态,则更改将在下一次投票时生效。 从版本 5.1 开始,同步器可以提供`Comparator`。这在限制使用`maxFetchSize`获取的文件数量时很有用。 ### SFTP 出站通道适配器 SFTP 出站通道适配器是一种特殊的`MessageHandler`,它连接到远程目录,并为它作为传入`Message`的有效负载接收的每个文件发起文件传输。它还支持文件的几种表示方式,因此不限于`File`对象。与 FTP 出站适配器类似,SFTP 出站通道适配器支持以下有效负载: * `java.io.File`:实际的文件对象 * `byte[]`:表示文件内容的字节数组 * `java.lang.String`:表示文件内容的文本 * `java.io.InputStream`:要传输到远程文件的数据流 * `org.springframework.core.io.Resource`:用于将数据传输到远程文件的资源 下面的示例展示了如何配置 SFTP 出站通道适配器: ``` ``` 有关这些属性的更多详细信息,请参见[schema](https://github.com/spring-projects/spring-integration/tree/main/spring-integration-core/src/main/resources/org/springframework/integration/config)。 #### spel 和 SFTP 出站适配器 与 Spring 集成中的许多其他组件一样,在配置 SFTP 出站通道适配器时,可以使用 Spring 表达式语言,方法是指定两个属性:`remote-directory-expression`和`remote-filename-generator-expression`([前面描述的](#sftp-inbound))。表达式求值上下文将消息作为其根对象,它允许你使用表达式,这些表达式可以基于消息中的数据(来自“payload”或“headers”)动态计算文件名或现有目录路径。在前面的示例中,我们使用一个表达式值来定义`remote-filename-generator-expression`属性,该表达式值根据原始名称计算文件名,同时还附加一个后缀:“-mysuffix”。 从版本 4.1 开始,你可以在传输文件时指定`mode`。默认情况下,现有文件将被覆盖。模式由`FileExistsMode`枚举定义,其中包括以下值: * `REPLACE`(默认) * `REPLACE_IF_MODIFIED` * `APPEND` * `APPEND_NO_FLUSH` * `IGNORE` * `FAIL` 使用`IGNORE`和`FAIL`时,文件不会被传输。`FAIL`会导致抛出异常,而`IGNORE`会静默地忽略传输(尽管生成了`DEBUG`日志条目)。 4.3 版本引入了`chmod`属性,你可以使用该属性在上传后更改远程文件权限。你可以使用常规的 UNIX 八进制格式(例如,`600`仅允许文件所有者进行读写)。当使用 Java 配置适配器时,可以使用`setChmodOctal("600")`或`setChmod(0600)`。 #### 避免部分写入的文件 处理文件传输时的常见问题之一是处理部分文件的可能性。文件在实际传输完成之前可能会出现在文件系统中。 Spring 为了处理这个问题,集成 SFTP 适配器使用一种常见的算法,在这种算法中,文件以临时名称进行传输,并在文件完全传输后重新命名。 默认情况下,每个正在传输过程中的文件都会出现在文件系统中,并带有一个附加的后缀,默认情况下,后缀是`.writing`。可以通过设置`temporary-file-suffix`属性进行更改。 但是,在某些情况下,你可能不想使用这种技术(例如,如果服务器不允许重命名文件)。对于这样的情况,可以通过将`use-temporary-file-name`设置为`false`来禁用此功能(默认值为`true`)。当此属性`false`时,文件将以其最终名称写入,而使用该属性的应用程序需要其他一些机制来检测该文件是否已完全上传,然后才能访问它。 #### 使用 Java 配置进行配置 Spring 以下引导应用程序展示了如何使用 Java 配置出站适配器的示例: ``` @SpringBootApplication @IntegrationComponentScan public class SftpJavaApplication { public static void main(String[] args) { ConfigurableApplicationContext context = new SpringApplicationBuilder(SftpJavaApplication.class) .web(false) .run(args); MyGateway gateway = context.getBean(MyGateway.class); gateway.sendToSftp(new File("/foo/bar.txt")); } @Bean public SessionFactory sftpSessionFactory() { DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true); factory.setHost("localhost"); factory.setPort(port); factory.setUser("foo"); factory.setPassword("foo"); factory.setAllowUnknownKeys(true); factory.setTestSession(true); return new CachingSessionFactory(factory); } @Bean @ServiceActivator(inputChannel = "toSftpChannel") public MessageHandler handler() { SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory()); handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']"); handler.setFileNameGenerator(new FileNameGenerator() { @Override public String generateFileName(Message message) { return "handlerContent.test"; } }); return handler; } @MessagingGateway public interface MyGateway { @Gateway(requestChannel = "toSftpChannel") void sendToSftp(File file); } } ``` #### 使用 Java DSL 进行配置 Spring 以下引导应用程序展示了如何使用 Java DSL 配置出站适配器的示例: ``` @SpringBootApplication public class SftpJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(SftpJavaApplication.class) .web(false) .run(args); } @Bean public IntegrationFlow sftpOutboundFlow() { return IntegrationFlows.from("toSftpChannel") .handle(Sftp.outboundAdapter(this.sftpSessionFactory, FileExistsMode.FAIL) .useTemporaryFileName(false) .remoteDirectory("/foo") ).get(); } } ``` ### SFTP 出站网关 SFTP 出站网关提供了一组有限的命令,允许你与远程 SFTP 服务器进行交互: * `ls`(列表文件) * `nlst`(列出文件名) * `get`(检索文件) * `mget`(检索多个文件) * `rm`(删除文件) * `mv`(移动和重命名文件) * `put`(发送文件) * `mput`(发送多个文件) #### 使用`ls`命令 `ls`列出了远程文件并支持以下选项: * `-1`:检索文件名列表。默认值是检索`FileInfo`对象的列表 * `-a`:包含所有文件(包括以“.”开头的文件) * `-f`:不对列表进行排序 * `-dirs`:包括目录(默认排除) * `-links`:包括符号链接(默认排除) * `-R`:递归列出远程目录 此外,文件名过滤是以与`inbound-channel-adapter`相同的方式提供的。 由`ls`操作产生的消息有效负载是文件名列表或`FileInfo`对象列表(取决于你是否使用`-1`开关)。这些对象提供诸如修改时间、权限等信息。 `ls`命令所作用的远程目录在`file_remoteDirectory`报头中提供。 当使用递归选项(`-R`)时,`fileName`包括任意子目录元素,并表示文件的相对路径(相对于远程目录)。如果使用`-dirs`选项,那么每个递归目录也将作为列表中的一个元素返回。在这种情况下,我们建议你不要使用`-1`选项,因为你将无法区分文件和目录,这在使用`FileInfo`对象时是可以做到的。 #### 使用`nlst`命令 版本 5 引入了对`nlst`命令的支持。 `nlst`列出远程文件名,并且只支持一个选项: * `-f`:不要对列表进行排序 由`nlst`操作产生的消息有效负载是一个文件名列表。 `file_remoteDirectory`头保存了执行`nlst`命令的远程目录。 SFTP 协议不提供列出名称的功能。此命令相当于带有`-1`选项的`ls`命令,在此添加此命令是为了方便。 #### 使用`get`命令 `get`检索远程文件并支持以下选项: * `-P`:保留远程文件的时间戳。 * `-stream`:以流的形式检索远程文件。 * `-D`:成功传输后删除远程文件。如果忽略传输,则不会删除远程文件,因为`FileExistsMode`是`IGNORE`,并且本地文件已经存在。 `file_remoteDirectory`头保存远程目录,`file_remoteFile`头保存文件名。 由`get`操作产生的消息有效负载是表示检索到的文件的`File`对象。如果使用`-stream`选项,则有效负载是`InputStream`,而不是`File`。对于文本文件,一个常见的用例是将此操作与[文件拆分器](./file.html#file-splitter)或[流变压器](./transformer.html#stream-transformer)结合起来。当以流的形式使用远程文件时,你要负责在流被使用后关闭`Session`。为了方便起见,`Session`在`closeableResource`头中提供了`IntegrationMessageHeaderAccessor`提供了方便的方法: ``` Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource(); if (closeable != null) { closeable.close(); } ``` 框架组件,例如[文件拆分器](./file.html#file-splitter)和[流变压器](./transformer.html#stream-transformer),在传输数据后自动关闭会话。 下面的示例展示了如何将文件作为流使用: ``` ``` | |如果在自定义组件中使用输入流,则必须关闭`Session`。
你可以在自定义代码中这样做,也可以将消息的副本路由到`service-activator`并使用 SPEL,如下例所示:| |---|-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ``` ``` #### 使用`mget`命令 `mget`基于模式检索多个远程文件,并支持以下选项: * `-P`:保留远程文件的时间戳。 * `-R`:递归地检索整个目录树。 * `-x`:如果没有与模式匹配的文件,则抛出异常(否则,将返回一个空列表)。 * `-D`:成功传输后删除每个远程文件。如果忽略传输,则不会删除远程文件,因为`FileExistsMode`是`IGNORE`,并且本地文件已经存在。 由`mget`操作产生的消息有效负载是一个`List`对象(即`List`的`File`对象,每个对象代表一个检索到的文件)。 | |从版本 5.0 开始,如果`FileExistsMode`是`IGNORE`,则输出消息的有效负载不再包含由于文件已经存在而未被获取的文件。
以前,该数组包含所有文件,包括已经存在的文件。| |---|----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 你使用的表达式确定远程路径应该产生一个以``**结尾的结果,例如`myfiles/`** 在`myfiles`下获取完整的树。 从版本 5.0 开始,你可以使用递归的`MGET`,并结合`FileExistsMode.REPLACE_IF_MODIFIED`模式,在本地定期同步整个远程目录树。此模式将本地文件上一次修改的时间戳设置为远程文件的时间戳,而不考虑`-P`(保留时间戳)选项。 | |Notes for when using recursion (`-R`)

The pattern is ignored and `*` is assumed.
By default, the entire remote tree is retrieved.
However, you can filter files in the tree by providing a `FileListFilter`.
You can also filter directories in the tree this way.
A `FileListFilter` can be provided by reference or by `filename-pattern` or `filename-regex` attributes.
For example, `filename-regex="(subDir|.*1.TXT)“` retrieves all files ending with `1.TXT` in the remote directory and the subdirectory `subdir`.
However, we describe an alternative available after this note.

If you filter a subdirectory, no additional traversal of that subdirectory is performed.

The `-dirs` option is not allowed (the recursive `mget` uses the recursive `ls` to obtain the directory tree and the directories themselves cannot be included in the list).

Typically, you would use the `#remoteDirectory` variable in the `local-directory-expression` 使远程目录结构在本地保留。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| 持久文件列表过滤器现在有一个布尔属性`forRecursion`。将此属性设置为`true`,还设置`alwaysAcceptDirectories`,这意味着出站网关上的递归操作(`ls`和`mget`)现在每次都将遍历完整目录树。这是为了解决未检测到目录树中深层更改的问题。此外,`forRecursion=true`会导致文件的完整路径被用作元数据存储键;这解决了一个问题,即如果同名文件在不同的目录中多次出现,则过滤器无法正常工作。重要提示:这意味着,对于顶层目录下的文件,将找不到持久性元数据存储中的现有密钥。由于这个原因,默认情况下,该属性是`false`;这可能会在将来的版本中发生变化。 从版本 5.0 开始,你可以将`SftpSimplePatternFileListFilter`和`SftpRegexPatternFileListFilter`配置为始终通过将`alwaysAcceptDirectorties`设置为`true`的目录。这样做允许对简单模式进行递归,如下例所示: ``` ``` 你可以通过使用网关上的`filter`属性来提供这些过滤器之一。 另请参见[出站网关部分成功(`mget`和`mput`)]。 #### 使用`put`命令 `put`将文件发送到远程服务器。消息的有效负载可以是`java.io.File`、`byte[]`或`String`。使用`remote-filename-generator`(或表达式)为远程文件命名。其他可用的属性包括`remote-directory`、`temporary-remote-directory`以及它们的`*-expression`等价物:`use-temporary-file-name`和`auto-create-directory`。有关更多信息,请参见[模式文档](https://github.com/spring-projects/spring-integration/tree/main/spring-integration-core/src/main/resources/org/springframework/integration/config)。 由`put`操作产生的消息有效负载是`String`,其中包含了传输后服务器上文件的完整路径。 版本 4.3 引入了`chmod`属性,该属性在上传后更改远程文件权限。你可以使用常规的 UNIX 八进制格式(例如,`600`仅允许文件所有者进行读写)。当使用 Java 配置适配器时,可以使用`setChmod(0600)`。 #### 使用`mput`命令 `mput`向服务器发送多个文件,并支持以下选项: * `-R`:递归——发送目录和子目录中的所有文件(可能经过筛选) 消息有效负载必须是表示本地目录的`java.io.File`(或`String`)。从版本 5.1 开始,还支持`File`或`String`的集合。 支持与[`put`命令](#sftp-put-command)相同的属性。此外,你还可以使用`mput-pattern`、`mput-regex`、`mput-filter`或`mput-filter-expression`中的一个来过滤本地目录中的文件。只要子目录本身通过筛选器,筛选器就可以使用递归。不通过筛选器的子目录不会被递归。 由`mput`操作产生的消息有效负载是`List`对象(即由传输产生的远程文件路径的`List`)。 另请参见[出站网关部分成功(`mget`和`mput`)]。 版本 4.3 引入了`chmod`属性,它允许你在上传后更改远程文件权限。你可以使用常规的 UNIX 八进制格式(例如,`600`仅允许文件所有者进行读写)。当使用 Java 配置适配器时,可以使用`setChmodOctal("600")`或`setChmod(0600)`。 #### 使用`rm`命令 `rm`命令没有选项。 如果删除操作成功,则得到的消息有效负载为`Boolean.TRUE`。否则,消息有效负载为`Boolean.FALSE`。`file_remoteDirectory`头保存远程目录,`file_remoteFile`头保存文件名。 #### 使用`mv`命令 `mv`命令没有选项。 `expression`属性定义了“from”路径,`rename-expression`属性定义了“to”路径。默认情况下,`rename-expression`是`headers['file_renameTo']`。这个表达式不能计算为空或空`String`。如有必要,将创建所需的任何远程目录。结果消息的有效负载是`Boolean.TRUE`。`file_remoteDirectory`报头保存原始远程目录,`file_remoteFile`报头保存文件名。`file_renameTo`头保存新路径。 从版本 5.5.6 开始,`remoteDirectoryExpression`可以在`mv`命令中使用,以方便使用。如果“from”文件不是完整的文件路径,则将`remoteDirectoryExpression`的结果用作远程目录。这同样适用于“to”文件,例如,如果任务只是重命名某个目录中的远程文件。 #### 附加命令信息 `get`和`mget`命令支持`local-filename-generator-expression`属性。它定义了一个 SPEL 表达式,在传输过程中生成本地文件的名称。求值上下文的根对象是请求消息。`remoteFileName`变量也是可用的。它对于`mget`特别有用(例如:`local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.foo"`)。 `get`和`mget`命令支持`local-directory-expression`属性。它定义了一个 SPEL 表达式,在传输过程中生成本地目录的名称。求值上下文的根对象是请求消息。`remoteDirectory`变量也是可用的。它对 MGET 特别有用(例如:`local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.myheader"`)。这个属性与`local-directory`属性是互斥的。 对于所有命令,网关的“表达式”属性保存命令在其上执行的路径。对于`mget`命令,表达式可以计算为``**,这意味着检索所有文件,`somedirectory/`**,以及以`*`结尾的其他值。 下面的示例显示了为`ls`命令配置的网关: ``` ``` 发送到`toSplitter`通道的消息的有效负载是`String`对象的列表,每个对象都包含一个文件的名称。如果省略`command-options="-1"`,则有效负载将是`FileInfo`对象的列表。你可以以空格分隔的列表的形式提供选项(例如,`command-options="-1 -dirs -links"`)。 从版本 4.2 开始,`GET`、`MGET`、`PUT`和`MPUT`命令支持`FileExistsMode`属性(使用名称空间支持时`mode`)。这会影响当本地文件存在(`GET`和`MGET`)或远程文件存在(`PUT`和`MPUT`)时的行为。支持的模式有`REPLACE`、`APPEND`、`FAIL`和`IGNORE`。对于向后兼容性,`PUT`和`MPUT`操作的默认模式是`REPLACE`。对于`GET`和`MGET`操作,缺省值是`FAIL`。 #### 使用 Java 配置进行配置 Spring 以下引导应用程序展示了如何使用 Java 配置出站网关的示例: ``` @SpringBootApplication public class SftpJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(SftpJavaApplication.class) .web(false) .run(args); } @Bean @ServiceActivator(inputChannel = "sftpChannel") public MessageHandler handler() { return new SftpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'"); } } ``` #### 使用 Java DSL 进行配置 Spring 以下引导应用程序展示了如何使用 Java DSL 配置出站网关的示例: ``` @SpringBootApplication public class SftpJavaApplication { public static void main(String[] args) { new SpringApplicationBuilder(SftpJavaApplication.class) .web(false) .run(args); } @Bean public SessionFactory sftpSessionFactory() { DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory(); sf.setHost("localhost"); sf.setPort(port); sf.setUsername("foo"); sf.setPassword("foo"); factory.setTestSession(true); return new CachingSessionFactory(sf); } @Bean public QueueChannelSpec remoteFileOutputChannel() { return MessageChannels.queue(); } @Bean public IntegrationFlow sftpMGetFlow() { return IntegrationFlows.from("sftpMgetInputChannel") .handle(Sftp.outboundGateway(sftpSessionFactory(), AbstractRemoteFileOutboundGateway.Command.MGET, "payload") .options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE) .regexFileNameFilter("(subSftpSource|.*1.txt)") .localDirectoryExpression("'myDir/' + #remoteDirectory") .localFilenameExpression("#remoteFileName.replaceFirst('sftpSource', 'localTarget')")) .channel("remoteFileOutputChannel") .get(); } } ``` #### 出站网关部分成功(`mget`和`mput`) 在对多个文件执行操作时(通过使用`mget`和`mput`),在传输一个或多个文件后的一段时间内可能会发生异常。在这种情况下(从版本 4.2 开始),将抛出一个`PartialSuccessException`。除了通常的`MessagingException`属性(`failedMessage`和`cause`)外,此异常还具有两个附加属性: * `partialResults`:成功的转移结果。 * `derivedInput`:从请求消息中生成的文件列表(例如要传输的本地文件为`mput`)。 这些属性允许你确定哪些文件已成功传输,哪些文件未成功传输。 在递归`mput`的情况下,`PartialSuccessException`可能嵌套了`PartialSuccessException`实例。 考虑以下目录结构: ``` root/ |- file1.txt |- subdir/ | - file2.txt | - file3.txt |- zoo.txt ``` 如果异常发生在`file3.txt`上,则网关抛出的`PartialSuccessException`具有`derivedInput`of`file1.txt`,`subdir`,以及`zoo.txt`和`partialResults`of`file1.txt`。它的`cause`是另一个`PartialSuccessException`与`derivedInput`之`file2.txt`和`file3.txt`之`partialResults`之`file2.txt`之`(`SftpOutboundGateway`)的`MessageSessionCallback`实现来执行带有`requestMessage`上下文的`Session`上的任何操作。你可以将它用于任何非标准或低级别的 SFTP 操作(或多个),例如允许从集成流定义或功能接口实现注入进行访问。下面的示例使用 lambda: ``` @Bean @ServiceActivator(inputChannel = "sftpChannel") public MessageHandler sftpOutboundGateway(SessionFactory sessionFactory) { return new SftpOutboundGateway(sessionFactory, (session, requestMessage) -> session.list(requestMessage.getPayload())); } ``` 另一个示例可能是对要发送或检索的文件数据进行预处理或后处理。 在使用 XML 配置时,``提供了一个`session-callback`属性,该属性允许你指定`MessageSessionCallback` Bean 名称。 | |`session-callback`与`command`和`expression`属性是互斥的。
当使用 Java 进行配置时,`SftpOutboundGateway`类提供了不同的构造函数。| |---|-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------| ### Apache Mina SFTP 服务器事件 在版本 5.2 中添加的`ApacheMinaSftpEventListener`监听某些 Apache Mina SFTP 服务器事件,并将其发布为`ApplicationEvent`s,该事件可以由任何`ApplicationListener` Bean、`@EventListener` Bean 方法或[事件入站通道适配器](./event.html#appevent-inbound)方法接收。 目前支持的活动有: * `SessionOpenedEvent`-打开了一个客户端会话 * `DirectoryCreatedEvent`-创建了一个目录 * `FileWrittenEvent`-一个文件被写入到 * `PathMovedEvent`-重命名了一个文件或目录 * `PathRemovedEvent`-删除了一个文件或目录 * `SessionClosedEvent`-客户端已断开连接 每一个都是`ApacheMinaSftpEvent`的子类;你可以配置一个侦听器来接收所有的事件类型。每个事件的`source`属性是一个`ServerSession`,你可以从它获得诸如客户机地址之类的信息;在抽象事件上提供了一个方便的`getSession()`方法。 要用侦听器(必须是 Spring Bean)配置服务器,只需将其添加到`SftpSubsystemFactory`中: ``` server = SshServer.setUpDefaultServer(); ... SftpSubsystemFactory sftpFactory = new SftpSubsystemFactory(); sftpFactory.addSftpEventListener(apacheMinaSftpEventListenerBean); ... ``` 要使用 Spring 集成事件适配器来使用这些事件: ``` @Bean public ApplicationEventListeningMessageProducer eventsAdapter() { ApplicationEventListeningMessageProducer producer = new ApplicationEventListeningMessageProducer(); producer.setEventTypes(ApacheMinaSftpEvent.class); producer.setOutputChannel(eventChannel()); return producer; } ``` ### 远程文件信息 从版本 5.2 开始,`SftpStreamingMessageSource`([SFTP 流入站通道适配器](#sftp-streaming))、`SftpInboundFileSynchronizingMessageSource`([SFTP 入站通道适配器](#sftp-inbound))和“read”---命令的`SftpOutboundGateway`([SFTP 出站网关](#sftp-outbound-gateway))在消息中提供额外的头,以生成有关远程文件的信息: * `FileHeaders.REMOTE_HOST_PORT`-在文件传输操作期间,远程会话已连接到的 host:port pair; * `FileHeaders.REMOTE_DIRECTORY`-已执行操作的远程目录; * `FileHeaders.REMOTE_FILE`-远程文件名;仅适用于单个文件操作。 由于`SftpInboundFileSynchronizingMessageSource`不会针对远程文件生成消息,而是使用本地副本,因此`AbstractInboundFileSynchronizer`在同步操作期间以 URI 样式(`protocol://host:port/remoteDirectory#remoteFileName`)在`MetadataStore`(可以在外部配置)中存储有关远程文件的信息。当对本地文件进行轮询时,将通过`SftpInboundFileSynchronizingMessageSource`检索此元数据。当本地文件被删除时,建议删除其元数据条目。`AbstractInboundFileSynchronizer`为此提供了一个`removeRemoteFileMetadata()`回调。此外还有一个`setMetadataStorePrefix()`要在元数据中使用的键。建议将该前缀与基于`MetadataStore`的`FileListFilter`实现中使用的前缀不同,当相同的`MetadataStore`实例在这些组件之间共享时,为了避免条目重写,因为 filter 和`AbstractInboundFileSynchronizer`都对元数据项使用相同的本地文件名。