本文源码:GitHub·点这里 || GitEE·点这里
Redis的分布式解决方案,在3.0版本后推出的方案,有效地解决了Redis分布式的需求,当一个服务宕机可以快速的切换到另外一个服务。redis cluster主要是针对海量数据+高并发+高可用的场景。
<dependency><groupId>org.springframework.boot</groupId><artifactId>spring-boot-starter-data-redis</artifactId><version>${spring-boot.version}</version></dependency><dependency><groupId>redis.clients</groupId><artifactId>jedis</artifactId><version>${redis-client.version}</version></dependency>spring:# Redis 集群redis:sentinel:# sentinel 配置master: mymasternodes: 192.168.0.127:26379maxTotal: 60minIdle: 10maxWaitMillis: 10000testWhileIdle: truetestOnBorrow: truetestOnReturn: falsetimeBetweenEvictionRunsMillis: 10000@ConfigurationProperties(prefix = "spring.redis.sentinel")public class RedisParam {private String nodes ;private String master ;private Integer maxTotal ;private Integer minIdle ;private Integer maxWaitMillis ;private Integer timeBetweenEvictionRunsMillis ;private boolean testWhileIdle ;private boolean testOnBorrow ;private boolean testOnReturn ;// 省略GET和SET方法}@Configuration@EnableConfigurationProperties(RedisParam.class)public class RedisPool {@Resourceprivate RedisParam redisParam ;@Bean("jedisSentinelPool")public JedisSentinelPool getRedisPool (){Set<String> sentinels = new HashSet<>();sentinels.addAll(Arrays.asList(redisParam.getNodes().split(",")));GenericObjectPoolConfig poolConfig = new GenericObjectPoolConfig();poolConfig.setMaxTotal(redisParam.getMaxTotal());poolConfig.setMinIdle(redisParam.getMinIdle());poolConfig.setMaxWaitMillis(redisParam.getMaxWaitMillis());poolConfig.setTestWhileIdle(redisParam.isTestWhileIdle());poolConfig.setTestOnBorrow(redisParam.isTestOnBorrow());poolConfig.setTestOnReturn(redisParam.isTestOnReturn());poolConfig.setTimeBetweenEvictionRunsMillis(redisParam.getTimeBetweenEvictionRunsMillis());JedisSentinelPool redisPool = new JedisSentinelPool(redisParam.getMaster(), sentinels, poolConfig);return redisPool;}@BeanSpringUtil springUtil() {return new SpringUtil();}@BeanRedisListener redisListener() {return new RedisListener();}}@Configurationpublic class RedisConfig {@Beanpublic StringRedisTemplate stringRedisTemplate(RedisConnectionFactory factory) {StringRedisTemplate stringRedisTemplate = new StringRedisTemplate();stringRedisTemplate.setConnectionFactory(factory);return stringRedisTemplate;}}生产者消费者模式:客户端监听消息队列,消息达到,消费者马上消费,如果消息队列里面没有消息,那么消费者就继续监听。基于Redis的LPUSH(BLPUSH)把消息入队,用 RPOP(BRPOP)获取消息的模式。
@Componentpublic class RedisLock {private static String keyPrefix = "RedisLock:";@Resourceprivate JedisSentinelPool jedisSentinelPool;public boolean addLock(String key, long expire) {Jedis jedis = null;try {jedis = jedisSentinelPool.getResource();/** nxxx的值只能取NX或者XX,如果取NX,则只有当key不存在是才进行set,如果取XX,则只有当key已经存在时才进行set* expx的值只能取EX或者PX,代表数据过期时间的单位,EX代表秒,PX代表毫秒。*/String value = jedis.set(keyPrefix + key, "1", "nx", "ex", expire);return value != null;} catch (Exception e){e.printStackTrace();}finally {if (jedis != null) jedis.close();}return false;}public void removeLock(String key) {Jedis jedis = null;try {jedis = jedisSentinelPool.getResource();jedis.del(keyPrefix + key);} finally {if (jedis != null) jedis.close();}}}1)封装接口
public interface RedisHandler {/*** 队列名称*/String queueName();/*** 队列消息内容*/String consume (String msgBody);}2)接口实现
@Componentpublic class LogAListen implements RedisHandler {private static final Logger LOG = LoggerFactory.getLogger(LogAListen.class) ;@Resourceprivate RedisLock redisLock;@Overridepublic String queueName() {return "LogA-key";}@Overridepublic String consume(String msgBody) {// 加锁,防止消息重复投递String lockKey = "lock-order-uuid-A";boolean lock = false;try {lock = redisLock.addLock(lockKey, 60);if (!lock) {return "success";}LOG.info("LogA-key == >>" + msgBody);} catch (Exception e){e.printStackTrace();} finally {if (lock) {redisLock.removeLock(lockKey);}}return "success";}}public class RedisListener implements InitializingBean {/*** Redis 集群*/@Resourceprivate JedisSentinelPool jedisSentinelPool;private List<RedisHandler> handlers = null;private ExecutorService product = null;private ExecutorService consumer = null;/*** 初始化配置*/@Overridepublic void afterPropertiesSet() {handlers = SpringUtil.getBeans(RedisHandler.class) ;product = new ThreadPoolExecutor(10,15,60 * 3,TimeUnit.SECONDS,new SynchronousQueue<>());consumer = new ThreadPoolExecutor(10,15,60 * 3,TimeUnit.SECONDS,new SynchronousQueue<>());for (RedisHandler redisHandler : handlers){product.execute(() -> {redisTask(redisHandler);});}}/*** 队列监听*/public void redisTask (RedisHandler redisHandler){Jedis jedis = null ;while (true){try {jedis = jedisSentinelPool.getResource() ;List<String> msgBodyList = jedis.brpop(0, redisHandler.queueName());if (msgBodyList != null && msgBodyList.size()>0){consumer.execute(() -> {redisHandler.consume(msgBodyList.get(1)) ;});}} catch (Exception e){e.printStackTrace();} finally {if (jedis != null) jedis.close();}}}}@Servicepublic class RedisServiceImpl implements RedisService {@Resourceprivate JedisSentinelPool jedisSentinelPool;@Overridepublic void saveQueue(String queueKey, String msgBody) {Jedis jedis = null;try {jedis = jedisSentinelPool.getResource();jedis.lpush(queueKey,msgBody) ;} catch (Exception e){e.printStackTrace();} finally {if (jedis != null) jedis.close();}}}@RestControllerpublic class RedisController {@Resourceprivate RedisService redisService ;/*** 队列推消息*/@RequestMapping("/saveQueue")public String saveQueue (){MsgBody msgBody = new MsgBody() ;msgBody.setName("LogAModel");msgBody.setDesc("描述");msgBody.setCreateTime(new Date());redisService.saveQueue("LogA-key", JSONObject.toJSONString(msgBody));return "success" ;}}GitHub地址:知了一笑https://github.com/cicadasmile/middle-ware-parent码云地址:知了一笑https://gitee.com/cicadasmile/middle-ware-parent原网址: 访问
创建于: 2021-02-04 17:08:19
目录: default
标签: 无
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论