ZK客户端curator的基本使用 发表于 2021-07-03 | 分类于 zookeeper | 热度: ℃ 字数统计: 1,373 | 阅读时长 ≈ 7 Curator的使用123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200/** * curator zk客户端api操作demo */public class CuratorApiDemo { public static void main(String[] args) throws Exception{ // zk地址 如果是集群 多个节点地址逗号隔开 String address = "127.0.0.1:2181"; // 重试策略 连接不上服务端的时候 会重试 重试间隔递增 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3); CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy); // 启动zk客户端 client.start();// createZNode(client);// testBackGroundCallback(client);// simpleWatcher(client); watcherWithCache(client); new CountDownLatch(1).await(); } /** * curator 提供了三种cache包装了watcher机制 内部封装了收到watcher通知之后再次进行watcher事件注册的逻辑 对编程很友好 * @param client * @throws Exception */ static void watcherWithCache(CuratorFramework client) throws Exception{ // 创建NodeCache,监听的是"/user"这个节点 NodeCache nodeCache = new NodeCache(client, "/user"); // start()方法有个boolean类型的参数,默认是false。如果设置为true, // 那么NodeCache在第一次启动的时候就会立刻从ZooKeeper上读取对应节点的 // 数据内容,并保存在Cache中。 nodeCache.start(true); if (nodeCache.getCurrentData() != null) { System.out.println("NodeCache节点初始化数据为:" + new String(nodeCache.getCurrentData().getData())); } else { System.out.println("NodeCache节点数据为空"); } // 添加监听器 nodeCache.getListenable().addListener(() -> { String data = new String(nodeCache.getCurrentData().getData()); System.out.println("NodeCache节点路径:" + nodeCache.getCurrentData().getPath() + ",节点数据为:" + data); }); // 创建PathChildrenCache实例,监听的是"user"这个节点 PathChildrenCache childrenCache = new PathChildrenCache(client, "/user", true); // StartMode指定的初始化的模式 // NORMAL:普通异步初始化 // BUILD_INITIAL_CACHE:同步初始化 // POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件 childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE); // childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); // childrenCache.start(PathChildrenCache.StartMode.NORMAL); List<ChildData> children = childrenCache.getCurrentData(); System.out.println("获取子节点列表:"); // 如果是BUILD_INITIAL_CACHE可以获取这个数据,如果不是就不行 children.forEach(childData -> { System.out.println(new String(childData.getData())); }); childrenCache.getListenable().addListener(((client1, event) -> { System.out.println(LocalDateTime.now() + " " + event.getType()); if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) { System.out.println("PathChildrenCache:子节点初始化成功..."); } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) { String path = event.getData().getPath(); System.out.println("PathChildrenCache添加子节点:" + event.getData().getPath()); if (event.getData()!= null && event.getData() != null) { System.out.println("PathChildrenCache子节点数据:" + new String(event.getData().getData())); } } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) { System.out.println("PathChildrenCache删除子节点:" + event.getData().getPath()); } else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) { System.out.println("PathChildrenCache修改子节点路径:" + event.getData().getPath()); System.out.println("PathChildrenCache修改子节点数据:" + new String(event.getData().getData())); } })); // 创建TreeCache实例监听"user"节点 TreeCache cache = TreeCache.newBuilder(client, "/user").setCacheData(false).build(); cache.getListenable().addListener((c, event) -> { if (event.getData() != null) { System.out.println("TreeCache,type=" + event.getType() + " path=" + event.getData().getPath()); } else { System.out.println("TreeCache,type=" + event.getType()); } }); cache.start(); } /** * 简单的watcher监听机制 * @param client * @throws Exception */ static void simpleWatcher(CuratorFramework client) throws Exception{ client.create().withMode(CreateMode.PERSISTENT).forPath("/test-children-watch"); client.getChildren().usingWatcher(new CuratorWatcher() { // 为getChildren目录节点的子节点去创建一个watcher 注意 watcher只会被触发一次 @Override public void process(WatchedEvent watchedEvent) throws Exception { // type NodeChildrenChanged System.out.println("收到监听回调事件, type: " + watchedEvent.getType()); System.out.println("回调事件的path: " + watchedEvent.getPath()); } }).forPath("/test-children-watch"); // 去添加children 来触发watcher回调 client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-children-watch/child0"); // watcher是一次性的 不再去注册 不会再收到通知 client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-children-watch/child1"); } static void testBackGroundCallback(CuratorFramework client) throws Exception{ // backGround是异步处理机制 搭配CuratorListener监听器使用 // 添加一个CuratorListener 来处理backGround后台处理之后的回调 client.getCuratorListenable().addListener(new CuratorListener() { @Override public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception { // 针对不通过的事件去处理 String path = curatorEvent.getPath(); System.out.println("当前线程名称:" + Thread.currentThread().getName()); switch (curatorEvent.getType()) { case CREATE: System.out.println("节点创建,path: "+ path); break; case EXISTS: System.out.println("检查节点是否存在。path:" + path); break; case DELETE: System.out.println("delete节点。path: " + path); break; case GET_DATA: System.out.println("执行getdata path:" + path + " 数据:" + new String(curatorEvent.getData())); break; case SET_DATA: System.out.println("执行setData path: " + path); break; case CHILDREN: System.out.println("执行children path : "+ path); break; default: } } }); // inBackGround操作 都是在上边的回调处理 client.create().withMode(CreateMode.PERSISTENT).inBackground().forPath("/curator-background-api", "test".getBytes(StandardCharsets.UTF_8)); client.checkExists().inBackground().forPath("/curator-background-api"); client.getData().inBackground().forPath("/curator-background-api"); client.setData().inBackground().forPath("/curator-background-api", "test2".getBytes(StandardCharsets.UTF_8)); } /** * 创建节点 * @param client */ static void createZNode(CuratorFramework client) { try { String path = client.create().withMode(CreateMode.PERSISTENT).forPath("/curator-demo", "test".getBytes(StandardCharsets.UTF_8)); System.out.println("创建的持久化节点path是:" + path); // 检查一个节点是否存在 返回节点的stat信息 Stat stat = client.checkExists().forPath("/curator-demo"); Stat stat1 = client.checkExists().forPath("/curator-demo1"); System.out.println("节点信息" + stat); System.out.println("不存在的节点 checkExists方法返回值" + stat1); // 查询节点存储的内容 byte[] bytes = client.getData().forPath("/curator-demo"); System.out.println("节点内容:" + new String(bytes)); // 设置节点存储的内容 client.setData().forPath("/curator-demo", "testChanged".getBytes(StandardCharsets.UTF_8)); // 在子目录下创建多个临时顺序节点 for (int i =0; i < 5 ; i++) { client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-demo/child-"); } // 获取所有子节点 List<String> strings = client.getChildren().forPath("/curator-demo"); strings.stream().forEach(System.out::println); // 删除指定的节点 级联删除 client.delete().deletingChildrenIfNeeded().forPath("/curator-demo"); } catch (Exception e) { } }} -------------本文结束感谢您的阅读------------- 本文标题:ZK客户端curator的基本使用 文章作者:夸克 发布时间:2021年07月03日 - 07:07 最后更新:2022年07月01日 - 07:07 原始链接:https://zhanglijun1217.github.io/2021/07/03/ZK客户端curator的基本使用/ 许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。