写在前面
生产上基本都使用Curator客户端去操作zookeeper,因为zookeeper原生API太底层了,想自己封装得足够便利还是比较难的。
完全是出于自己想实现一下同步锁才有了这篇文章。文章中缺少了很重要的一步:“创建根节点”。如果直接拿文章中的代码去运行,报出来的错误就是 root/lock 节点不存在。这里也体现了Curator API的实用性:它提供了
creatingParentContainersIfNeeded 这个API,可以在父节点不存在时自动创建父节点。
环境
jdk 1.7
SpringBoot 1.5.10
引用包
<dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-framework</artifactId> <version>2.12.0</version> </dependency> <dependency> <groupId>org.apache.curator</groupId> <artifactId>curator-recipes</artifactId> <version>2.12.0</version> </dependency> <!-- https://mvnrepository.com/artifact/com.google.guava/guava --> <dependency> <groupId>com.google.guava</groupId> <artifactId>guava</artifactId> <version>16.0.1</version> </dependency> <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson --> <dependency> <groupId>com.alibaba</groupId> <artifactId>fastjson</artifactId> <version>1.2.19</version> </dependency> <dependency> <groupId>org.apache.zookeeper</groupId> <artifactId>zookeeper</artifactId> <exclusions> <exclusion> <groupId>org.slf4j</groupId> <artifactId>slf4j-log4j12</artifactId> </exclusion> </exclusions> <version>3.4.9</version> </dependency>
主体思想
利用zookeeper的临时有序节点和watch监听机制,采用类似CLH队列的方式获取锁
步骤
1.生成该次操作的临时节点
2.获取锁节点下所有临时子节点
3.升序排序(第一个节点为最小节点)
4.判断最小节点是否为本节点,是则获取锁
5.否则监听前一个子节点的状态,直到前一个子节点触发删除事件,再唤醒本节点对应的线程
Talk is cheap, show you the code
Zookeeper基础方法类
public class ZooKeeperServiceImpl implements ZooKeeperService { private static final String URL = "127.0.0.1"; private static final Integer PORT = 2181; private static final String ROOT = "/root"; public static ZooKeeper getClient() throws IOException, InterruptedException { final CountDownLatch latch = new CountDownLatch(1); ZooKeeper zk = new ZooKeeper(URL, 5000, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getState() == Event.KeeperState.SyncConnected) { latch.countDown(); } } }); latch.await(); return zk; } @Override public String createNode(ZooKeeper zk, String node, String data, CreateMode mode) throws Exception { return createNode(zk, node, data.getBytes(), mode); } @Override public String createNode(ZooKeeper zk, String node, byte[] data, CreateMode mode) throws Exception { return zk.create(node, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode); } @Override public void removeNode(ZooKeeper zk, String node) throws Exception { zk.delete(node, -1); } @Override public List<String> getChildren(ZooKeeper zk, String path, Watcher watcher) throws Exception { List<String> list = zk.getChildren(path, watcher); return list; } @Override public Stat exists(ZooKeeper zk, String path, Watcher watcher) throws Exception { Stat stat = zk.exists(path, watcher); return stat; }}
分布式锁实现类
public class DistributeLock implements Lock { private Logger logger = LoggerFactory.getLogger(DistributeLock.class); private CountDownLatch latch; private ZooKeeper zk = null; private static String ROOT = "/root"; private ZooKeeperService zooKeeperService; private static String LOCK_ROOT = "/lock"; private static String SEPARATOR = "_"; //路径 private static String PATH_SEPARATOR = "/"; private static String LOCK_KEYWORD = "lock"; private String lockResource; private String lockPrefix; private String lockWholePath; private Map<Thread, String> lockMap = Maps.newConcurrentMap(); public DistributeLock(String lockResource) throws IOException, InterruptedException { zk = ZooKeeperServiceImpl.getClient(); zooKeeperService = SpringContextHolder.getBean(ZooKeeperService.class); this.lockResource = lockResource; if (! lockResource.startsWith(PATH_SEPARATOR)) { lockWholePath = ROOT + LOCK_ROOT + PATH_SEPARATOR + this.lockResource; } else { lockWholePath = ROOT + LOCK_ROOT + this.lockResource; } lockPrefix = lockWholePath + PATH_SEPARATOR + LOCK_KEYWORD + SEPARATOR; } @Override public void lock() { logger.info("thread" + Thread.currentThread().getName() + " try acquire lock..."); if (tryLock()) { logger.info("thread" + Thread.currentThread().getName() + " acquire lock success..."); } else { try { waitForLock(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } private void waitForLock() throws Exception { String currentPath = lockMap.get(Thread.currentThread()); List<String> children = zooKeeperService.getChildren(zk, lockWholePath, null); if (children == null) { logger.error("zookeeper acquire children failure, please check the connection."); zooKeeperService.removeNode(zk, currentPath); return; } if (children.size() == 1) { return; } //升序排序 Collections.sort(children); String beforePath = null; logger.info("children : {}", JSON.toJSONString(children)); for (String var : children) { String tmpVar = lockWholePath+PATH_SEPARATOR+var; if (tmpVar.equals(currentPath)) { 
break; } beforePath = tmpVar; } logger.info("currentPath : {}, beforePath : {}", currentPath, beforePath); if (beforePath == null) { return; } final CountDownLatch latch = new CountDownLatch(1); zooKeeperService.exists(zk, beforePath, new Watcher() { @Override public void process(WatchedEvent event) { if (event.getType() == Event.EventType.NodeDeleted) { latch.countDown(); } } }); lockMap.put(Thread.currentThread(), currentPath); latch.await(); } @Override public void lockInterruptibly() throws InterruptedException { } @Override public boolean tryLock() { try { String currentPath = zooKeeperService.createNode(zk, lockPrefix, new byte[0], CreateMode.EPHEMERAL_SEQUENTIAL); lockMap.put(Thread.currentThread(), currentPath); List<String> children = zooKeeperService.getChildren(zk, lockWholePath, null); if (children == null) { logger.error("zookeeper acquire children failure, please check the connection."); zooKeeperService.removeNode(zk, currentPath); return false; } if (children.size() == 1) { return true; } Collections.sort(children); String tmpVar = lockWholePath+PATH_SEPARATOR+children.get(0); if (tmpVar.equals(currentPath)) { return true; } return false; } catch (Exception e) { logger.error(e.getMessage(), e); } return false; } @Override public boolean tryLock(long time, TimeUnit unit) throws InterruptedException { return false; } @Override public void unlock() { try { String currentPath = lockMap.get(Thread.currentThread()); if (! StringUtils.isEmpty(currentPath)) { zooKeeperService.removeNode(zk, lockMap.get(Thread.currentThread())); } } catch (Exception e) { logger.error(e.getMessage(), e); } } @Override public Condition newCondition() { return null; }
Spring容器工具类
/**
 * Static access point to the Spring {@link ApplicationContext} for objects that are not
 * themselves managed by Spring.
 *
 * <p>The context is captured when Spring invokes {@link #setApplicationContext}; this class has
 * to be registered as a bean for that to happen.
 */
