Fork me on GitHub

ZK客户端curator的基本使用

Curator的使用

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
/**
* curator zk客户端api操作demo
*/
public class CuratorApiDemo {

public static void main(String[] args) throws Exception{
// zk地址 如果是集群 多个节点地址逗号隔开
String address = "127.0.0.1:2181";
// 重试策略 连接不上服务端的时候 会重试 重试间隔递增
RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);
CuratorFramework client = CuratorFrameworkFactory.newClient(address, retryPolicy);
// 启动zk客户端
client.start();

// createZNode(client);
// testBackGroundCallback(client);
// simpleWatcher(client);
watcherWithCache(client);

new CountDownLatch(1).await();
}


/**
* curator 提供了三种cache包装了watcher机制 内部封装了收到watcher通知之后再次进行watcher事件注册的逻辑 对编程很友好
* @param client
* @throws Exception
*/
static void watcherWithCache(CuratorFramework client) throws Exception{

// 创建NodeCache,监听的是"/user"这个节点
NodeCache nodeCache = new NodeCache(client, "/user");
// start()方法有个boolean类型的参数,默认是false。如果设置为true,
// 那么NodeCache在第一次启动的时候就会立刻从ZooKeeper上读取对应节点的
// 数据内容,并保存在Cache中。
nodeCache.start(true);
if (nodeCache.getCurrentData() != null) {
System.out.println("NodeCache节点初始化数据为:"
+ new String(nodeCache.getCurrentData().getData()));
} else {
System.out.println("NodeCache节点数据为空");
}

// 添加监听器
nodeCache.getListenable().addListener(() -> {
String data = new String(nodeCache.getCurrentData().getData());
System.out.println("NodeCache节点路径:" + nodeCache.getCurrentData().getPath()
+ ",节点数据为:" + data);
});

// 创建PathChildrenCache实例,监听的是"user"这个节点
PathChildrenCache childrenCache = new PathChildrenCache(client, "/user", true);
// StartMode指定的初始化的模式
// NORMAL:普通异步初始化
// BUILD_INITIAL_CACHE:同步初始化
// POST_INITIALIZED_EVENT:异步初始化,初始化之后会触发事件
childrenCache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
// childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT);
// childrenCache.start(PathChildrenCache.StartMode.NORMAL);
List<ChildData> children = childrenCache.getCurrentData();
System.out.println("获取子节点列表:");
// 如果是BUILD_INITIAL_CACHE可以获取这个数据,如果不是就不行
children.forEach(childData -> {
System.out.println(new String(childData.getData()));
});
childrenCache.getListenable().addListener(((client1, event) -> {
System.out.println(LocalDateTime.now() + " " + event.getType());
if (event.getType().equals(PathChildrenCacheEvent.Type.INITIALIZED)) {
System.out.println("PathChildrenCache:子节点初始化成功...");
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_ADDED)) {
String path = event.getData().getPath();
System.out.println("PathChildrenCache添加子节点:" + event.getData().getPath());
if (event.getData()!= null && event.getData() != null) {
System.out.println("PathChildrenCache子节点数据:" + new String(event.getData().getData()));
}
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_REMOVED)) {
System.out.println("PathChildrenCache删除子节点:" + event.getData().getPath());
} else if (event.getType().equals(PathChildrenCacheEvent.Type.CHILD_UPDATED)) {
System.out.println("PathChildrenCache修改子节点路径:" + event.getData().getPath());
System.out.println("PathChildrenCache修改子节点数据:" + new String(event.getData().getData()));
}
}));

// 创建TreeCache实例监听"user"节点
TreeCache cache = TreeCache.newBuilder(client, "/user").setCacheData(false).build();
cache.getListenable().addListener((c, event) -> {
if (event.getData() != null) {
System.out.println("TreeCache,type=" + event.getType() + " path=" + event.getData().getPath());
} else {
System.out.println("TreeCache,type=" + event.getType());
}
});
cache.start();
}

