zookeeper 同步锁实现_u012210451的专栏-CSDN博客 - 原生封装

写在前面

生产上基本都使用Curator客户端去操作zookeeper,因为zookeeper原生API太底层了,想自己把它封装得足够便利还是比较难的。

写这篇文章完全是出于自己想动手实现一下同步锁。需要注意,文章中缺少了一个很重要的步骤——“创建根节点”:如果直接拿文章中的代码去运行,会报 /root/lock 节点不存在的错误。这也体现了Curator API的实用性:它提供了 creatingParentContainersIfNeeded 这个API,在父节点不存在时会自动创建父节点。

环境

jdk 1.7

SpringBoot 1.5.10

引用包

        <dependency>            <groupId>org.apache.curator</groupId>            <artifactId>curator-framework</artifactId>            <version>2.12.0</version>        </dependency>        <dependency>            <groupId>org.apache.curator</groupId>            <artifactId>curator-recipes</artifactId>            <version>2.12.0</version>        </dependency>        <!-- https://mvnrepository.com/artifact/com.google.guava/guava -->        <dependency>            <groupId>com.google.guava</groupId>            <artifactId>guava</artifactId>            <version>16.0.1</version>        </dependency>        <!-- https://mvnrepository.com/artifact/com.alibaba/fastjson -->        <dependency>            <groupId>com.alibaba</groupId>            <artifactId>fastjson</artifactId>            <version>1.2.19</version>        </dependency>        <dependency>            <groupId>org.apache.zookeeper</groupId>            <artifactId>zookeeper</artifactId>            <exclusions>                <exclusion>                    <groupId>org.slf4j</groupId>                    <artifactId>slf4j-log4j12</artifactId>                </exclusion>            </exclusions>            <version>3.4.9</version>        </dependency>

主体思想

利用zookeeper的临时有序节点和watch监控机制,并采用CLH队列的方式获取锁

步骤

1.生成该次操作的临时节点

2.获取锁节点下所有临时子节点

3.升序排序(第一个节点为最小节点)

4.判断最小节点是否为本节点,是则获取锁

5.否则监控前一个子节点的状态,直到前一个子节点触发删除事件,再唤醒本节点对应的线程

Talk is cheap, show you the code

Zookeeper基础方法类

public class ZooKeeperServiceImpl implements ZooKeeperService {     private static final String URL = "127.0.0.1";    private static final Integer PORT = 2181;     private static final String ROOT = "/root";     public static ZooKeeper getClient() throws IOException, InterruptedException {        final CountDownLatch latch = new CountDownLatch(1);         ZooKeeper zk = new ZooKeeper(URL, 5000, new Watcher() {            @Override            public void process(WatchedEvent event) {                 if (event.getState() == Event.KeeperState.SyncConnected) {                    latch.countDown();                }            }        });        latch.await();        return zk;    }     @Override    public String createNode(ZooKeeper zk, String node, String data, CreateMode mode) throws Exception {        return createNode(zk, node, data.getBytes(), mode);    }     @Override    public String createNode(ZooKeeper zk, String node, byte[] data, CreateMode mode) throws Exception {        return zk.create(node, data, ZooDefs.Ids.OPEN_ACL_UNSAFE, mode);    }     @Override    public void removeNode(ZooKeeper zk, String node) throws Exception {        zk.delete(node, -1);    }     @Override    public List<String> getChildren(ZooKeeper zk, String path, Watcher watcher) throws Exception {        List<String> list = zk.getChildren(path, watcher);        return list;    }     @Override    public Stat exists(ZooKeeper zk, String path, Watcher watcher) throws Exception {        Stat stat = zk.exists(path, watcher);        return stat;    }}

分布式锁实现类

