0
点赞
收藏
分享

微信扫一扫

Spring 集成支持通过 SFTP 进行文件传输操作

Spring 集成支持通过 SFTP 进行文件传输操作_通道适配器

Spring 集成支持通过 SFTP 进行文件传输操作。

安全文件传输协议 (SFTP) 是一种网络协议,可让您通过任何可靠的流在 Internet 上的两台计算机之间传输文件。

SFTP 协议需要一个安全通道(如 SSH),并且在整个 SFTP 会话中需要对客户端身份的可见性。

Spring 集成通过提供三个客户端端点来支持通过 SFTP 发送和接收文件:入站通道适配器、出站通道适配器和出站网关。 它还提供了方便的命名空间配置来定义这些客户端组件。

从6.0版本开始,过时的JCraft JSch客户端已被现代Apache MINA SSHD​框架所取代。 这导致框架组件中出现许多重大更改。 但是,在大多数情况下,这样的迁移隐藏在Spring Integration API后面。 最剧烈的变化发生在 现在基于 并公开一些 if 其配置属性。​​DefaultSftpSessionFactory​​​​org.apache.sshd.client.SshClient​

您需要将此依赖项包含在项目中:

<dependency>
<groupId>org.springframework.integration</groupId>
<artifactId>spring-integration-sftp</artifactId>
<version>6.0.0</version>
</dependency>

要在 xml 配置中包含 SFTP 命名空间,请在根元素上包含以下属性:

xmlns:int-sftp="http://www.springframework.org/schema/integration/sftp"
xsi:schemaLocation="http://www.springframework.org/schema/integration/sftp
https://www.springframework.org/schema/integration/sftp/spring-integration-sftp.xsd"

SFTP 会话工厂

从版本 3.0 开始,默认情况下不再缓存会话。 请参阅 SFTP 会话缓存。

在配置 SFTP 适配器之前,必须配置 SFTP 会话工厂。 您可以使用常规 Bean 定义配置 SFTP 会话工厂,如以下示例所示:

<beans:bean id="sftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<beans:property name="host" value="localhost"/>
<beans:property name="privateKey" value="classpath:META-INF/keys/sftpTest"/>
<beans:property name="privateKeyPassphrase" value="springIntegration"/>
<beans:property name="port" value="22"/>
<beans:property name="user" value="kermit"/>
</beans:bean>

每次适配器从其 请求会话对象时,都会创建一个新的 SFTP 会话。 在幕后,SFTP 会话工厂依靠 Apache MINA SSHD 库来提供 SFTP 功能。​​SessionFactory​

但是,Spring 集成还支持 SFTP 会话的缓存。 有关详细信息,请参阅 SFTP 会话缓存。


支持通过连接到服务器的多个通道(操作)。 默认情况下,Spring 集成会话工厂为每个通道使用单独的物理连接。 从 Spring Integration 3.0 开始,您可以将会话工厂(使用布尔构造函数 arg - 默认)配置为使用与服务器的单个连接,并在该单个连接上创建多个实例。​​SshClient​​​​false​​​​SftpClient​



使用此功能时,必须将会话工厂包装在缓存会话工厂中,如下所述,以便在操作完成时不会物理关闭连接。



如果重置缓存,则仅当最后一个通道关闭时,会话才会断开连接。



如果在新操作获取会话时发现连接已断开连接,则会刷新连接。


现在,您需要做的就是将此 SFTP 会话工厂注入适配器。

为 SFTP 会话工厂提供值的一种更实用的方法是使用 Spring 的属性占位符支持。

配置属性

下面的列表描述了 DefaultSftpSessionFactory 公开的所有属性。

​isSharedSession​​(构造函数参数)::当 ,单个用于所有请求的实例。 默认为 .​​true​​​​SftpClient​​​​SftpSession​​​​false​

​sftpVersionSelector​​::用于 SFTP 协议选择的实例。 默认为 。​​SftpVersionSelector​​​​SftpVersionSelector.CURRENT​

​host​​::要连接的主机的 URL。 必填。

​hostConfig​​::作为用户/主机/端口选项替代的实例。 可以使用代理跳转属性进行配置。​​org.apache.sshd.client.config.hosts.HostConfigEntry​

​port​​::应通过其建立 SFTP 连接的端口。 如果未指定,则此值默认为 。 如果指定,此属性必须是正数。​​22​

​user​​::要使用的远程用户。 必填。

​knownHostsResource​​::用于主机密钥存储库的。 资源的内容必须与 OpenSSH 文件的格式相同,并且是必需的,如果为 false,则必须预先填充。​​org.springframework.core.io.Resource​​​​known_hosts​​​​allowUnknownKeys​

​password​​::用于对远程主机进行身份验证的密码。 如果未提供密码,则该属性是必需的。​​privateKey​

​privateKey​​::一个,表示用于对远程主机进行身份验证的私钥的位置。 如果未提供 ,则属性是必需的。​​org.springframework.core.io.Resource​​​​privateKey​​​​password​

​privateKeyPassphrase​​::私钥的密码。 如果设置 ,则不允许 。 密码是从该对象获取的。 自选。​​userInfo​​​​privateKeyPassphrase​

​timeout​​::超时属性用作套接字超时参数,以及默认连接超时。 默认为 ,这意味着不会发生超时。​​0​

​allowUnknownKeys​​::设置为 以允许连接到具有未知(或更改)密钥的主机。 其默认值为“假”。 如果为 ,则需要预填充的文件。​​true​​​​false​​​​knownHosts​

​userInteraction​​::身份验证期间要使用的自定义项。​​org.apache.sshd.client.auth.keyboard.UserInteraction​

委派会话工厂

版本 4.2 引入了 ,它允许在运行时选择实际的会话工厂。 在调用 SFTP 端点之前,可以调用工厂将密钥与当前线程关联。 然后使用该键查找要使用的实际会话工厂。 您可以通过在使用后调用来清除密钥。​​DelegatingSessionFactory​​​​setThreadKey()​​​​clearThreadKey()​

我们添加了方便的方法,以便您可以更轻松地从消息流执行此操作,如以下示例所示:

<bean id="dsf" class="org.springframework.integration.file.remote.session.DelegatingSessionFactory">
<constructor-arg>
<bean class="o.s.i.file.remote.session.DefaultSessionFactoryLocator">
<!-- delegate factories here -->
</bean>
</constructor-arg>
</bean>

<int:service-activator input-channel="in" output-channel="c1"
expression="@dsf.setThreadKey(#root, headers['factoryToUse'])" />

<int-sftp:outbound-gateway request-channel="c1" reply-channel="c2" ... />

<int:service-activator input-channel="c2" output-channel="out"
expression="@dsf.clearThreadKey(#root)" />

使用会话缓存(请参阅 SFTP 会话缓存​)时,应缓存每个委托。 您无法缓存本身。​​DelegatingSessionFactory​

从版本 5.0.7 开始,可以与 a 结合使用以轮询多个服务器;请参阅入站通道适配器:轮询多个服务器和目录。​​DelegatingSessionFactory​​​​RotatingServerAdvice​

SFTP 会话缓存

从 Spring 集成版本 3.0 开始,默认情况下不再缓存会话。 端点不再支持该属性。 如果要缓存会话,则必须使用 (请参阅下一个示例)。​​cache-sessions​​​​CachingSessionFactory​

