SpringBoot整合Redis实现缓存、队列、广播_只有变秃,才能更强-CSDN博客 ---- 实用

**[提前声明]
文章由作者:张耀峰 结合自己生产中的使用经验整理,最终形成简单易懂的文章
写作不易,转载请注明,谢谢!
spark代码案例地址: https://github.com/Mydreamandreality/sparkResearch**

    • *

本次主要给各位分享Redis与SpringBoot的集成使用,缓存案例,消息队列案例,广播案例

初识Redis

  • Redis是开源的(BSD许可)内存数据结构存储,常用作于<K,V>数据库,缓存和消息代理
  • Redis支持的数据结构如下

    • 字符串(String)
    • 哈希/散列/字典(Hash)
    • 列表(List)
    • 集合(Set)
    • 有序集合(sorted set)

主流的Redis-JavaApi框架

  • Jedis Redis实现的Java客户端,API较为全面
  • Redisson实现了分布式和可扩展的java数据结构,支持的数据结构有:List, Set, Map, Queue, SortedSet, ConcureentMap, Lock, AtomicLong, CountDownLatch。并且是线程安全的,底层使用Netty 4实现网络通信。和jedis相比,功能比较简单,不支持排序,事务,管道,分区等redis特性,可以认为是jedis的补充,不能替换jedis
  • Lettuce基于Netty的连接实例(StatefulRedisConnection),可以在多个线程间并发访问,且线程安全,满足多线程环境下的并发访问,同时它是可伸缩的设计,一个连接实例不够的情况也可以按需增加连接实例
    注:官方推荐使用Jedis
    springboot 1.5.x版本的默认的Redis客户端是 Jedis实现的,springboot 2.x版本中默认客户端是用 lettuce实现的

Springboot集成Redis-配置

  • 首先在pom中新增Redis依赖
        <!--Redis客户端-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <!-- redis依赖commons-pool 这个依赖一定要添加 -->
        <dependency>
            <groupId>org.apache.commons</groupId>
            <artifactId>commons-pool2</artifactId>
        </dependency>
  • 在springboot启动类增加注解:@EnableCaching

补充:搞到这里,有兄弟肯定心想,不是推荐使用jedis吗,为什么我们此处是用的SpringDataRedis,因为SpringDataRedis中对JedisApi进行了高度封装,更加方便我们开发,而且SpringDataRedis相对于Jedis来说可以方便地更换Redis的Java客户端,比Jedis多了自动管理连接池的特性,方便与其他Spring框架进行搭配使用

  • 在application.yml中增加Redis的配置
spring:
  redis:
    database: 0
    host: 127.0.0.1
    port: 6379
    password:            #没有可以不填
    lettuce:            #连接池配置
      pool:
        max-active: 200
        max-wait: -1
        max-idle: 10
        min-idle: 10
    timeout: 1000
  cache: redis
  • 增加Redis序列化配置

    • 为什么需要这个:因为redis默认的序列化和反序列化是JdkSerializationRedisSerializer完成的,此处我们重写序列化方式,是为了防止Key或者Value存储到Redis中乱码,导致缓存读取不到或者值乱码的情况
/**
 * @author 孤
 * @version v1.0
 * @Developers 张耀烽
 * @serviceProvider xxx
 * @description redis配置
 * @date 2019年11月15日 00:00:00
 */
@Configuration
public class RedisConfig {
    //Key的过期时间
    private Duration timeToLive = Duration.ofDays(1);

