使用 Spring Integration Framework 写入 redis 队列

Spring Integration Framework 提供了应用集成的一种方式。各个应用程 序,组件,通过 Channel, Message 松散耦合在一起。

这个例子演示了如何从 stdin 读取用户输入,然后写入 redis 队列的。

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
	<modelVersion>4.0.0</modelVersion>

	<groupId>com.wcy123.example.spring.integration.redis</groupId>
	<artifactId>si-redis</artifactId>
	<version>0.0.1-SNAPSHOT</version>
	<packaging>jar</packaging>

	<name>si-redis</name>
	<description>Demo project for Spring Boot</description>

	<parent>
		<groupId>org.springframework.boot</groupId>
		<artifactId>spring-boot-starter-parent</artifactId>
		<version>1.4.0.RELEASE</version>
		<relativePath/> <!-- lookup parent from repository -->
	</parent>

	<properties>
		<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
		<project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
		<java.version>1.8</java.version>
	</properties>

	<dependencies>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-data-redis</artifactId>
		</dependency>
		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-integration</artifactId>
		</dependency>

		<dependency>
			<groupId>org.springframework.boot</groupId>
			<artifactId>spring-boot-starter-test</artifactId>
			<scope>test</scope>
		</dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-redis</artifactId>
        </dependency>
	</dependencies>
	<build>
		<plugins>
			<plugin>
				<groupId>org.springframework.boot</groupId>
				<artifactId>spring-boot-maven-plugin</artifactId>
			</plugin>
		</plugins>
	</build>
</project>

上面大部分是自动生成的,自己加的只有

        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-stream</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.integration</groupId>
            <artifactId>spring-integration-redis</artifactId>
        </dependency>

引入 spring-integration-stream ,我们可以解析 xml 中的 namespace

xmlns:int-stream="http://www.springframework.org/schema/integration/stream"

spring-integration-stream 的引入,我们可以解析 xml 中的 namespace

xmlns:int-redis="http://www.springframework.org/schema/integration/redis">

下面的 spring boot 生成的程序框架。

// src/main/java/com/wcy123/example/spring/integration/redis/SiRedisApplication.java
package com.wcy123.example.spring.integration.redis;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ImportResource;
import org.springframework.integration.config.EnableIntegration;

@SpringBootApplication
@EnableIntegration
@ImportResource("classpath:/META-INF/spring/si-components.xml")
public class SiRedisApplication {

	public static void main(String[] args) {
		SpringApplication.run(SiRedisApplication.class, args);
	}
}

@EnableIntegration 标注一个 Configuration 类是 Spring Integration 的配置。

@ImportResource("classpath:/META-INF/spring/si-components.xml")

用来引入 spring integration 的 beam 定义。目前 4.3.0 中 ,不是所有的 spring integration 组件都支持 java annotation 的配置,很多还是依赖 xml 的配置。

src/main/resources/META-INF/spring/si-components.xml 的内容

<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
       xmlns:int="http://www.springframework.org/schema/integration"
       xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
       xmlns:int-stream="http://www.springframework.org/schema/integration/stream"
       xsi:schemaLocation="
	http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
	http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
	http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
	http://www.springframework.org/schema/integration/redis  http://www.springframework.org/schema/integration/redis/spring-integration-redis.xsd"
       xmlns:int-redis="http://www.springframework.org/schema/integration/redis">

    <int-stream:stdin-channel-adapter id="producer" channel="messageChannel"/>

    <int:poller id="defaultPoller" default="true"
                max-messages-per-poll="5" fixed-rate="200"/>

    <int:channel id="messageChannel"/>
    <bean id="redisConnectionFactory"
          class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory">
    </bean>
    <int-redis:queue-outbound-channel-adapter
            id="queue"
            channel="messageChannel" queue="a_queue"
            connection-factory="redisConnectionFactory"
            left-push="false"
    ></int-redis:queue-outbound-channel-adapter>
</beans>
    <int-stream:stdin-channel-adapter id="producer" channel="messageChannel"/>

    <int:poller id="defaultPoller" default="true"
                max-messages-per-poll="5" fixed-rate="200"/>

创建了一个 endpoint ,每隔 200 ms ,从标准输入读取数据,然后扔到 messageChannel 中。

<int:channel id="messageChannel"/>

创建了一个 messageChannel 的 channel。

    <int-redis:queue-outbound-channel-adapter
            id="queue"
            channel="messageChannel" queue="a_queue"
            connection-factory="redisConnectionFactory"
            left-push="false"
    ></int-redis:queue-outbound-channel-adapter>

会从 messageChannel 中读取 Message , 然后用 rpush 命令,把消息压到 消息队列 a_queue 中。

queue-outbound-channel-adapter 需要一个 redis connection factory

    <bean id="redisConnectionFactory"
          class="org.springframework.data.redis.connection.jedis.JedisConnectionFactory" />

这里创建了一个 redis connection factory , 这个 bean 有很多属性,用于 表明连接的主机名称,密码,端口号,是否使用连接池等等。