在 3.0 之前的版本中,会话默认自动缓存。 属性可用于禁用自动缓存,但该解决方案不提供配置其他会话缓存属性的方法。 例如,您无法限制创建的会话数。 为了支持该要求和其他配置选项,我们添加了 . 它提供和属性。 顾名思义,该属性控制工厂在其缓存中维护的活动会话数(默认值为无限制)。 如果已达到阈值,则任何获取另一个会话的尝试都将阻塞,直到其中一个缓存会话可用或会话的等待时间到期(默认等待时间为 )。 该属性启用等待时间的配置。​​cache-sessions​​​​CachingSessionFactory​​​​sessionCacheSize​​​​sessionWaitTimeout​​​​sessionCacheSize​​​​sessionCacheSize​​​​Integer.MAX_VALUE​​​​sessionWaitTimeout​

如果要缓存会话,请配置默认会话工厂(如前所述),然后将其包装在可以提供这些附加属性的实例中。 以下示例演示如何执行此操作:​​CachingSessionFactory​

<bean id="sftpSessionFactory"
class="org.springframework.integration.sftp.session.DefaultSftpSessionFactory">
<property name="host" value="localhost"/>
</bean>

<bean id="cachingSessionFactory"
class="org.springframework.integration.file.remote.session.CachingSessionFactory">
<constructor-arg ref="sftpSessionFactory"/>
<constructor-arg value="10"/>
<property name="sessionWaitTimeout" value="1000"/>
</bean>

前面的示例创建一个设置为 ,设置为 1 秒(1000 毫秒)。​​CachingSessionFactory​​​​sessionCacheSize​​​​10​​​​sessionWaitTimeout​

从 Spring Integration 版本 3.0 开始,提供了一种方法。 调用时,所有空闲会话将立即关闭,正在使用的会话在返回到缓存时将关闭。 使用 时,通道将关闭,共享会话仅在最后一个通道关闭时关闭。 新的会话请求会根据需要建立新会话。​​CachingConnectionFactory​​​​resetCache()​​​​isSharedSession=true​

从版本 5.1 开始,具有一个新属性。 如果为 true,则会话将通过对空路径执行命令来测试,以确保它仍处于活动状态;如果没有,它将从缓存中删除;如果缓存中没有活动会话,则会创建新会话。​​CachingSessionFactory​​​​testSession​​​​REALPATH​

用​​RemoteFileTemplate​

Spring Integration 3.0 版提供了对对象的新抽象。 该模板提供了发送、检索(作为)、删除和重命名文件的方法。 此外,我们提供了一种方法,让调用方在会话上运行多个操作。 在所有情况下,模板都会负责可靠地关闭会话。 有关更多信息,请参阅 Javadoc for RemoteFileTemplate SFTP 有一个子类:SftpRemoteFileTemplate。​​SftpSession​​​​InputStream​​​​execute​

我们在 4.1 版中添加了其他方法,包括 。 它提供了对底层的访问,从而允许访问低级API。​​getClientInstance()​​​​ChannelSftp​

版本 5.0 引入了该方法。 此方法允许在同一线程边界的范围内调用多个调用。 当您需要执行作为工作单元的多个高级操作时,这很有用。 例如,将其与命令实现一起使用,其中我们对提供的目录中的每个文件执行操作,并递归地为其子目录执行操作。 有关更多信息,请参阅 Javadoc。​​RemoteFileOperations.invoke(OperationsCallback<F, T> action)​​​​RemoteFileOperations​​​​Session​​​​RemoteFileTemplate​​​​AbstractRemoteFileOutboundGateway​​​​mput​​​​put​

SFTP 入站通道适配器

SFTP 入站通道适配器是一个特殊的侦听器,它连接到服务器并侦听远程目录事件(例如正在创建的新文件),此时它将启动文件传输。 以下示例演示如何配置 SFTP 入站通道适配器:

<int-sftp:inbound-channel-adapter id="sftpAdapterAutoCreate"
session-factory="sftpSessionFactory"
channel="requestChannel"
filename-pattern="*.txt"
remote-directory="/foo/bar"
preserve-timestamp="true"
local-directory="file:target/foo"
auto-create-local-directory="true"
local-filename-generator-expression="#this.toUpperCase() + '.a'"
scanner="myDirScanner"
local-filter="myFilter"
temporary-file-suffix=".writing"
max-fetch-size="-1"
delete-remote-files="false">
<int:poller fixed-rate="1000"/>
</int-sftp:inbound-channel-adapter>

前面的配置示例演示如何为各种属性提供值,包括以下内容:

  • ​local-directory​​:文件要传输到的位置
  • ​remote-directory​​:要从中传输文件的远程源目录
  • ​session-factory​​:对我们之前配置的 Bean 的引用

默认情况下,传输的文件与原始文件的名称相同。 如果要覆盖此行为,可以设置该属性,该属性允许您提供 SpEL 表达式来生成本地文件的名称。 与出站网关和适配器不同,其中 SpEL 评估上下文的根对象是 ,此入站适配器在评估时还没有消息,因为这是它最终以传输的文件作为其有效负载生成的消息。 因此,SpEL 评估上下文的根对象是远程文件的原始名称 (a)。​​local-filename-generator-expression​​​​Message​​​​String​

入站通道适配器首先将文件检索到本地目录,然后根据轮询器配置发出每个文件。 从版本 5.0 开始,当需要新文件检索时,您可以限制从 SFTP 服务器获取的文件数。 当目标文件很大或在具有持久文件列表筛选器的群集系统中运行时,这可能很有用,本节稍后将对此进行讨论。 用于此目的。 负值(默认值)表示没有限制,将检索所有匹配的文件。 有关详细信息,请参阅入站通道适配器:控制远程文件提取。 从版本 5.0 开始,您还可以通过设置属性来提供自定义实现。​​max-fetch-size​​​​DirectoryScanner​​​​inbound-channel-adapter​​​​scanner​

从 Spring Integration 3.0 开始,您可以指定属性(默认值为 )。 当 时,本地文件的修改时间戳设置为从服务器检索的值。 否则,它将设置为当前时间。​​preserve-timestamp​​​​false​​​​true​

从版本 4.2 开始,您可以指定代替 ,这样就可以动态确定每次轮询的目录,例如 。​​remote-directory-expression​​​​remote-directory​​​​remote-directory-expression="@myBean.determineRemoteDir()"​

有时,基于通过属性指定的简单模式进行文件筛选可能还不够。 如果是这种情况,可以使用该属性指定正则表达式(例如 )。 如果需要完全控制,可以使用该属性提供对 的自定义实现的引用,该实现是用于筛选文件列表的策略接口。 此筛选器确定检索哪些远程文件。 还可以使用.​​filename-pattern​​​​filename-regex​​​​filename-regex=".*\.test$"​​​​filter​​​​org.springframework.integration.file.filters.FileListFilter​​​​AcceptOnceFileListFilter​​​​CompositeFileListFilter​

将其状态存储在内存中。 如果希望状态在系统重新启动后继续存在,请考虑改用 。 此筛选器将接受的文件名存储在策略的实例中(请参阅元数据存储)。 此过滤器与文件名和远程修改时间匹配。​​AcceptOnceFileListFilter​​​​SftpPersistentAcceptOnceFileListFilter​​​​MetadataStore​

从版本 4.0 开始,此筛选器需要 . 当与共享数据存储一起使用(例如与 一起使用)时,这允许在多个应用程序或服务器实例之间共享筛选器键。​​ConcurrentMetadataStore​​​​Redis​​​​RedisMetadataStore​