/**
* 简单的watcher监听机制
* @param client
* @throws Exception
*/
static void simpleWatcher(CuratorFramework client) throws Exception{
client.create().withMode(CreateMode.PERSISTENT).forPath("/test-children-watch");

client.getChildren().usingWatcher(new CuratorWatcher() {
// 为getChildren目录节点的子节点去创建一个watcher 注意 watcher只会被触发一次
@Override
public void process(WatchedEvent watchedEvent) throws Exception {
// type NodeChildrenChanged
System.out.println("收到监听回调事件, type: " + watchedEvent.getType());
System.out.println("回调事件的path: " + watchedEvent.getPath());
}
}).forPath("/test-children-watch");

// 去添加children 来触发watcher回调
client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-children-watch/child0");
// watcher是一次性的 不再去注册 不会再收到通知
client.create().withMode(CreateMode.EPHEMERAL).forPath("/test-children-watch/child1");
}


static void testBackGroundCallback(CuratorFramework client) throws Exception{
// backGround是异步处理机制 搭配CuratorListener监听器使用

// 添加一个CuratorListener 来处理backGround后台处理之后的回调
client.getCuratorListenable().addListener(new CuratorListener() {
@Override
public void eventReceived(CuratorFramework curatorFramework, CuratorEvent curatorEvent) throws Exception {
// 针对不通过的事件去处理
String path = curatorEvent.getPath();
System.out.println("当前线程名称:" + Thread.currentThread().getName());
switch (curatorEvent.getType()) {
case CREATE:
System.out.println("节点创建,path: "+ path);
break;
case EXISTS:
System.out.println("检查节点是否存在。path:" + path);
break;
case DELETE:
System.out.println("delete节点。path: " + path);
break;
case GET_DATA:
System.out.println("执行getdata path:" + path + " 数据:" + new String(curatorEvent.getData()));
break;
case SET_DATA:
System.out.println("执行setData path: " + path);
break;
case CHILDREN:
System.out.println("执行children path : "+ path);
break;
default:
}
}
});

// inBackGround操作 都是在上边的回调处理
client.create().withMode(CreateMode.PERSISTENT).inBackground().forPath("/curator-background-api", "test".getBytes(StandardCharsets.UTF_8));
client.checkExists().inBackground().forPath("/curator-background-api");
client.getData().inBackground().forPath("/curator-background-api");
client.setData().inBackground().forPath("/curator-background-api", "test2".getBytes(StandardCharsets.UTF_8));
}

/**
* 创建节点
* @param client
*/
static void createZNode(CuratorFramework client) {
try {
String path = client.create().withMode(CreateMode.PERSISTENT).forPath("/curator-demo", "test".getBytes(StandardCharsets.UTF_8));
System.out.println("创建的持久化节点path是:" + path);

// 检查一个节点是否存在 返回节点的stat信息
Stat stat = client.checkExists().forPath("/curator-demo");
Stat stat1 = client.checkExists().forPath("/curator-demo1");
System.out.println("节点信息" + stat);
System.out.println("不存在的节点 checkExists方法返回值" + stat1);


// 查询节点存储的内容
byte[] bytes = client.getData().forPath("/curator-demo");
System.out.println("节点内容:" + new String(bytes));

// 设置节点存储的内容
client.setData().forPath("/curator-demo", "testChanged".getBytes(StandardCharsets.UTF_8));

// 在子目录下创建多个临时顺序节点
for (int i =0; i < 5 ; i++) {
client.create().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).forPath("/curator-demo/child-");
}
// 获取所有子节点
List<String> strings = client.getChildren().forPath("/curator-demo");
strings.stream().forEach(System.out::println);


// 删除指定的节点 级联删除
client.delete().deletingChildrenIfNeeded().forPath("/curator-demo");
} catch (Exception e) {

}
}
}
-------------本文结束感谢您的阅读-------------

本文标题:ZK客户端curator的基本使用

文章作者:夸克

发布时间:2021年07月03日 - 07:07

最后更新:2022年07月01日 - 07:07

原始链接:https://zhanglijun1217.github.io/2021/07/03/ZK客户端curator的基本使用/

许可协议: 署名-非商业性使用-禁止演绎 4.0 国际 转载请保留原文链接及作者。