public class DistributeLock implements Lock {     private Logger logger = LoggerFactory.getLogger(DistributeLock.class);     private CountDownLatch latch;     private ZooKeeper zk = null;     private static String ROOT = "/root";     private ZooKeeperService zooKeeperService;     private static String LOCK_ROOT = "/lock";     private static String SEPARATOR = "_";    //路径    private static String PATH_SEPARATOR = "/";     private static String LOCK_KEYWORD = "lock";     private String lockResource;     private String lockPrefix;     private String lockWholePath;     private Map<Thread, String> lockMap = Maps.newConcurrentMap();     public DistributeLock(String lockResource) throws IOException, InterruptedException {        zk = ZooKeeperServiceImpl.getClient();        zooKeeperService = SpringContextHolder.getBean(ZooKeeperService.class);        this.lockResource = lockResource;         if (! lockResource.startsWith(PATH_SEPARATOR)) {            lockWholePath = ROOT + LOCK_ROOT + PATH_SEPARATOR + this.lockResource;        } else {            lockWholePath = ROOT + LOCK_ROOT + this.lockResource;        }        lockPrefix = lockWholePath + PATH_SEPARATOR + LOCK_KEYWORD + SEPARATOR;    }     @Override    public void lock() {        logger.info("thread" + Thread.currentThread().getName() + " try acquire lock...");        if (tryLock()) {            logger.info("thread" + Thread.currentThread().getName() + " acquire lock success...");        } else {            try {                waitForLock();            } catch (Exception e) {                logger.error(e.getMessage(), e);            }        }    }     private void waitForLock() throws Exception {        String currentPath = lockMap.get(Thread.currentThread());         List<String> children = zooKeeperService.getChildren(zk, lockWholePath, null);         if (children == null) {            logger.error("zookeeper acquire children failure, please check the connection.");            zooKeeperService.removeNode(zk, 
currentPath);            return;        }         if (children.size() == 1) {            return;        }              //升序排序        Collections.sort(children);         String beforePath = null;        logger.info("children : {}", JSON.toJSONString(children));         for (String var : children) {            String tmpVar = lockWholePath+PATH_SEPARATOR+var;             if (tmpVar.equals(currentPath)) {                break;            }            beforePath = tmpVar;        }        logger.info("currentPath : {}, beforePath : {}", currentPath, beforePath);         if (beforePath == null) {            return;        }         final CountDownLatch latch = new CountDownLatch(1);         zooKeeperService.exists(zk, beforePath, new Watcher() {            @Override            public void process(WatchedEvent event) {                 if (event.getType() == Event.EventType.NodeDeleted) {                    latch.countDown();                }            }        });        lockMap.put(Thread.currentThread(), currentPath);        latch.await();    }     @Override    public void lockInterruptibly() throws InterruptedException {     }     @Override    public boolean tryLock() {         try {            String currentPath = zooKeeperService.createNode(zk, lockPrefix, new byte[0], CreateMode.EPHEMERAL_SEQUENTIAL);            lockMap.put(Thread.currentThread(), currentPath);            List<String> children = zooKeeperService.getChildren(zk, lockWholePath, null);             if (children == null) {                logger.error("zookeeper acquire children failure, please check the connection.");                zooKeeperService.removeNode(zk, currentPath);                return false;            }             if (children.size() == 1) {                return true;            }            Collections.sort(children);            String tmpVar = lockWholePath+PATH_SEPARATOR+children.get(0);             if (tmpVar.equals(currentPath)) {                return true;            }           
  return false;        } catch (Exception e) {            logger.error(e.getMessage(), e);        }         return false;    }     @Override    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {        return false;    }     @Override    public void unlock() {        try {            String currentPath = lockMap.get(Thread.currentThread());             if (! StringUtils.isEmpty(currentPath)) {                zooKeeperService.removeNode(zk, lockMap.get(Thread.currentThread()));            }        } catch (Exception e) {            logger.error(e.getMessage(), e);        }    }     @Override    public Condition newCondition() {        return null;    }

Spring容器工具类

/**
 * Static access point to the Spring {@link ApplicationContext} for code that is not itself
 * managed by Spring.
 *
 * <p>The context is captured when Spring calls {@link #setApplicationContext}.
 */
public class SpringContextHolder implements ApplicationContextAware {

    private static ApplicationContext context;

    /** Stores the context handed in by Spring in the shared static field. */
    @Override
    public void setApplicationContext(ApplicationContext applicationContext) throws BeansException {
        // Assign through the class name: the field is static, so "this.context" was misleading.
        SpringContextHolder.context = applicationContext;
    }

    /**
     * Looks up the bean of the given type.
     *
     * @param clazz required bean type
     * @return the matching bean from the captured context
     * @throws IllegalStateException if no context has been captured yet
     */
    public static <T> T getBean(Class<T> clazz) {
        ApplicationContext current = context;
        if (current == null) {
            throw new IllegalStateException(
                    "ApplicationContext has not been set; is SpringContextHolder registered as a bean?");
        }
        return current.getBean(clazz);
    }
}

测试类:

/**
 * Starts {@code num} threads that contend for the same distributed lock and waits until all
 * of them have finished.
 */
public class DistributeLockTest extends AbstractServiceTest {

    private Logger logger = LoggerFactory.getLogger(DistributeLockTest.class);

    private static int num = 10;

    @Test
    public void testDistributeLock() throws IOException, InterruptedException {
        final CountDownLatch latch = new CountDownLatch(num);

        Runnable runnable = new Runnable() {
            @Override
            public void run() {
                try {
                    DistributeLock lock = new DistributeLock("testlock");

                    lock.lock();
                    try {
                        System.out.println("thread : " + Thread.currentThread().getName() + " do something");
                        Thread.sleep(5000);
                    } finally {
                        lock.unlock();
                    }
                } catch (IOException e) {
                    logger.error(e.getMessage(), e);
                } catch (InterruptedException e) {
                    // Keep the interrupt visible to whoever owns this thread.
                    Thread.currentThread().interrupt();
                    logger.error(e.getMessage(), e);
                } finally {
                    // Always count down, even on failure; otherwise latch.await() below would
                    // block forever as soon as one worker fails before reaching this point.
                    latch.countDown();
                }
            }
        };

        for (int i = 0; i < num; i++) {
            Thread t = new Thread(runnable);
            t.start();
        }
        latch.await();
    }
}

日志:


原网址: 访问
创建于: 2020-12-03 16:45:15
目录: default
标签: 无

请先后发表评论
  • 最新评论
  • 总共0条评论