Loading... # zookeeper配置中心与分布式锁的简单实现 ## 配置中心 分布式系统的配置保存在一个公共区域,当配置被修改或删除的时候通知所有人。 **配置信息实体类** ``` public class MyConf { private String conf; public String getConf() { return conf; } public void setConf(String conf) { this.conf = conf; } } ``` **zk连接工具类** ``` public class ZKUtils { private static ZooKeeper zk; private static final String address = "192.168.106.3:2181,192.168.106.4:2181,192.168.106.5:2181,192.168.106.6:2181/testConf"; private static CountDownLatch latch = new CountDownLatch(1); static { DefaultWatch watch = new DefaultWatch(); watch.setLatch(latch); try { zk = new ZooKeeper(address, 1000, watch); latch.await(); } catch (IOException | InterruptedException e) { e.printStackTrace(); } } public static ZooKeeper getZK() { return zk; } } ``` **连接事件监听类** ``` public class DefaultWatch implements Watcher { private CountDownLatch latch; @Override public void process(WatchedEvent watchedEvent) { System.out.println(watchedEvent); switch (watchedEvent.getState()) { case Unknown: break; case Disconnected: break; case NoSyncConnected: break; case SyncConnected: latch.countDown(); break; case AuthFailed: break; case ConnectedReadOnly: break; case SaslAuthenticated: break; case Expired: break; case Closed: break; } } public CountDownLatch getLatch() { return latch; } public void setLatch(CountDownLatch latch) { this.latch = latch; } } ``` **配置监听** ``` public class WatchCallBack implements Watcher, AsyncCallback.StatCallback, AsyncCallback.DataCallback { private ZooKeeper zk; private MyConf conf; private CountDownLatch latch = new CountDownLatch(1); public ZooKeeper getZk() { return zk; } public void setZk(ZooKeeper zk) { this.zk = zk; } public MyConf getConf() { return conf; } public void setConf(MyConf conf) { this.conf = conf; } @Override public void process(WatchedEvent event) { switch (event.getType()) { case None: break; case NodeCreated: zk.getData("/AppConf", this, this, "abc"); break; case NodeDeleted: //容忍性 conf.setConf(""); latch = new CountDownLatch(1); break; case NodeDataChanged: zk.getData("/AppConf", this, this, "abc"); break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; } } @Override //状态码 路径 上线文 数据 状态 public void processResult(int rc, String path, Object ctx, byte[] data, Stat stat) { if (data != null) { String s = new String(data); conf.setConf(s); latch.countDown(); } } @Override public void processResult(int rc, String path, Object ctx, Stat stat) { if (stat != null) { zk.getData("/AppConf", this, this, "abc"); } } public void await() { zk.exists("/AppConf", this, this, "ABC"); try { latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } } ``` 测试类 ``` public class TestConfig { ZooKeeper zk; @Before public void conn() { zk = ZKUtils.getZK(); } @After public void close() { try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } @Test public void getConf() { WatchCallBack watchCallBack = new WatchCallBack(); watchCallBack.setZk(zk); MyConf myConf = new MyConf(); watchCallBack.setConf(myConf); watchCallBack.await(); //1.节点不存在 //2.节点存在 while (true) { if (myConf.getConf().isEmpty()) { System.out.println("conf was lose......"); watchCallBack.await(); } else { System.out.println(myConf.getConf()); } try { TimeUnit.MILLISECONDS.sleep(200); } catch (InterruptedException e) { e.printStackTrace(); } } } } ``` 在zookeeper中创建/testConf节点,启动程序,尝试创建/testConf/AppConf,并set或修改/testConf/AppConf上的数据。 ## 分布式锁 zookeeper实现分布式锁,只需要创建一个临时节点,成功创建的人就是获得锁的人,就算这个获得锁的进程挂了,因为临时节点与session挂钩,进程挂了自然session消失,临时节点也就被删除了,其它人依然可以获得锁。为什么要有分布式锁?有些方法我们需要顺序执行,单机可以用jvm的锁,但是分布式环境下,jvm只能锁自己,因此需要将锁提出来放到一个公共的地方。 1. 所有节点争抢锁,只有一个人获得锁 2. 获得锁的人执行完,释放锁 锁被释放怎么通知其它人? - 主动轮询、心跳 - 弊端:有心跳延迟,zookeeper压力大 - watch 解决心跳延迟问题 - 弊端:zk需要通知所有其他客户端,所有其他客户端再访问zk抢锁,通信压力大 - sequential+watch:每个人都可以成功创建节点,但是序号不同,然后所有人watch自己的前一个,序号最小的获得锁,一旦最小的释放,它后边的马上监听到并获得锁。成本:zk只给后边的一个发事件回调(公平锁) **zk连接工具类** ``` public class ZKUtils { private static ZooKeeper zk; private static final String address = "192.168.106.3:2181,192.168.106.4:2181,192.168.106.5:2181,192.168.106.6:2181/testLock"; private static CountDownLatch latch = new CountDownLatch(1); static { DefaultWatch watch = new DefaultWatch(); watch.setLatch(latch); try { zk = new ZooKeeper(address, 1000, watch); latch.await(); } catch (IOException | InterruptedException e) { e.printStackTrace(); } } public static ZooKeeper getZK() { return zk; } } ``` **DefaultWatch复用配置中心的DefaultWatch** **时间监听及回调** ``` public class WatchCallBack implements Watcher, AsyncCallback.StringCallback, AsyncCallback.Children2Callback, AsyncCallback.StatCallback { private ZooKeeper zooKeeper; private String threadName; private CountDownLatch latch = new CountDownLatch(1); private String pathName; public String getPathName() { return pathName; } public void setPathName(String pathName) { this.pathName = pathName; } public String getThreadName() { return threadName; } public void setThreadName(String threadName) { this.threadName = threadName; } public ZooKeeper getZooKeeper() { return zooKeeper; } public void setZooKeeper(ZooKeeper zooKeeper) { this.zooKeeper = zooKeeper; } public void tryLock() { try { // if(zooKeeper.getData("/")) zooKeeper.create("/lock", threadName.getBytes(), ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, this, "abc"); latch.await(); } catch (InterruptedException e) { e.printStackTrace(); } } public void unLock() { try { zooKeeper.delete(pathName, -1); System.out.println(threadName + " over work"); } catch (InterruptedException | KeeperException e) { e.printStackTrace(); } } @Override public void process(WatchedEvent event) { //如果第一个节点挂了,那么只有第二个会收到通知 //如果不是第一个的某一个节点n1挂了,也会造成后边的n2收到通知,从而让n2去watch n1前边的节点 switch (event.getType()) { case None: break; case NodeCreated: break; case NodeDeleted: zooKeeper.getChildren("/", false, this, "abc"); break; case NodeDataChanged: break; case NodeChildrenChanged: break; case DataWatchRemoved: break; case ChildWatchRemoved: break; } } @Override public void processResult(int rc, String path, Object ctx, String name) { if (name != null) { System.out.println(threadName + "create node: " + name); pathName = name; //不需要watch 因为不关心锁目录的事件,只关注回调中取得的前一个节点的事件 zooKeeper.getChildren("/", false, this, "abc"); } } //getChildren callback @Override public void processResult(int rc, String path, Object ctx, List<String> children, Stat stat) { //一定能看到自己前边的 System.out.println(threadName + "look locks...."); // for (String child : children) { // System.out.println(child); // } children = children.stream().sorted(String::compareTo).collect(Collectors.toList()); int i = children.indexOf(pathName.substring(1)); if (i == 0) { System.out.println(threadName + " i am first....."); latch.countDown(); } else { //必须要callback 因为未必能成功,判定的一瞬间 前面的释放了 就会导致watch监控失败 zooKeeper.exists("/" + children.get(i - 1), this, this, "abc"); } } //exists callback @Override public void processResult(int i, String s, Object o, Stat stat) { //exists的一瞬间,前边的释放的watch失败的情况处理 if (stat == null) { } } } ``` **测试类** ``` public class TestLock { ZooKeeper zk; @Before public void conn() { zk = ZKUtils.getZK(); } @After public void close() { try { zk.close(); } catch (InterruptedException e) { e.printStackTrace(); } } @Test public void lock() { for (int i = 0; i < 10; i++) { new Thread(() -> { WatchCallBack watchCallBack = new WatchCallBack(); watchCallBack.setZooKeeper(zk); String threadName = Thread.currentThread().getName(); watchCallBack.setThreadName(threadName); //每一个线程: //抢锁 watchCallBack.tryLock(); //干活 System.out.println(threadName + "working..."); try { TimeUnit.SECONDS.sleep(1); } catch (InterruptedException e) { e.printStackTrace(); } //释放锁 watchCallBack.unLock(); }).start(); } while (true) ; } } ``` Last modification:June 11th, 2020 at 06:10 pm © 允许规范转载