发送的消息无法订阅_Redis 的发布订阅功能在 SpringBoot 中的应用_思考的葡萄的博客-CSDN博客

f5302744791cd423b3806c0aafb804be.png

认识 Redis 的发布订阅功能

关于 Redis 发布订阅的功能介绍可以参考:订阅与发布。下面我来介绍下 Redis 发布订阅功能的特性和适用场景。

Redis 发布订阅功能的特性

  • 消息的发送者与接收者之间通过 channel 绑定:channel 可以是确定的字符串,也可以基于模式匹配
  • 客户端可以订阅任意多个 channel
  • 发送者发送的消息无法持久化,所以可能会造成消息丢失
  • 由于消息无法持久化,所以,消费者无法收到在订阅 channel 之间发送的消息
  • 发送者与客户端之间的消息发送与接收不存在 ACK 机制

Redis 发布订阅功能的适用场景

由于没有消息持久化与 ACK 的保证,所以,Redis 的发布订阅功能并不可靠。这也就导致了它的应用场景很有限,建议用于实时与可靠性要求不高的场景。例如:

  • 消息推送
  • 内网环境的消息通知
  • ...

总之,Redis 发布订阅功能足够简单,如果没有过多的要求,且不想搭建 Kafka、RabbitMQ 这样的可靠型消息系统时,可以考虑尝试使用 Redis。

Redis 发布订阅功能在 SpringBoot 中的关键类

SpringBoot 版本

<parent>  <groupId>org.springframework.boot</groupId>  <artifactId>spring-boot-starter-parent</artifactId>  <version>2.0.2.RELEASE</version>  <relativePath/></parent>

Spring Data Redis 实现发布订阅功能非常简单,只有这样的几个类:Topic、MessageListener、RedisMessageListenerContainer。下面对它们进行解释:

org.springframework.data.redis.listener.Topic

消息发送者与接收者之间的 channel 定义,有两个实现类: org.springframework.data.redis.listener.ChannelTopic:一个确定的字符串 org.springframework.data.redis.listener.PatternTopic:基于模式匹配

org.springframework.data.redis.connection.MessageListener

一个回调接口,消息监听器,用于接收发送到 channel 的消息,接口定义如下:

package org.springframework.data.redis.connection; import org.springframework.lang.Nullable; /** * Listener of messages published in Redis. * * @author Costin Leau * @author Christoph Strobl */public interface MessageListener {     /**     * Callback for processing received objects through Redis.     *     * @param message message must not be {@literal null}.     * @param pattern pattern matching the channel (if specified) - can be {@literal null}.     */    void onMessage(Message message, @Nullable byte[] pattern);}

org.springframework.data.redis.listener.RedisMessageListenerContainer

用于消息监听,需要将 Topic 和 MessageListener 注册到 RedisMessageListenerContainer 中。这样,当 Topic 上有消息时,由 RedisMessageListenerContainer 通知 MessageListener,客户端通过 onMessage 拿到消息后,自行处理。

Redis 发布订阅功能在 SpringBoot 中的实践

说明:当前给出的示例代码使用 ChannelTopic,可以自行测试使用 PatternTopic。

  • VO 对象定义:CityInfo
import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor; import java.io.Serializable; /** * <h1>城市信息</h1> * Created by Qinyi. */@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic class CityInfo implements Serializable {     /** 城市 */    private String city;     /** 经度 */    private Double longitude;     /** 纬度 */    private Double latitude;}
  • 配置类定义:RedisConfig
import listener.SubscribeListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnection;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.listener.ChannelTopic;import org.springframework.data.redis.listener.PatternTopic;import org.springframework.data.redis.listener.RedisMessageListenerContainer;import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;import org.springframework.data.redis.serializer.RedisSerializer;import org.springframework.data.redis.serializer.StringRedisSerializer; /** * <h1>Redis 配置</h1> * Created by Qinyi. */@Configurationpublic class RedisConfig {     private final RedisConnectionFactory redisConnectionFactory;     @Autowired    public RedisConfig(RedisConnectionFactory redisConnectionFactory) {        this.redisConnectionFactory = redisConnectionFactory;    }     /**     * <h2>配置消息监听器</h2>     * */    @Bean    public SubscribeListener listener() {        return new SubscribeListener();    }     /**     * <h2>配置 发布/订阅 的 Topic</h2>     * */    @Bean    public ChannelTopic channelTopic() {        return new ChannelTopic("city");    }     /**     * <h2>配置 ChannelName 的模式匹配</h2>     * */    @Bean    public PatternTopic patternTopic() {        return new PatternTopic("/city/*");    }     /**     * <h2>将消息监听器绑定到消息容器</h2>     * */    @Bean    public RedisMessageListenerContainer messageListenerContainer() {         RedisMessageListenerContainer container = new RedisMessageListenerContainer();        container.setConnectionFactory(redisConnectionFactory);         // 可以修改成 patternTopic, 看一看 MessageListener 中监听的数据        container.addMessageListener(listener(), channelTopic());        return container;    }}
  • MessageListener 接口实现类:SubscribeListener
import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.connection.MessageListener; /** * <h1>消息监听器</h1> * Created by Qinyi. */public class SubscribeListener implements MessageListener {     /**     * <h2>消息回调</h2>     * @param message {@link Message} 消息体 + ChannelName     * @param pattern 订阅的 pattern, ChannelName 的模式匹配     * */    @Override    public void onMessage(Message message, byte[] pattern) {         String body = new String(message.getBody());        String channel = new String(message.getChannel());        String pattern_ = new String(pattern);         System.out.println(body);        System.out.println(channel);        System.out.println(pattern_);       // 如果是 ChannelTopic, 则 channel 字段与 pattern 字段值相同    }}
  • 测试用例:RedisPubSubTest
import com.alibaba.fastjson.JSON;import Application;import vo.CityInfo;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.data.redis.listener.ChannelTopic;import org.springframework.test.context.junit4.SpringRunner; /** * <h1>Redis 发布订阅测试用例</h1> * Created by Qinyi. */@RunWith(SpringRunner.class)@SpringBootTest(classes = {Application.class}, webEnvironment = SpringBootTest.WebEnvironment.NONE)public class RedisPubSubTest {     @Autowired    private StringRedisTemplate redisTemplate;     @Autowired    private ChannelTopic topic;     @Test    public void testRedisPubSub() {         redisTemplate.convertAndSend(                topic.getTopic(),                JSON.toJSONString(new CityInfo("hefei", 117.17, 31.52))        );    }}

执行测试用例,可以看到如下打印信息:

2019-03-12 17:54:41.699  INFO 5627 --- [enerContainer-1] io.lettuce.core.EpollProvider            : Starting without optional epoll library2019-03-12 17:54:41.703  INFO 5627 --- [enerContainer-1] io.lettuce.core.KqueueProvider           : Starting without optional kqueue library2019-03-12 17:54:42.354  INFO 5627 --- [           main] com.imooc.ad.service.RedisPubSubTest     : Started RedisPubSubTest in 8.364 seconds (JVM running for 12.321){"city":"hefei","latitude":31.52,"longitude":117.17}citycity2019-03-12 17:54:42.936  INFO 5627 --- [       Thread-4] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@2812b107: startup date [Tue Mar 12 17:54:35 CST 2019]; root of context hierarchy2019-03-12 17:54:42.939  INFO 5627 --- [       Thread-4] o.s.c.support.DefaultLifecycleProcessor  : Stopping beans in phase 2147483647 Process finished with exit code 0

原网址: 访问
创建于: 2021-02-04 18:11:53
目录: default
标签: 无

请先后发表评论
  • 最新评论
  • 总共0条评论