    /**
     * redis模板,存储关键字是字符串,值jackson2JsonRedisSerializer是序列化后的值
     *
     * @param
     * @return org.springframework.data.redis.core.RedisTemplate
     */
    @Bean
    public RedisTemplate<String, Object> redisTemplate(RedisConnectionFactory connectionFactory) {
        RedisTemplate<String, Object> redisTemplate = new RedisTemplate<>();
        redisTemplate.setConnectionFactory(connectionFactory);
        //使用Jackson2JsonRedisSerializer来序列化和反序列化redis的value值(默认使用JDK的序列化方式)
        Jackson2JsonRedisSerializer jackson2JsonRedisSerializer = new Jackson2JsonRedisSerializer(Object.class);
        ObjectMapper objectMapper = new ObjectMapper();
        objectMapper.setVisibility(PropertyAccessor.ALL, JsonAutoDetect.Visibility.ANY);
        objectMapper.enableDefaultTyping(ObjectMapper.DefaultTyping.NON_FINAL);
        jackson2JsonRedisSerializer.setObjectMapper(objectMapper);

        //使用StringRedisSerializer来序列化和反序列化redis的key值
        RedisSerializer redisSerializer = new StringRedisSerializer();
        //key
        redisTemplate.setKeySerializer(redisSerializer);
        redisTemplate.setHashKeySerializer(redisSerializer);
        //value
        redisTemplate.setValueSerializer(jackson2JsonRedisSerializer);
        redisTemplate.setHashValueSerializer(jackson2JsonRedisSerializer);

        redisTemplate.afterPropertiesSet();
        return redisTemplate;
    }

    @Bean
    public RedisCacheConfiguration redisCacheConfiguration() {
        return RedisCacheConfiguration.
                defaultCacheConfig().
                entryTtl(this.timeToLive).            //Key过期时间 此处设置1天
                serializeKeysWith(RedisSerializationContext.SerializationPair.
                        fromSerializer(new StringRedisSerializer())).
                serializeValuesWith(RedisSerializationContext.SerializationPair.
                        fromSerializer(new GenericJackson2JsonRedisSerializer()));
    }
}

Springboot集成Redis-缓存的使用

  • 配置完之后,我们就可以开始愉快的开发了
  • 比如此时我们有一个service,只需要在查询的方法上增加@Cacheable注解即可:

    • @Cacheable注解:
    • value代表当前缓存的分组名称(方便我们对缓存做管理,不是缓存的value)
    • key代表当前缓存的Key值
    • unless表达式,使用场景:如果当前方法返回null,则不记录缓存
@Service
public class CsdnDemoImpl implements CsdnDemoService {
    @Autowired
    private CsdnDemoMapper csdnDemoMapper;