从版本 5.0 开始,默认情况下对 应用内存中的 。 此过滤器也与 XML 配置中的 or 选项以及 Java DSL 中的 through 一起应用。 您可以使用 (或 ) 处理任何其他用例。​​SftpPersistentAcceptOnceFileListFilter​​​​SimpleMetadataStore​​​​SftpInboundFileSynchronizer​​​​regex​​​​pattern​​​​SftpInboundChannelAdapterSpec​​​​CompositeFileListFilter​​​​ChainFileListFilter​

上面的讨论是指在检索文件之前过滤文件。 检索文件后,将对文件系统上的文件应用其他筛选器。 默认情况下,这是一个“AcceptOnceFileListFilter”,如本节所述,它将状态保留在内存中,并且不考虑文件的修改时间。 除非应用程序在处理后删除文件,否则适配器会在应用程序重新启动后默认重新处理磁盘上的文件。

此外,如果将 配置为使用 并且远程文件时间戳更改(导致重新获取),则默认本地筛选器不允许处理此新文件。​​filter​​​​SftpPersistentAcceptOnceFileListFilter​

有关此筛选器及其使用方式的详细信息,请参阅远程持久文件列表筛选器。

您可以使用该属性来配置本地文件系统筛选器的行为。 从版本 4.3.8 开始,默认情况下配置 a。 此筛选器将接受的文件名和修改的时间戳存储在策略的实例中(请参阅元数据存储),并检测对本地文件修改时间的更改。 默认值是将状态存储在内存中的 。​​local-filter​​​​FileSystemPersistentAcceptOnceFileListFilter​​​​MetadataStore​​​​MetadataStore​​​​SimpleMetadataStore​

从版本 4.1.5 开始,这些筛选器具有一个名为 的新属性,这会导致它们刷新 每次更新时的元数据存储(如果存储实现)。​​flushOnUpdate​​​​Flushable​

此外,如果使用分布式(如 Redis 元数据存储​),则可以具有同一适配器或应用程序的多个实例,并确保只有一个实例处理文件。​​MetadataStore​

实际的本地过滤器是包含提供的过滤器和模式过滤器,用于阻止处理正在下载的文件(基于 )。 使用此后缀下载文件(默认值为 ),并在传输完成后将文件重命名为其最终名称,从而使它们对过滤器“可见”。​​CompositeFileListFilter​​​​temporary-file-suffix​​​​.writing​

有关这些属性的更多详细信息,请参阅架构。

SFTP 入站通道适配器是轮询使用者。 因此,必须配置轮询器(全局默认值或本地元素)。 将文件传输到本地目录后,将生成一条消息,并将其有效负载类型发送到由该属性标识的通道。​​java.io.File​​​​channel​

详细了解文件过滤和大文件

有时,刚刚出现在受监视(远程)目录中的文件不完整。 通常,此类文件以某种临时扩展名写入(例如在名为 的 上),然后在写入过程完成后重命名。 在大多数情况下,开发人员只对完整的文件感兴趣,并且只想过滤这些文件。 若要处理这些方案,可以使用 、 和 属性提供的筛选支持。 如果需要自定义筛选器实现,可以通过设置属性在适配器中包含引用。 以下示例演示如何执行此操作:​​.writing​​​​something.txt.writing​​​​filename-pattern​​​​filename-regex​​​​filter​​​​filter​

<int-sftp:inbound-channel-adapter id="sftpInbondAdapter"
channel="receiveChannel"
session-factory="sftpSessionFactory"
filter="customFilter"
local-directory="file:/local-test-dir"
remote-directory="/remote-test-dir">
<int:poller fixed-rate="1000" max-messages-per-poll="10" task-executor="executor"/>
</int-sftp:inbound-channel-adapter>

<bean id="customFilter" class="org.foo.CustomFilter"/>

从故障中恢复

您应该了解适配器的体系结构。 文件同步器提取文件,并为每个同步文件发出一条消息。 如前所述,涉及两个筛选器。 属性(和模式)引用远程 (SFTP) 文件列表,以避免提取已提取的文件。 使用 来确定哪些文件要作为消息发送。​​FileReadingMessageSource​​​​filter​​​​FileReadingMessageSource​​​​local-filter​

同步器列出远程文件并查阅其筛选器。 然后传输文件。 如果在文件传输过程中发生 IO 错误,则会删除已添加到筛选器的任何文件,以便它们有资格在下次轮询时重新提取。 这仅适用于过滤器实现(如 )。​​ReversibleFileListFilter​​​​AcceptOnceFileListFilter​

如果在同步文件后,处理文件的下游流发生错误,则不会自动回滚筛选器,因此默认情况下不会重新处理失败的文件。

如果您希望在失败后重新处理此类文件,可以使用类似于以下内容的配置来简化从筛选器中删除失败的文件:

<int-sftp:inbound-channel-adapter id="sftpAdapter"
session-factory="sftpSessionFactory"
channel="requestChannel"
remote-directory-expression="'/sftpSource'"
local-directory="file:myLocalDir"
auto-create-local-directory="true"
filename-pattern="*.txt">
<int:poller fixed-rate="1000">
<int:transactional synchronization-factory="syncFactory" />
</int:poller>
</int-sftp:inbound-channel-adapter>

<bean id="acceptOnceFilter"
class="org.springframework.integration.file.filters.AcceptOnceFileListFilter" />

<int:transaction-synchronization-factory id="syncFactory">
<int:after-rollback expression="payload.delete()" />
</int:transaction-synchronization-factory>

<bean id="transactionManager"
class="org.springframework.integration.transaction.PseudoTransactionManager" />

上述配置适用于任何 .​​ResettableFileListFilter​

从版本 5.0 开始,入站通道适配器可以根据生成的本地文件名在本地构建子目录。 这也可以是远程子路径。 为了能够根据层次结构支持递归读取本地目录以进行修改,您现在可以根据算法为内部目录提供新的目录。 有关更多信息,请参阅 AbstractInboundFileSynchronizingMessageSource.setScanner()。 此外,您现在可以将 切换到 -based by using 选项。 它还配置为所有实例对本地目录中的任何修改做出反应。 前面显示的重新处理示例基于 的内置功能,该功能在从本地目录中删除文件时使用 ()。 有关详细信息,请参阅 WatchServiceDirectoryScanner。​​FileReadingMessageSource​​​​RecursiveDirectoryScanner​​​​Files.walk()​​​​AbstractInboundFileSynchronizingMessageSource​​​​WatchService​​​​DirectoryScanner​​​​setUseWatchService()​​​​WatchEventType​​​​FileReadingMessageSource.WatchServiceDirectoryScanner​​​​ResettableFileListFilter.remove()​​​​StandardWatchEventKinds.ENTRY_DELETE​

使用 Java 配置进行配置

以下 Spring 引导应用程序显示了如何使用 Java 配置入站适配器的示例:

@SpringBootApplication
public class SftpJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<>(factory);
}

@Bean
public SftpInboundFileSynchronizer sftpInboundFileSynchronizer() {
SftpInboundFileSynchronizer fileSynchronizer = new SftpInboundFileSynchronizer(sftpSessionFactory());
fileSynchronizer.setDeleteRemoteFiles(false);
fileSynchronizer.setRemoteDirectory("foo");
fileSynchronizer.setFilter(new SftpSimplePatternFileListFilter("*.xml"));
return fileSynchronizer;
}

