SpringBoot2 高级应用(06):整合 Redis哨兵集群 ,实现消息队列场景_ITPUB博客

本文源码:GitHub·点这里 || GitEE·点这里

一、Redis集群简介

1、RedisCluster概念

Redis的分布式解决方案,在3.0版本后推出的方案,有效地解决了Redis分布式的需求,当一个服务宕机可以快速的切换到另外一个服务。redis cluster主要是针对海量数据+高并发+高可用的场景。

二、与SpringBoot2.0整合

1、核心依赖

  1. <dependency>
  2. <groupId>org.springframework.boot</groupId>
  3. <artifactId>spring-boot-starter-data-redis</artifactId>
  4. <version>${spring-boot.version}</version>
  5. </dependency>
  6. <dependency>
  7. <groupId>redis.clients</groupId>
  8. <artifactId>jedis</artifactId>
  9. <version>${redis-client.version}</version>
  10. </dependency>

2、核心配置

  1. spring:
  2. # Redis 集群
  3. redis:
  4. sentinel:
  5. # sentinel 配置
  6. master: mymaster
  7. nodes: 192.168.0.127:26379
  8. maxTotal: 60
  9. minIdle: 10
  10. maxWaitMillis: 10000
  11. testWhileIdle: true
  12. testOnBorrow: true
  13. testOnReturn: false
  14. timeBetweenEvictionRunsMillis: 10000

3、参数渲染类

  1. @ConfigurationProperties(prefix = "spring.redis.sentinel")
  2. public class RedisParam {
  3. private String nodes ;
  4. private String master ;
  5. private Integer maxTotal ;
  6. private Integer minIdle ;
  7. private Integer maxWaitMillis ;
  8. private Integer timeBetweenEvictionRunsMillis ;
  9. private boolean testWhileIdle ;
  10. private boolean testOnBorrow ;
  11. private boolean testOnReturn ;
  12. // 省略GET和SET方法
  13. }

4、集群配置文件

  1. @Configuration
  2. @EnableConfigurationProperties(RedisParam.class)
  3. public class RedisPool {
  4. @Resource
  5. private RedisParam redisParam ;
  6. @Bean("jedisSentinelPool")
  7. public JedisSentinelPool getRedisPool (){
  8. Set<String> sentinels = new HashSet<>();
  9. sentinels.addAll(Arrays.asList(redisParam.getNodes().split(",")));
  10. GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();
  11. poolConfig.setMaxTotal(redisParam.getMaxTotal());
  12. poolConfig.setMinIdle(redisParam.getMinIdle());
  13. poolConfig.setMaxWaitMillis(redisParam.getMaxWaitMillis());
  14. poolConfig.setTestWhileIdle(redisParam.isTestWhileIdle());
  15. poolConfig.setTestOnBorrow(redisParam.isTestOnBorrow());
  16. poolConfig.setTestOnReturn(redisParam.isTestOnReturn());
  17. poolConfig.setTimeBetweenEvictionRunsMillis(redisParam.getTimeBetweenEvictionRunsMillis());
  18. JedisSentinelPool redisPool = new JedisSentinelPool(redisParam.getMaster(), sentinels, poolConfig);
  19. return redisPool;
  20. }
  21. @Bean
  22. SpringUtil springUtil() {
  23. return new SpringUtil();
  24. }
  25. @Bean
  26. RedisListener redisListener() {
  27. return new RedisListener();
  28. }
  29. }

5、配置Redis模板类

  1. @Configuration
  2. public class RedisConfig {
  3. @Bean
  4. public StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {
  5. StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();
  6. stringRedisTemplate.setConnectionFactory(factory);
  7. return stringRedisTemplate;
  8. }
  9. }

三、模拟队列场景案例

生产者消费者模式:客户端监听消息队列,消息达到,消费者马上消费,如果消息队列里面没有消息,那么消费者就继续监听。基于Redis的LPUSH(BLPUSH)把消息入队,用 RPOP(BRPOP)获取消息的模式。

1、加锁解锁工具

  1. @Component
  2. public class RedisLock {
  3. private static String keyPrefix = "RedisLock:";
  4. @Resource
  5. private JedisSentinelPool jedisSentinelPool;
  6. public boolean addLock(String key, long expire) {
  7. Jedis jedis = null;
  8. try {
  9. jedis = jedisSentinelPool.getResource();
  10. /*
  11. * nxxx的值只能取NX或者XX,如果取NX,则只有当key不存在是才进行set,如果取XX,则只有当key已经存在时才进行set
  12. * expx的值只能取EX或者PX,代表数据过期时间的单位,EX代表秒,PX代表毫秒。
  13. */
  14. String value = jedis.set(keyPrefix + key, "1", "nx", "ex", expire);
  15. return value != null;
  16. } catch (Exception e){
  17. e.printStackTrace();
  18. }finally {
  19. if (jedis != null) jedis.close();
  20. }
  21. return false;
  22. }
  23. public void removeLock(String key) {
  24. Jedis jedis = null;
  25. try {
  26. jedis = jedisSentinelPool.getResource();
  27. jedis.del(keyPrefix + key);
  28. } finally {
  29. if (jedis != null) jedis.close();
  30. }
  31. }
  32. }