    @Cacheable(value = "corp_id", key = "#id", unless = "#result == null")
    @Override
    public DemoDO getDemoInfo(String id) {
        return csdnDemoMapper.getDemoInfo(id);
    }
}
  • 到此缓存的基本功能我们就实现了,此时我们测试一下,访问这个方法,先通过@Cacheable注解自动查找缓存是否存在,由于是第一次访问,缓存是肯定不存在的,然后正常执行我们的查询,到Mysql中查询,查询到数据后,注解会帮助我们自动把结果写入到redis中,key就是参数id,value就是方法返回的结果,此时再次访问时,@Cacheable直接在redis中查找到缓存,所以会直接返回,而不去跟数据库做交互
  • 可以下载RedisDesktopManager(redis可视化工具)进行配合测试
    案例
  • 此时我们的缓存看起来是没问题了,但是深入思考下,还是会发现很多问题:

    • 如果这条数据被更新了,此时缓存是无感知的,如果我们继续访问查询接口,实际上访问的还是更新之前的值,就会出现脏数据
    • 如果这条数据被删除了,也是同样的道理
  • 那么如何解决呢,只需要在update方法或者delete的方法上增加@CacheEvict注解即可

    • @CacheEvict注解
    • value和@Cacheable注解的作用是一样的,标记是哪个缓存分组下的操作
    • key代表需要更新的缓存Key
    @CacheEvict(value = "corp_id", key = "#demoDO.getId()")
    @Override
    public void updateSecretInfo(DemoDO demoDO) {
        try {
            csdnDemoMapper.updateDemoDO(demoDO);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
  • 当调用更新的方法时,@CacheEvict会先清空Redis中key是方法实体中id的Value,删除的接口也是同样的写法
  • 至此,一个较为完善的缓存使用就完成了
  • 后续会写个不使用注解的demo,使用springDataRedis提供的API来实现我们的缓存场景

消息队列-点对点以及订阅发布的模式区别

Springboot集成Redis-消息队列的使用

  • 点对点的消息队列
    首先我们实现消费者相关的代码,思路如下:
  • 定时任务轮询队列中的数据
  • 如果有则消费,没有则阻塞,直到有新的数据或者阻塞超时
  • 使用线程池(按照阿里巴巴Java开发规范自己实现)的方式,节省系统资源
  • 代码如下:
/**
 * @author 孤
 * @version 1.0
 * @name 张耀烽
 * @description
 * @date 2019/11/27
 */
@Component
public class ConsumerService {

    private final Logger logger = LoggerFactory.getLogger(this.getClass());

    @Autowired
    private RedisTemplate<String, String> redisTemplate;


    @Scheduled(cron = "*/5 * * * * ?")
    public synchronized void consumer() {
        logger.info("消费队列数据,消费时间:[{}]", DateTimeKit.formatDateTime(new Date()));
        ThreadManager.getThreadPoolProxy(1, 5).execute(() -> {
            try {
                String message = redisTemplate.opsForList().rightPop("queue:queueData",5, TimeUnit.SECONDS);
                System.out.println("接收到了消息message" + message);
            } catch (Exception ex) {
                logger.info("队列阻塞超时-[{}]-[{}]", DateTimeKit.formatDateTime(new Date()), ex.getMessage());
            } finally {
                logger.info("线程销毁-[{}]", DateTimeKit.formatDateTime(new Date()));
                ThreadManager.shutdownThreadPoolProxy();
            }
        });
    }
}

我们使用@Scheduled(cron = "*/5 * * * * ?")设置了每五秒轮询一次是否存在队列数据,这个可以根据自己的业务场景灵活调整

  • 生产者的代码实现
    这个就比较简单粗暴了,此处直接定义一个接口来模拟生产者即可
/**
 * @author 孤
 * @version 1.0
 * @name 张耀烽
 * @description
 * @date 2019/11/27
 */
@RestController
public class SendController {

    @Autowired
    private RedisTemplate redisTemplate;

    @GetMapping("/send")
    public ServiceResult send() {
        redisTemplate.opsForList().leftPush("queue:queueData", "既然被你看到了我,那我就要和程序世界说再见了哦!");
        return ServiceResult.success("生产成功");
    }
}
  • 都搞定后,我们访问生产数据的接口,
    (此处我点击了五次,生产五条数据方便测试)
    在这里插入图片描述
  • 然后再次查看Redis面板,找到我们刚才生产的五条数据(此时还没有被消费是因为我们的定时任务还未执行)
    在这里插入图片描述
  • 大约五秒后,查看我们的控制台输出,可以发现数据被成功的消费,一共五条数据,全部消费后,又进入阻塞的状态,直到队列不为空
    在这里插入图片描述
  • 为了确保我们确实是消费了数据,查看redis面板,此时队列中的数据已经被全部消费,已经不存在了
    在这里插入图片描述
  • 总结:

    • 使用定时任务轮询的方式进行数据的消费
    • 使用线程池的方式,避免资源浪费
    • 点对点的队列模式.消息只能被一个消费者消费,且只有一次

Springboot集成Redis-广播通知的使用

  • 订阅:(多个频道)
    在我们的RedisConfig类中增加redis广播监听Bean
    private final String ChannelOne = "ChannelOne";

    private final String ChannelTwo = "ChannelTwo";
    /**
     * Redis订阅消息监听器
     *
     * @param connectionFactory
     * @param channelOneAdapter
     * @param channelTwoAdapter
     * @return
     */
    @Bean
    public RedisMessageListenerContainer container(RedisConnectionFactory connectionFactory, MessageListenerAdapter channelOneAdapter, MessageListenerAdapter channelTwoAdapter) {
        RedisMessageListenerContainer listenerContainer = new RedisMessageListenerContainer();
        listenerContainer.setConnectionFactory(connectionFactory);
        //监听频道1
        listenerContainer.addMessageListener(channelOneAdapter, new PatternTopic(ChannelOne));
        //监听频道2
        listenerContainer.addMessageListener(channelTwoAdapter, new PatternTopic(ChannelTwo));
        return listenerContainer;
    }

    /**
     * 委托对象 当我们监听的频道1 有新消息到来时,使用defaultListenerMethod来处理订阅的消息
     * 此处springboot利用反射的技术,使用defaultListenerMethod处理消息
     * @param processMessagesChannelOne
     * @return
     */
    @Bean
    public MessageListenerAdapter channelOneAdapter(ProcessMessagesChannelOne processMessagesChannelOne) {
        return new MessageListenerAdapter(processMessagesChannelOne, "monitorBroadcast");
    }

    /**
     * 委托对象 当我们监听的频道2 有新消息到来时,使用defaultListenerMethod来处理订阅的消息
     * 此处springboot利用反射的技术,使用defaultListenerMethod处理消息
     * @param processMessagesChannelTwo
     * @return
     */
    @Bean
    public MessageListenerAdapter channelTwoAdapter(ProcessMessagesChannelTwo processMessagesChannelTwo) {
        return new MessageListenerAdapter(processMessagesChannelTwo, "monitorBroadcast");
    }

OK,到这里我们先整理一下上面代码的思路

  • 首先我们定义了container()方法增加了Redis订阅消息监听器
  • container()方法中我们使用addMessageListener监听了两个频道,并且使用MessageListenerAdapter实现代理委托类,利用反射的方法处理监听到的消息
  • 那么下面我想兄弟们就清楚该怎么做了,那就是实现我们的反射方法
  • 可以看到我们的channelOneAdapter()channelTwoAdapter()两个方法中都各有一个参数,这个参数就是我们的委托类,那么我们就要先定义出这两个类
  • 委托类ProcessMessagesChannelOne
/**
 * @author 孤
 * @version 1.0
 * @name 张耀烽
 * @description
 * @date 2019/11/27
 */
@Component
public class ProcessMessagesChannelOne {

    public synchronized void monitorBroadcast(String message) {
        try {
            System.out.println("我监听到频道1的消息啦,消息是:" + message);
        } catch (Exception e) {
            System.out.println("消息监听失败啦~~~~~~~");
        }
    }
}

  • 委托类ProcessMessagesChannelTwo
/**
 * @author 孤
 * @version 1.0
 * @name 张耀烽
 * @description
 * @date 2019/11/27
 */
@Component
public class ProcessMessagesChannelTwo {

    public synchronized void monitorBroadcast(String message) {
        try {
            System.out.println("我监听到频道2的消息啦,消息是:" + message);
        } catch (Exception e) {
            System.out.println("消息监听失败啦~~~~~~~");
        }
    }
}

OK,到这里的话,我们的Redis广播订阅监听就处理完啦,下面就可以写一个模拟生产者的接口,发布消息,看下我们的反射方法是否正常监听到消息

  • 生产者
/**
 * @author 孤
 * @version 1.0
 * @name 张耀烽
 * @description
 * @date 2019/11/27
 */
@RestController
public class PubMessageController {

    public final String ChannelOne = "ChannelOne";

    public final String ChannelTwo = "ChannelTwo";

    @Autowired
    private StringRedisTemplate stringRedisTemplate;

    @GetMapping("/send")
    public ServiceResult sendMessage() {
        stringRedisTemplate.convertAndSend(ChannelOne, "我是来自频道一的消息");
        stringRedisTemplate.convertAndSend(ChannelTwo, "我是来自频道二的消息");
        return ServiceResult.success(HttpStatus.OK);
    }
}
  • 然后我们使用接口测试工具测试,提示我们消息生产成功
    在这里插入图片描述
  • 然后查看控制台打印的结果
  • 最终结果如下
  • 成功订阅消费到广播中的消息
    在这里插入图片描述
  • 以上就是SpringBoot+Redis案例

    • 缓存
    • 队列(点对点)
    • 广播(订阅发布)
如果有任何疑问可以留言或者私信。

原网址: 访问
创建于: 2021-02-04 11:06:41
目录: default
标签: 无

请先后发表评论
  • 最新评论
  • 总共0条评论