@Bean
@InboundChannelAdapter(channel = "sftpChannel", poller = @Poller(fixedDelay = "5000"))
public MessageSource<File> sftpMessageSource() {
SftpInboundFileSynchronizingMessageSource source =
new SftpInboundFileSynchronizingMessageSource(sftpInboundFileSynchronizer());
source.setLocalDirectory(new File("sftp-inbound"));
source.setAutoCreateLocalDirectory(true);
source.setLocalFilter(new AcceptOnceFileListFilter<File>());
source.setMaxFetchSize(1);
return source;
}

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new MessageHandler() {

@Override
public void handleMessage(Message<?> message) throws MessagingException {
System.out.println(message.getPayload());
}

};
}

}

使用 Java DSL 进行配置

以下 Spring 引导应用程序显示了如何使用 Java DSL 配置入站适配器的示例:

@SpringBootApplication
public class SftpJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public IntegrationFlow sftpInboundFlow() {
return IntegrationFlow
.from(Sftp.inboundAdapter(this.sftpSessionFactory)
.preserveTimestamp(true)
.remoteDirectory("foo")
.regexFilter(".*\\.txt$")
.localFilenameExpression("#this.toUpperCase() + '.a'")
.localDirectory(new File("sftp-inbound")),
e -> e.id("sftpInboundAdapter")
.autoStartup(true)
.poller(Pollers.fixedDelay(5000)))
.handle(m -> System.out.println(m.getPayload()))
.get();
}
}

处理不完整的数据

请参阅处理不完整的数据。

提供 用于过滤远程系统上没有相应标记文件的远程文件。 有关配置信息,请参阅 Javadoc。​​SftpSystemMarkerFilePresentFileListFilter​

SFTP 流式入站通道适配器

版本 4.3 引入了流式入站通道适配器。 此适配器生成有效负载类型的消息,使您可以在不写入本地文件系统的情况下获取文件。 由于会话保持打开状态,因此使用应用程序负责在使用文件时关闭会话。 会话在标头 () 中提供。 标准框架组件(如 和 )会自动关闭会话。 有关这些组件的详细信息,请参阅文件拆分器和流转换器。 以下示例演示如何配置 SFTP 流式入站通道适配器:​​InputStream​​​​closeableResource​​​​IntegrationMessageHeaderAccessor.CLOSEABLE_RESOURCE​​​​FileSplitter​​​​StreamTransformer​

<int-sftp:inbound-streaming-channel-adapter id="ftpInbound"
channel="ftpChannel"
session-factory="sessionFactory"
filename-pattern="*.txt"
filename-regex=".*\.txt"
filter="filter"
filter-expression="@myFilterBean.check(#root)"
remote-file-separator="/"
comparator="comparator"
max-fetch-size="1"
remote-directory-expression="'foo/bar'">
<int:poller fixed-rate="1000" />
</int-sftp:inbound-streaming-channel-adapter>

只能使用 、、 或 中的一种。​​filename-pattern​​​​filename-regex​​​​filter​​​​filter-expression​

从版本 5.0 开始,默认情况下,适配器通过使用 基于内存中的 来防止远程文件的重复。 默认情况下,此过滤器也与文件名模式(或正则表达式)一起应用。 如果需要允许重复项,可以使用 . 您可以使用 (或 ) 处理任何其他用例。 后面显示​的 Java 配置显示了一种在处理后删除远程文件以避免重复项的技术。​​SftpStreamingMessageSource​​​​SftpPersistentAcceptOnceFileListFilter​​​​SimpleMetadataStore​​​​AcceptAllFileListFilter​​​​CompositeFileListFilter​​​​ChainFileListFilter​

有关 及其使用方式的详细信息,请参阅远程持久文件列表筛选器。SftpPersistentAcceptOnceFileListFilter

当需要提取时,您可以使用该属性来限制每次轮询中获取的文件数。 将其设置为在群集环境中运行时使用持久性筛选器。 有关详细信息,请参阅入站通道适配器:控制远程文件提取。max-fetch-size1

适配器将远程目录和文件名放在标头中(分别是 和 )。 从版本 5.0 开始,标头提供其他远程文件信息(以 JSON 格式)。 如果在 上设置属性 to ,则标头包含一个对象。 可以使用该方法访问基础提供的对象。 使用 XML 配置时,该属性不可用,但可以通过将 注入到其中一个配置类中进行设置。 另请参阅远程文件信息。​​FileHeaders.REMOTE_DIRECTORY​​​​FileHeaders.REMOTE_FILE​​​​FileHeaders.REMOTE_FILE_INFO​​​​fileInfoJson​​​​SftpStreamingMessageSource​​​​false​​​​SftpFileInfo​​​​SftpClient.DirEntry​​​​SftpClient​​​​SftpFileInfo.getFileInfo()​​​​fileInfoJson​​​​SftpStreamingMessageSource​

使用 Java 配置进行配置

以下 Spring 引导应用程序显示了如何使用 Java 配置入站适配器的示例:

@SpringBootApplication
public class SftpJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}

@Bean
@InboundChannelAdapter(channel = "stream")
public MessageSource<InputStream> ftpMessageSource() {
SftpStreamingMessageSource messageSource = new SftpStreamingMessageSource(template());
messageSource.setRemoteDirectory("sftpSource/");
messageSource.setFilter(new AcceptAllFileListFilter<>());
messageSource.setMaxFetchSize(1);
return messageSource;
}

@Bean
@Transformer(inputChannel = "stream", outputChannel = "data")
public org.springframework.integration.transformer.Transformer transformer() {
return new StreamTransformer("UTF-8");
}

@Bean
public SftpRemoteFileTemplate template() {
return new SftpRemoteFileTemplate(sftpSessionFactory());
}

@ServiceActivator(inputChannel = "data", adviceChain = "after")
@Bean
public MessageHandler handle() {
return System.out::println;
}

@Bean
public ExpressionEvaluatingRequestHandlerAdvice after() {
ExpressionEvaluatingRequestHandlerAdvice advice = new ExpressionEvaluatingRequestHandlerAdvice();
advice.setOnSuccessExpression(
"@template.remove(headers['file_remoteDirectory'] + headers['file_remoteFile'])");
advice.setPropagateEvaluationFailures(true);
return advice;
}

}

请注意,在此示例中,转换器下游的消息处理程序具有在处理后删除远程文件的消息处理程序。​​advice​

入站通道适配器:轮询多个服务器和目录

从版本 5.0.7 开始,可用;当配置为轮询器建议时,入站适配器可以轮询多个服务器和目录。 配置建议并将其正常添加到轮询器的建议链中。 A 用于选择服务器,有关详细信息,请参阅​​委派会话工厂​​。 建议配置由对象列表组成。​​RotatingServerAdvice​​​​DelegatingSessionFactory​​​​RotationPolicy.KeyDirectory​