public class SpringContextHolder implements ApplicationContextAware {

    // volatile: written by the container thread, read by arbitrary application threads.
    private static volatile ApplicationContext context;

    /** Stores the context supplied by the container. */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Assign through the class name: the field is static, not per-instance.
        SpringContextHolder.context = applicationContext;
    }

    /**
     * Looks up the bean of the given type.
     *
     * @param clazz required bean type
     * @return the matching bean
     * @throws IllegalStateException if no application context has been injected yet
     */
    public static <T> T getBean(Class<T> clazz) {
        ApplicationContext current = context;
        if (current == null) {
            throw new IllegalStateException(
                    "ApplicationContext has not been injected; is SpringContextHolder registered as a bean?");
        }
        return current.getBean(clazz);
    }
}
测试类:
/**
 * Starts several threads that contend for the same distributed lock and checks that at most one
 * of them is inside the critical section at any time.
 */
public class DistributeLockTest extends AbstractServiceTest {

    private static final Logger logger = LoggerFactory.getLogger(DistributeLockTest.class);

    private static int num = 10;

    /** How long each thread stays inside the critical section, in milliseconds. */
    private static final long HOLD_MILLIS = 5000L;

    /** Extra time allowed on top of the serialized hold times before the test gives up. */
    private static final long SLACK_MILLIS = 30000L;

    @Test
    public void testDistributeLock() throws IOException, InterruptedException {
        final CountDownLatch latch = new CountDownLatch(num);
        // Number of threads currently inside the critical section; must never exceed 1.
        final java.util.concurrent.atomic.AtomicInteger holders =
                new java.util.concurrent.atomic.AtomicInteger(0);
        final java.util.concurrent.atomic.AtomicInteger violations =
                new java.util.concurrent.atomic.AtomicInteger(0);
        final java.util.concurrent.atomic.AtomicInteger failures =
                new java.util.concurrent.atomic.AtomicInteger(0);

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    DistributeLock lock = new DistributeLock("testlock");
                    lock.lock();
                    try {
                        if (holders.incrementAndGet() != 1) {
                            violations.incrementAndGet();
                        }
                        System.out.println("thread : " + Thread.currentThread().getName() + " do something");
                        Thread.sleep(HOLD_MILLIS);
                    } finally {
                        holders.decrementAndGet();
                        lock.unlock();
                    }
                } catch (IOException e) {
                    failures.incrementAndGet();
                    logger.error(e.getMessage(), e);
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    failures.incrementAndGet();
                    logger.error(e.getMessage(), e);
                } catch (RuntimeException e) {
                    failures.incrementAndGet();
                    logger.error(e.getMessage(), e);
                } finally {
                    // Always count down, otherwise one failing worker hangs the whole test.
                    latch.countDown();
                }
            }
        };

        for (int i = 0; i < num; i++) {
            Thread t = new Thread(runnable);
            t.start();
        }

        // The workers run one after another, so the budget is the sum of the hold times plus slack.
        long budgetMillis = num * HOLD_MILLIS + SLACK_MILLIS;
        if (!latch.await(budgetMillis, java.util.concurrent.TimeUnit.MILLISECONDS)) {
            throw new AssertionError("workers did not finish within " + budgetMillis + " ms");
        }
        if (failures.get() != 0) {
            throw new AssertionError(failures.get() + " worker(s) failed, see log");
        }
        if (violations.get() != 0) {
            throw new AssertionError(
                    "lock was held by more than one thread at once, " + violations.get() + " time(s)");
        }
    }
}
日志:
原网址: 访问
创建于: 2020-12-03 16:45:15
目录: default
标签: 无
未标明原创文章均为采集,版权归作者所有,转载无需和我联系,请注明原出处,南摩阿彌陀佛,知识,不只知道,要得到
最新评论