关于 Redis 发布订阅的功能介绍可以参考:订阅与发布。下面我来介绍下 Redis 发布订阅功能的特性和适用场景。
由于没有消息持久化与 ACK 的保证,所以,Redis 的发布订阅功能并不可靠。这也就导致了它的应用场景很有限,建议用于实时与可靠性要求不高的场景。例如:
总之,Redis 发布订阅功能足够简单,如果没有过多的要求,且不想搭建 Kafka、RabbitMQ 这样的可靠型消息系统时,可以考虑尝试使用 Redis。
<parent> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-parent</artifactId> <version>2.0.2.RELEASE</version> <relativePath/></parent>
Spring Data Redis 实现发布订阅功能非常简单,只有这样的几个类:Topic、MessageListener、RedisMessageListenerContainer。下面对它们进行解释:
消息发送者与接收者之间的 channel 定义,有两个实现类: org.springframework.data.redis.listener.ChannelTopic:一个确定的字符串 org.springframework.data.redis.listener.PatternTopic:基于模式匹配
一个回调接口,消息监听器,用于接收发送到 channel 的消息,接口定义如下:
package org.springframework.data.redis.connection; import org.springframework.lang.Nullable; /** * Listener of messages published in Redis. * * @author Costin Leau * @author Christoph Strobl */public interface MessageListener { /** * Callback for processing received objects through Redis. * * @param message message must not be {@literal null}. * @param pattern pattern matching the channel (if specified) - can be {@literal null}. */ void onMessage(Message message, @Nullable byte[] pattern);}
用于消息监听,需要将 Topic 和 MessageListener 注册到 RedisMessageListenerContainer 中。这样,当 Topic 上有消息时,由 RedisMessageListenerContainer 通知 MessageListener,客户端通过 onMessage 拿到消息后,自行处理。
说明:当前给出的示例代码使用 ChannelTopic,可以自行测试使用 PatternTopic。
import lombok.AllArgsConstructor;import lombok.Builder;import lombok.Data;import lombok.NoArgsConstructor; import java.io.Serializable; /** * <h1>城市信息</h1> * Created by Qinyi. */@Data@Builder@NoArgsConstructor@AllArgsConstructorpublic class CityInfo implements Serializable { /** 城市 */ private String city; /** 经度 */ private Double longitude; /** 纬度 */ private Double latitude;}
import listener.SubscribeListener;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.data.redis.connection.RedisConnection;import org.springframework.data.redis.connection.RedisConnectionFactory;import org.springframework.data.redis.core.RedisTemplate;import org.springframework.data.redis.listener.ChannelTopic;import org.springframework.data.redis.listener.PatternTopic;import org.springframework.data.redis.listener.RedisMessageListenerContainer;import org.springframework.data.redis.serializer.JdkSerializationRedisSerializer;import org.springframework.data.redis.serializer.RedisSerializer;import org.springframework.data.redis.serializer.StringRedisSerializer; /** * <h1>Redis 配置</h1> * Created by Qinyi. */@Configurationpublic class RedisConfig { private final RedisConnectionFactory redisConnectionFactory; @Autowired public RedisConfig(RedisConnectionFactory redisConnectionFactory) { this.redisConnectionFactory = redisConnectionFactory; } /** * <h2>配置消息监听器</h2> * */ @Bean public SubscribeListener listener() { return new SubscribeListener(); } /** * <h2>配置 发布/订阅 的 Topic</h2> * */ @Bean public ChannelTopic channelTopic() { return new ChannelTopic("city"); } /** * <h2>配置 ChannelName 的模式匹配</h2> * */ @Bean public PatternTopic patternTopic() { return new PatternTopic("/city/*"); } /** * <h2>将消息监听器绑定到消息容器</h2> * */ @Bean public RedisMessageListenerContainer messageListenerContainer() { RedisMessageListenerContainer container = new RedisMessageListenerContainer(); container.setConnectionFactory(redisConnectionFactory); // 可以修改成 patternTopic, 看一看 MessageListener 中监听的数据 container.addMessageListener(listener(), channelTopic()); return container; }}
import org.springframework.data.redis.connection.Message;import org.springframework.data.redis.connection.MessageListener; /** * <h1>消息监听器</h1> * Created by Qinyi. */public class SubscribeListener implements MessageListener { /** * <h2>消息回调</h2> * @param message {@link Message} 消息体 + ChannelName * @param pattern 订阅的 pattern, ChannelName 的模式匹配 * */ @Override public void onMessage(Message message, byte[] pattern) { String body = new String(message.getBody()); String channel = new String(message.getChannel()); String pattern_ = new String(pattern); System.out.println(body); System.out.println(channel); System.out.println(pattern_); // 如果是 ChannelTopic, 则 channel 字段与 pattern 字段值相同 }}
import com.alibaba.fastjson.JSON;import Application;import vo.CityInfo;import org.junit.Test;import org.junit.runner.RunWith;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.boot.test.context.SpringBootTest;import org.springframework.data.redis.core.StringRedisTemplate;import org.springframework.data.redis.listener.ChannelTopic;import org.springframework.test.context.junit4.SpringRunner; /** * <h1>Redis 发布订阅测试用例</h1> * Created by Qinyi. */@RunWith(SpringRunner.class)@SpringBootTest(classes = {Application.class}, webEnvironment = SpringBootTest.WebEnvironment.NONE)public class RedisPubSubTest { @Autowired private StringRedisTemplate redisTemplate; @Autowired private ChannelTopic topic; @Test public void testRedisPubSub() { redisTemplate.convertAndSend( topic.getTopic(), JSON.toJSONString(new CityInfo("hefei", 117.17, 31.52)) ); }}
执行测试用例,可以看到如下打印信息:
2019-03-12 17:54:41.699 INFO 5627 --- [enerContainer-1] io.lettuce.core.EpollProvider : Starting without optional epoll library2019-03-12 17:54:41.703 INFO 5627 --- [enerContainer-1] io.lettuce.core.KqueueProvider : Starting without optional kqueue library2019-03-12 17:54:42.354 INFO 5627 --- [ main] com.imooc.ad.service.RedisPubSubTest : Started RedisPubSubTest in 8.364 seconds (JVM running for 12.321){"city":"hefei","latitude":31.52,"longitude":117.17}citycity2019-03-12 17:54:42.936 INFO 5627 --- [ Thread-4] s.c.a.AnnotationConfigApplicationContext : Closing org.springframework.context.annotation.AnnotationConfigApplicationContext@2812b107: startup date [Tue Mar 12 17:54:35 CST 2019]; root of context hierarchy2019-03-12 17:54:42.939 INFO 5627 --- [ Thread-4] o.s.c.support.DefaultLifecycleProcessor : Stopping beans in phase 2147483647 Process finished with exit code 0
原网址: 访问
创建于: 2021-02-04 18:11:53
目录: default
标签: 无
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论