@Bean
public RotatingServerAdvice advice() {
List<RotationPolicy.KeyDirectory> keyDirectories = new ArrayList<>();
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "foo"));
keyDirectories.add(new RotationPolicy.KeyDirectory("one", "bar"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "baz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("two", "qux"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "fiz"));
keyDirectories.add(new RotationPolicy.KeyDirectory("three", "buz"));
return new RotatingServerAdvice(delegatingSf(), keyDirectories);
}

此建议将轮询服务器上的目录,直到不存在新文件,然后移动到目录,然后移动到服务器上的目录,等等。​​foo​​​​one​​​​bar​​​​baz​​​​two​

可以使用构造函数 arg 修改此默认行为:​​fair​

公平

@Bean
public RotatingServerAdvice advice() {
...
return new RotatingServerAdvice(delegatingSf(), keyDirectories, true);
}

在这种情况下,无论上一个轮询是否返回文件,建议都将移动到下一个服务器/目录。

或者,您可以根据需要提供自己的消息源以重新配置消息源:​​RotationPolicy​

政策

public interface RotationPolicy {

void beforeReceive(MessageSource<?> source);

void afterReceive(boolean messageReceived, MessageSource<?> source);

}

习惯

@Bean
public RotatingServerAdvice advice() {
return new RotatingServerAdvice(myRotationPolicy());
}

属性(在同步器上)现在可以包含变量。 这允许将从不同目录检索的文件下载到本地的类似目录:​​local-filename-generator-expression​​​​localFilenameGeneratorExpression​​​​#remoteDirectory​

@Bean
public IntegrationFlow flow() {
return IntegrationFlow.from(Sftp.inboundAdapter(sf())
.filter(new SftpPersistentAcceptOnceFileListFilter(new SimpleMetadataStore(), "rotate"))
.localDirectory(new File(tmpDir))
.localFilenameExpression("#remoteDirectory + T(java.io.File).separator + #root")
.remoteDirectory("."),
e -> e.poller(Pollers.fixedDelay(1).advice(advice())))
.channel(MessageChannels.queue("files"))
.get();
}

使用此建议时,请勿在轮询器上配置 a;有关详细信息,请参阅消息源的条件轮询器​。​​TaskExecutor​

入站通道适配器:控制远程文件提取

配置入站通道适配器时,应考虑两个属性。,与所有轮询器一样,可用于限制每次轮询时发出的消息数(如果已准备好的数值超过配置的值)。 (从版本 5.0 开始)可以限制一次从远程服务器检索的文件数。​​max-messages-per-poll​​​​max-fetch-size​

以下方案假定起始状态为空本地目录:

  • ​max-messages-per-poll=2​​和 :适配器提取一个文件,发出它,获取下一个文件,然后发出它。 然后它沉睡到下一次投票。max-fetch-size=1
  • ​max-messages-per-poll=2​​和 ):适配器获取这两个文件,然后发出每个文件。max-fetch-size=2
  • ​max-messages-per-poll=2​​和 :适配器最多获取 4 个文件(如果可用)并发出前两个文件(如果至少有两个)。 接下来的两个文件将在下一次轮询时发出。max-fetch-size=4
  • ​max-messages-per-poll=2​​未指定:适配器获取所有远程文件并发出前两个文件(如果至少有两个)。 后续文件在后续轮询中发出(一次两个)。 当全部用完后,将再次尝试远程获取,以拾取任何新文件。max-fetch-size

部署应用程序的多个实例时,我们建议设置一个小的 ,以避免一个实例“抓取”所有文件并使其他实例匮乏。​​max-fetch-size​

的另一个用途是当您想要停止获取远程文件但继续处理已获取的文件时。 在 上设置属性(以编程方式、通过 JMX 或通过控制总线)有效地阻止适配器获取更多文件,但允许轮询器继续发出以前已提取的文件的消息。 如果轮询器在更改属性时处于活动状态,则更改将在下一次轮询时生效。​​max-fetch-size​​​​maxFetchSize​​​​MessageSource​

从版本 5.1 开始,可以为同步器提供 . 这在限制使用 获取的文件数时很有用。​​Comparator<?>​​​​maxFetchSize​

SFTP 出站通道适配器

SFTP 出站通道适配器是一种特殊的适配器,它连接到远程目录,并为它作为传入的有效负载接收的每个文件启动文件传输。 它还支持文件的多种表示形式,因此您不仅限于对象。 与 FTP 出站适配器类似,SFTP 出站通道适配器支持以下有效负载:​​MessageHandler​​​​Message​​​​File​

  • ​java.io.File​​:实际文件对象
  • ​byte[]​​:表示文件内容的字节数组
  • ​java.lang.String​​:表示文件内容的文本
  • ​java.io.InputStream​​:要传输到远程文件的数据流
  • ​org.springframework.core.io.Resource​​:用于将数据传输到远程文件的资源

以下示例演示如何配置 SFTP 出站通道适配器:

<int-sftp:outbound-channel-adapter id="sftpOutboundAdapter"
session-factory="sftpSessionFactory"
channel="inputChannel"
charset="UTF-8"
remote-file-separator="/"
remote-directory="foo/bar"
remote-filename-generator-expression="payload.getName() + '-mysuffix'"
filename-generator="fileNameGenerator"
use-temporary-filename="true"
chmod="600"
mode="REPLACE"/>

有关这些属性的更多详细信息,请参阅架构。

SpEL 和 SFTP 出站适配器

与 Spring 集成中的许多其他组件一样,在配置 SFTP 出站通道适配器时,可以通过指定两个属性来使用 Spring 表达式语言 (SpEL):和(如前所述)。 表达式计算上下文将消息作为其根对象,这允许您使用可以根据消息中的数据(从“有效负载”或“标头”)动态计算文件名或现有目录路径的表达式。 在前面的示例中,我们使用表达式值定义属性,该表达式值根据文件名的原始名称计算文件名,同时还附加后缀:“-mysuffix”。​​remote-directory-expression​​​​remote-filename-generator-expression​​​​remote-filename-generator-expression​

从版本 4.1 开始,您可以指定传输文件的时间。 默认情况下,现有文件将被覆盖。 模式由枚举定义,枚举包括以下值:​​mode​​​​FileExistsMode​

  • ​REPLACE​​(默认)
  • ​REPLACE_IF_MODIFIED​
  • ​APPEND​
  • ​APPEND_NO_FLUSH​
  • ​IGNORE​
  • ​FAIL​

使用 和 ,不会传输文件。 导致引发异常,同时以静默方式忽略传输(尽管生成了日志条目)。​​IGNORE​​​​FAIL​​​​FAIL​​​​IGNORE​​​​DEBUG​

版本 4.3 引入了该属性,您可以使用该属性在上传后更改远程文件权限。 您可以使用传统的 Unix 八进制格式(例如,仅允许文件所有者读写)。 使用 java 配置适配器时,可以使用 或 。​​chmod​​​​600​​​​setChmodOctal("600")​​​​setChmod(0600)​

避免部分写入的文件

处理文件传输时的常见问题之一是处理部分文件的可能性。 文件可能会在实际传输完成之前出现在文件系统中。

为了解决这个问题,Spring Integration SFTP适配器使用一种通用算法,其中文件以临时名称传输,然后在完全传输后重命名。

默认情况下,每个正在传输的文件都会显示在文件系统中,并带有一个附加后缀,默认情况下为 . 您可以通过设置属性来更改。​​.writing​​​​temporary-file-suffix​

但是,在某些情况下,您可能不想使用此技术(例如,如果服务器不允许重命名文件)。 对于此类情况,可以通过设置为 (默认值为 ) 来禁用此功能。 当此属性为 时,文件以其最终名称写入,并且使用应用程序需要某种其他机制来检测文件是否已完全上传,然后再访问它。​​use-temporary-file-name​​​​false​​​​true​​​​false​

使用 Java 配置进行配置

以下 Spring 引导应用程序显示了如何使用 Java 配置出站适配器的示例:

@SpringBootApplication
@IntegrationComponentScan
public class SftpJavaApplication {

public static void main(String[] args) {
ConfigurableApplicationContext context =
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
MyGateway gateway = context.getBean(MyGateway.class);
gateway.sendToSftp(new File("/foo/bar.txt"));
}

@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
factory.setHost("localhost");
factory.setPort(port);
factory.setUser("foo");
factory.setPassword("foo");
factory.setAllowUnknownKeys(true);
factory.setTestSession(true);
return new CachingSessionFactory<SftpClient.DirEntry>(factory);
}