2、消息消费

1)封装接口

  1. public interface RedisHandler {
  2. /**
  3. * 队列名称
  4. */
  5. String queueName();
  6. /**
  7. * 队列消息内容
  8. */
  9. String consume (String msgBody);
  10. }

2)接口实现

  1. @Component
  2. public class LogAListen implements RedisHandler {
  3. private static final Logger LOG = LoggerFactory.getLogger(LogAListen.class) ;
  4. @Resource
  5. private RedisLock redisLock;
  6. @Override
  7. public String queueName() {
  8. return "LogA-key";
  9. }
  10. @Override
  11. public String consume(String msgBody) {
  12. // 加锁,防止消息重复投递
  13. String lockKey = "lock-order-uuid-A";
  14. boolean lock = false;
  15. try {
  16. lock = redisLock.addLock(lockKey, 60);
  17. if (!lock) {
  18. return "success";
  19. }
  20. LOG.info("LogA-key == >>" + msgBody);
  21. } catch (Exception e){
  22. e.printStackTrace();
  23. } finally {
  24. if (lock) {
  25. redisLock.removeLock(lockKey);
  26. }
  27. }
  28. return "success";
  29. }
  30. }

3、消息监听器

  1. public class RedisListener implements InitializingBean {
  2. /**
  3. * Redis 集群
  4. */
  5. @Resource
  6. private JedisSentinelPool jedisSentinelPool;
  7. private List<RedisHandler> handlers = null;
  8. private ExecutorService product = null;
  9. private ExecutorService consumer = null;
  10. /**
  11. * 初始化配置
  12. */
  13. @Override
  14. public void afterPropertiesSet() {
  15. handlers = SpringUtil.getBeans(RedisHandler.class) ;
  16. product = new ThreadPoolExecutor(10,15,60 * 3,
  17. TimeUnit.SECONDS,new SynchronousQueue<>());
  18. consumer = new ThreadPoolExecutor(10,15,60 * 3,
  19. TimeUnit.SECONDS,new SynchronousQueue<>());
  20. for (RedisHandler redisHandler : handlers){
  21. product.execute(() -> {
  22. redisTask(redisHandler);
  23. });
  24. }
  25. }
  26. /**
  27. * 队列监听
  28. */
  29. public void redisTask (RedisHandler redisHandler){
  30. Jedis jedis = null ;
  31. while (true){
  32. try {
  33. jedis = jedisSentinelPool.getResource() ;
  34. List<String> msgBodyList = jedis.brpop(0, redisHandler.queueName());
  35. if (msgBodyList != null && msgBodyList.size()>0){
  36. consumer.execute(() -> {
  37. redisHandler.consume(msgBodyList.get(1)) ;
  38. });
  39. }
  40. } catch (Exception e){
  41. e.printStackTrace();
  42. } finally {
  43. if (jedis != null) jedis.close();
  44. }
  45. }
  46. }
  47. }

4、消息生产者

  1. @Service
  2. public class RedisServiceImpl implements RedisService {
  3. @Resource
  4. private JedisSentinelPool jedisSentinelPool;
  5. @Override
  6. public void saveQueue(String queueKey, String msgBody) {
  7. Jedis jedis = null;
  8. try {
  9. jedis = jedisSentinelPool.getResource();
  10. jedis.lpush(queueKey,msgBody) ;
  11. } catch (Exception e){
  12. e.printStackTrace();
  13. } finally {
  14. if (jedis != null) jedis.close();
  15. }
  16. }
  17. }

5、场景测试接口

  1. @RestController
  2. public class RedisController {
  3. @Resource
  4. private RedisService redisService ;
  5. /**
  6. * 队列推消息
  7. */
  8. @RequestMapping("/saveQueue")
  9. public String saveQueue (){
  10. MsgBody msgBody = new MsgBody() ;
  11. msgBody.setName("LogAModel");
  12. msgBody.setDesc("描述");
  13. msgBody.setCreateTime(new Date());
  14. redisService.saveQueue("LogA-key", JSONObject.toJSONString(msgBody));
  15. return "success" ;
  16. }
  17. }

四、源代码地址

  1. GitHub地址:知了一笑
  2. https://github.com/cicadasmile/middle-ware-parent
  3. 码云地址:知了一笑
  4. https://gitee.com/cicadasmile/middle-ware-parent

原网址: 访问
创建于: 2021-02-04 17:08:19
目录: default
标签: 无

请先后发表评论
  • 最新评论
  • 总共0条评论