@Bean
@ServiceActivator(inputChannel = "toSftpChannel")
public MessageHandler handler() {
SftpMessageHandler handler = new SftpMessageHandler(sftpSessionFactory());
handler.setRemoteDirectoryExpressionString("headers['remote-target-dir']");
handler.setFileNameGenerator(new FileNameGenerator() {

@Override
public String generateFileName(Message<?> message) {
return "handlerContent.test";
}

});
return handler;
}

@MessagingGateway
public interface MyGateway {

@Gateway(requestChannel = "toSftpChannel")
void sendToSftp(File file);

}
}

使用 Java DSL 进行配置

以下 Spring 引导应用程序显示了如何使用 Java DSL 配置出站适配器的示例:

@SpringBootApplication
public class SftpJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public IntegrationFlow sftpOutboundFlow() {
return IntegrationFlow.from("toSftpChannel")
.handle(Sftp.outboundAdapter(this.sftpSessionFactory, FileExistsMode.FAIL)
.useTemporaryFileName(false)
.remoteDirectory("/foo")
).get();
}

}

SFTP 出站网关

SFTP 出站网关提供一组有限的命令,允许您与远程 SFTP 服务器进行交互:

  • ​ls​​(列出文件)
  • ​nlst​​(列出文件名)
  • ​get​​(检索文件)
  • ​mget​​(检索多个文件)
  • ​rm​​(删除文件)
  • ​mv​​(移动和重命名文件)
  • ​put​​(发送文件)
  • ​mput​​(发送多个文件)

使用命令​​ls​

​ls​​列出远程文件并支持以下选项:

  • ​-1​​:检索文件名列表。 默认设置是检索对象列表FileInfo
  • ​-a​​:包括所有文件(包括以“.”开头的文件)
  • ​-f​​:不对列表进行排序
  • ​-dirs​​:包含目录(默认排除)
  • ​-links​​:包含符号链接(默认排除)
  • ​-R​​:递归列出远程目录

此外,文件名筛选的提供方式与 .​​inbound-channel-adapter​

操作产生的消息负载是文件名列表或对象列表(取决于是否使用开关)。 这些对象提供修改时间、权限等信息。​​ls​​​​FileInfo​​​​-1​

命令操作的远程目录在标头中提供。​​ls​​​​file_remoteDirectory​

使用递归选项 () 时,包括任何子目录元素,并表示文件的相对路径(相对于远程目录)。 如果使用该选项,则每个递归目录也会作为列表中的元素返回。 在这种情况下,我们建议您不要使用该选项,因为您将无法区分文件和目录,而使用对象时可以这样做。​​-R​​​​fileName​​​​-dirs​​​​-1​​​​FileInfo​

使用命令​​nlst​

版本 5 引入了对该命令的支持。​​nlst​

​nlst​​列出远程文件名,仅支持一个选项:

  • ​-f​​:不对列表进行排序

操作生成的消息负载是文件名列表。​​nlst​

标头包含命令在其上执行操作的远程目录。​​file_remoteDirectory​​​​nlst​

SFTP 协议不提供列出名称的功能。 此命令等效于带有选项的命令,为方便起见,在此处添加。​​ls​​​​-1​

使用命令​​get​

​get​​检索远程文件并支持以下选项:

  • ​-P​​:保留远程文件的时间戳。
  • ​-stream​​:以流形式检索远程文件。
  • ​-D​​:传输成功后删除远程文件。 如果忽略传输,则不会删除远程文件,因为 和本地文件已存在。FileExistsModeIGNORE

标头保存远程目录,标头保存文件名。​​file_remoteDirectory​​​​file_remoteFile​

操作生成的消息负载是表示检索到的文件的对象。 如果使用该选项,则有效负载是 . 对于文本文件,常见的用例是将此操作与文件拆分器或流转换器结合使用。 将远程文件作为流使用时,您负责在使用流后关闭 。 为方便起见,在标题中提供了 ,并提供方便的方法:​​get​​​​File​​​​-stream​​​​InputStream​​​​File​​​​Session​​​​Session​​​​closeableResource​​​​IntegrationMessageHeaderAccessor​

Closeable closeable = new IntegrationMessageHeaderAccessor(message).getCloseableResource();
if (closeable != null) {
closeable.close();
}

框架组件(如文件拆分器和流转换器)在传输数据后自动关闭会话。

以下示例演示如何将文件用作流:

<int-sftp:outbound-gateway session-factory="ftpSessionFactory"
request-channel="inboundGetStream"
command="get"
command-options="-stream"
expression="payload"
remote-directory="ftpTarget"
reply-channel="stream" />

<int-file:splitter input-channel="stream" output-channel="lines" />

如果在自定义组件中使用输入流,则必须关闭 . 您可以在自定义代码中执行此操作,也可以将消息的副本路由到 并使用 SpEL,如以下示例所示:​​Session​​​​service-activator​

<int:service-activator input-channel="closeSession"
expression="headers['closeableResource'].close()" />

使用命令​​mget​

​mget​​基于模式检索多个远程文件,并支持以下选项:

  • ​-P​​:保留远程文件的时间戳。
  • ​-R​​:以递归方式检索整个目录树。
  • ​-x​​:如果没有与模式匹配的文件,则引发异常(否则,返回空列表)。
  • ​-D​​:成功传输后删除每个远程文件。 如果忽略传输,则不会删除远程文件,因为 is 和本地文件已存在。FileExistsModeIGNORE

由操作生成的消息负载是一个对象(即,一个对象,每个对象表示一个检索到的文件)。​​mget​​​​List<File>​​​​List​​​​File​

从版本 5.0 开始,如果 is ,则输出消息的有效负载不再包含由于文件已存在而未获取的文件。 以前,数组包含所有文件,包括已存在的文件。​​FileExistsMode​​​​IGNORE​

您使用的表达式确定远程路径应生成一个结果,该结果以 例如在 下获取完整的树结尾。​​*​​​​myfiles/*​​​​myfiles​

从版本 5.0 开始,您可以使用递归 ,结合模式,在本地定期同步整个远程目录树。 此模式将本地文件的上次修改时间戳设置为远程文件的时间戳,而不考虑(保留时间戳)选项。​​MGET​​​​FileExistsMode.REPLACE_IF_MODIFIED​​​​-P​

使用递归 (​​-R​​)


该模式将被忽略并被假定。 默认情况下,将检索整个远程树。 但是,您可以通过提供 . 您也可以通过这种方式过滤树中的目录。 A 可以通过引用或 OR 属性提供。 例如, 检索远程目录和子目录中以 结尾的所有文件。 但是,我们在此说明之后描述了一种可用的替代方案。​​*​​​​FileListFilter​​​​FileListFilter​​​​filename-pattern​​​​filename-regex​​​​filename-regex="(subDir|.*1.txt)"​​​​1.txt​​​​subDir​



如果筛选子目录,则不会执行该子目录的其他遍历。



不允许使用此选项(递归使用递归获取目录树,并且目录本身不能包含在列表中)。​​-dirs​​​​mget​​​​ls​



通常,您将在 中使用该变量,以便在本地保留远程目录结构。​​#remoteDirectory​​​​local-directory-expression​


持久文件列表筛选器现在具有布尔属性。 将此属性设置为 也会设置 ,这意味着出站网关 ( 和 ) 上的递归操作现在每次将始终遍历整个目录树。 这是为了解决未检测到目录树深处的更改的问题。 此外,导致将文件的完整路径用作元数据存储键;这解决了如果具有相同名称的文件在不同目录中多次出现,过滤器无法正常工作的问题。 重要说明:这意味着对于顶级目录下的文件,将找不到持久元数据存储中的现有密钥。 因此,默认情况下,该属性是;这可能会在将来的版本中更改。​​forRecursion​​​​true​​​​alwaysAcceptDirectories​​​​ls​​​​mget​​​​forRecursion=true​​​​false​

从版本 5.0 开始,您可以通过将 设置为 来配置 和 以始终传递目录。 这样做允许对简单模式进行递归,如以下示例所示:​​SftpSimplePatternFileListFilter​​​​SftpRegexPatternFileListFilter​​​​alwaysAcceptDirectorties​​​​true​

<bean id="starDotTxtFilter"
class="org.springframework.integration.sftp.filters.SftpSimplePatternFileListFilter">
<constructor-arg value="*.txt" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>

<bean id="dotStarDotTxtFilter"
class="org.springframework.integration.sftp.filters.SftpRegexPatternFileListFilter">
<constructor-arg value="^.*\.txt$" />
<property name="alwaysAcceptDirectories" value="true" />
</bean>

可以使用网关上的属性提供这些筛选器之一。​​filter​

另请参阅出站网关部分成功(mget 和 mput)。

使用命令put

put将文件发送到远程服务器。 消息的有效负载可以是 、 或 。 (或表达式)用于命名远程文件。 其他可用属性包括 及其等效项:和 。 有关详细信息,请参阅架构文档。​​java.io.File​​​​byte[]​​​​String​​​​remote-filename-generator​​​​remote-directory​​​​temporary-remote-directory​​​​*-expression​​​​use-temporary-file-name​​​​auto-create-directory​

操作产生的消息有效负载是包含传输后服务器上文件的完整路径。​​put​​​​String​

版本 4.3 引入了该属性,该属性在上传后更改远程文件权限。 您可以使用传统的 Unix 八进制格式(例如,仅允许文件所有者读写)。 使用 java 配置适配器时,可以使用 。​​chmod​​​​600​​​​setChmod(0600)​

使用命令​​mput​

​mput​​将多个文件发送到服务器并支持以下选项:

  • ​-R​​:递归 — 发送目录和子目录中的所有文件(可能已过滤)

消息负载必须是表示本地目录的(或)。 从版本 5.1 开始,还支持 or 的集合。​​java.io.File​​​​String​​​​File​​​​String​

支持与 put 命令相同的属性。 此外,还可以使用 、、 或 之一过滤本地目录中的文件。 过滤器与递归一起工作,只要子目录本身通过过滤器。 未通过筛选器的子目录不会递归。​​mput-pattern​​​​mput-regex​​​​mput-filter​​​​mput-filter-expression​

操作生成的消息有效负载是一个对象(即传输产生的远程文件路径)。​​mput​​​​List<String>​​​​List​

另请参阅出站网关部分成功(mget 和 mput)。

版本 4.3 引入了该属性,该属性允许您在上传后更改远程文件权限。 您可以使用传统的 Unix 八进制格式(例如,仅允许文件所有者读写)。 使用 Java 配置适配器时,可以使用 或 。​​chmod​​​​600​​​​setChmodOctal("600")​​​​setChmod(0600)​

使用命令​​rm​

该命令没有选项。​​rm​

如果删除操作成功,则生成的消息负载为 。 否则,消息负载为 。 标头保存远程目录,标头保存文件名。​​Boolean.TRUE​​​​Boolean.FALSE​​​​file_remoteDirectory​​​​file_remoteFile​

使用命令​​mv​

该命令没有选项。​​mv​

该属性定义“发件人”路径,该属性定义“to”路径。 默认情况下,为 . 此表达式的计算结果不得为 null 或空 。 如有必要,将创建所需的任何远程目录。 结果消息的有效负载为 。 标头保存原始远程目录,标头保存文件名。 标头包含新路径。​​expression​​​​rename-expression​​​​rename-expression​​​​headers['file_renameTo']​​​​String​​​​Boolean.TRUE​​​​file_remoteDirectory​​​​file_remoteFile​​​​file_renameTo​

从版本 5.5.6 开始,为方便起见,可以在命令中使用 。 如果“from”文件不是完整的文件路径,则 的结果将用作远程目录。 这同样适用于“to”文件,例如,如果任务只是重命名某个目录中的远程文件。​​remoteDirectoryExpression​​​​mv​​​​remoteDirectoryExpression​

其他命令信息

和命令支持该属性。 它定义了一个 SpEL 表达式,用于在传输过程中生成本地文件的名称。 评估上下文的根对象是请求消息。 该变量也可用。 它对于(例如:)特别有用。​​get​​​​mget​​​​local-filename-generator-expression​​​​remoteFileName​​​​mget​​​​local-filename-generator-expression="#remoteFileName.toUpperCase() + headers.foo"​

和命令支持该属性。 它定义了一个 SpEL 表达式,用于在传输过程中生成本地目录的名称。 评估上下文的根对象是请求消息。 该变量也可用。 它对 mget 特别有用(例如:)。 此属性与该属性互斥。​​get​​​​mget​​​​local-directory-expression​​​​remoteDirectory​​​​local-directory-expression="'/tmp/local/' + #remoteDirectory.toUpperCase() + headers.myheader"​​​​local-directory​

对于所有命令,网关的“expression”属性保存命令作用的路径。 对于该命令,表达式的计算结果可能为 ,表示检索所有文件、 和其他以 结尾的值。​​mget​​​​*​​​​somedirectory/*​​​​*​

以下示例显示了为命令配置的网关:​​ls​

<int-ftp:outbound-gateway id="gateway1"
session-factory="ftpSessionFactory"
request-channel="inbound1"
command="ls"
command-options="-1"
expression="payload"
reply-channel="toSplitter"/>

发送到通道的消息的有效负载是一个对象列表,每个对象都包含一个文件的名称。 如果省略 ,则有效负载将是对象列表。 可以将选项作为空格分隔的列表提供(例如,)。​​toSplitter​​​​String​​​​command-options="-1"​​​​FileInfo​​​​command-options="-1 -dirs -links"​

从版本 4.2 开始,、、 和命令支持属性(使用命名空间支持时)。 这会影响本地文件存在(和)或远程文件存在(和)时的行为。 支持的模式包括 、 、 和 。 为了向后兼容,和操作的默认模式为 。 对于 和 操作,缺省值为 。​​GET​​​​MGET​​​​PUT​​​​MPUT​​​​FileExistsMode​​​​mode​​​​GET​​​​MGET​​​​PUT​​​​MPUT​​​​REPLACE​​​​APPEND​​​​FAIL​​​​IGNORE​​​​PUT​​​​MPUT​​​​REPLACE​​​​GET​​​​MGET​​​​FAIL​

使用 Java 配置进行配置

以下 Spring 引导应用程序显示了如何使用 Java 配置出站网关的示例:

@SpringBootApplication
public class SftpJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler handler() {
return new SftpOutboundGateway(ftpSessionFactory(), "ls", "'my_remote_dir/'");
}

}

使用 Java DSL 进行配置

以下 Spring 引导应用程序显示了如何使用 Java DSL 配置出站网关的示例:

@SpringBootApplication
public class SftpJavaApplication {

public static void main(String[] args) {
new SpringApplicationBuilder(SftpJavaApplication.class)
.web(false)
.run(args);
}

@Bean
public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
DefaultSftpSessionFactory sf = new DefaultSftpSessionFactory();
sf.setHost("localhost");
sf.setPort(port);
sf.setUsername("foo");
sf.setPassword("foo");
factory.setTestSession(true);
return new CachingSessionFactory<>(sf);
}

@Bean
public QueueChannelSpec remoteFileOutputChannel() {
return MessageChannels.queue();
}

@Bean
public IntegrationFlow sftpMGetFlow() {
return IntegrationFlow.from("sftpMgetInputChannel")
.handle(Sftp.outboundGateway(sftpSessionFactory(),
AbstractRemoteFileOutboundGateway.Command.MGET, "payload")
.options(AbstractRemoteFileOutboundGateway.Option.RECURSIVE)
.regexFileNameFilter("(subSftpSource|.*1.txt)")
.localDirectoryExpression("'myDir/' + #remoteDirectory")
.localFilenameExpression("#remoteFileName.replaceFirst('sftpSource', 'localTarget')"))
.channel("remoteFileOutputChannel")
.get();
}

}

出站网关部分成功 ( 和​​mget​​​​mput​​)

对多个文件执行操作(通过使用 和 )时,可能会在传输一个或多个文件后的一段时间内发生异常。 在这种情况下(从版本 4.2 开始),抛出 a。 除了常用属性 ( 和 ) 之外,此异常还有两个附加属性:​​mget​​​​mput​​​​PartialSuccessException​​​​MessagingException​​​​failedMessage​​​​cause​

  • ​partialResults​​:传输成功结果。
  • ​derivedInput​​:从请求消息生成的文件列表(例如要传输的本地文件)。mput

这些属性使您可以确定哪些文件已成功传输,哪些文件未成功传输。

在递归的情况下,可能有嵌套实例。​​mput​​​​PartialSuccessException​​​​PartialSuccessException​

请考虑以下目录结构:

root/
|- file1.txt
|- subdir/
| - file2.txt
| - file3.txt
|- zoo.txt

如果异常发生在 上,则网关抛出的 具有 、 和 的 。 它是另一个 with of 和 and of .​​file3.txt​​​​PartialSuccessException​​​​derivedInput​​​​file1.txt​​​​subdir​​​​zoo.txt​​​​partialResults​​​​file1.txt​​​​cause​​​​PartialSuccessException​​​​derivedInput​​​​file2.txt​​​​file3.txt​​​​partialResults​​​​file2.txt​

消息会话回调

从 Spring Integration 版本 4.2 开始,您可以使用带有 () 的实现在上下文中执行任何操作。 您可以将其用于任何非标准或低级 SFTP 操作(或多个),例如允许从集成流定义或功能接口 (lambda) 实现注入进行访问。 以下示例使用 lambda:​​MessageSessionCallback<F, T>​​​​<int-sftp:outbound-gateway/>​​​​SftpOutboundGateway​​​​Session<SftpClient.DirEntry>​​​​requestMessage​

@Bean
@ServiceActivator(inputChannel = "sftpChannel")
public MessageHandler sftpOutboundGateway(SessionFactory<SftpClient.DirEntry> sessionFactory) {
return new SftpOutboundGateway(sessionFactory,
(session, requestMessage) -> session.list(requestMessage.getPayload()));
}

另一个示例可能是对正在发送或检索的文件数据进行预处理或后处理。

使用 XML 配置时,提供了允许您指定 Bean 名称的属性。​​<int-sftp:outbound-gateway/>​​​​session-callback​​​​MessageSessionCallback​

与 和 属性互斥。 使用 Java 进行配置时,该类提供了不同的构造函数。​​session-callback​​​​command​​​​expression​​​​SftpOutboundGateway​

Apache Mina SFTP 服务器事件

在 5.2 版中添加的侦听某些 Apache Mina SFTP 服务器事件,并将其发布为 s,任何 Bean、Bean 方法或事件入站通道适配器都可以接收这些事件。​​ApacheMinaSftpEventListener​​​​ApplicationEvent​​​​ApplicationListener​​​​@EventListener​

目前,支持的事件包括:

  • ​SessionOpenedEvent​​- 客户端会话已打开
  • ​DirectoryCreatedEvent​​- 已创建目录
  • ​FileWrittenEvent​​- 文件已写入
  • ​PathMovedEvent​​- 文件或目录已重命名
  • ​PathRemovedEvent​​- 删除了文件或目录
  • ​SessionClosedEvent​​- 客户端已断开连接

其中每个都是 的子类;您可以将单个侦听器配置为接收所有事件类型。 每个事件的属性都是 ,您可以从中获取客户端地址等信息;在抽象事件上提供了一种方便的方法。​​ApacheMinaSftpEvent​​​​source​​​​ServerSession​​​​getSession()​

要使用侦听器(必须是 Spring bean)配置服务器,只需将其添加到 :​​SftpSubsystemFactory​

server = SshServer.setUpDefaultServer();
...
SftpSubsystemFactory sftpFactory = new SftpSubsystemFactory();
sftpFactory.addSftpEventListener(apacheMinaSftpEventListenerBean);
...

要使用 Spring 集成事件适配器使用这些事件,请执行以下操作:

@Bean
public ApplicationEventListeningMessageProducer eventsAdapter() {
ApplicationEventListeningMessageProducer producer =
new ApplicationEventListeningMessageProducer();
producer.setEventTypes(ApacheMinaSftpEvent.class);
producer.setOutputChannel(eventChannel());
return producer;
}

远程文件信息

从版本 5.2 开始,(SFTP 流式入站通道适配器)、(SFTP 入站通道适配器)和 (SFTP 出站网关) 的“读取”命令在消息中提供其他标头,以生成有关远程文件的信息:​​SftpStreamingMessageSource​​​​SftpInboundFileSynchronizingMessageSource​​​​SftpOutboundGateway​

  • ​FileHeaders.REMOTE_HOST_PORT​​- 在文件传输操作期间远程会话连接到的主机:端口对;
  • ​FileHeaders.REMOTE_DIRECTORY​​- 已执行操作的远程目录;
  • ​FileHeaders.REMOTE_FILE​​- 远程文件名;仅适用于单个文件操作。

由于 不会针对远程文件生成消息,而是使用本地副本,因此在同步操作期间,会将有关远程文件的信息存储在 (可以在外部配置) 中的 URI 样式 ()。 轮询本地文件时,将检索此元数据。 删除本地文件时,建议删除其元数据条目。 为此提供了一个回调。 此外,还有一个要在元数据键中使用的。 当这些组件之间共享同一实例时,建议使此前缀与基于 -的实现中使用的前缀不同,以避免条目覆盖,因为两者都筛选并使用相同的本地文件名作为元数据条目键。​​SftpInboundFileSynchronizingMessageSource​​​​AbstractInboundFileSynchronizer​​​​MetadataStore​​​​protocol://host:port/remoteDirectory#remoteFileName​​​​SftpInboundFileSynchronizingMessageSource​​​​AbstractInboundFileSynchronizer​​​​removeRemoteFileMetadata()​​​​setMetadataStorePrefix()​​​​MetadataStore​​​​FileListFilter​​​​MetadataStore​​​​AbstractInboundFileSynchronizer​

举报

相关推荐

0 条评论