基于curator 操作Zookeeper

本章节基于Curator操作Zookeeper讲解:

 curator解决了很多Zookeeper客户端非常底层的细节开发工作,包括链接重连,反复注册Watcher和NodeExistsException异常等,还提供了各种场景抽象封装(分布式锁、Master选举机制,分布式计数器等)。本章简单的介绍基于curator增删改查操作;

系列目录:

    1. Zookeeper介绍  

    2.Zookeeper运维管理

    3. Zookeeper的Zab协议

   4. 基于Java操作ZookeeperAPI

 创建会话:

RetryPolicy policy=new ExponentialBackoffRetry(1000, 3);
		CuratorFramework client=CuratorFrameworkFactory.builder()
				.connectString("127.0.0.1:2181")
				.sessionTimeoutMs(5000)
				.retryPolicy(policy)
				.namespace("tests")
				.build();
		client.start();

  .namespace("tests") 

      是为了实现不同的Zookeeper业务之间的隔离,从而分配一个独立的命名空间,即指定一个Zookeeper的跟路径,那么这个Zookeeper客户端都将基于这个目录下进行Node操作;

创建一个节点:

String path="/x1";

client.create().forPath(path);

client.create().forPath(path,"xxoo".getBytes());

如果没有指定CreateMode 那么默认创建的是持久节点;

client.create().withMode(CreateMode.EPHEMERAL).forPath(path,"xxoo".getBytes());

自动递归创建父节点:

client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path,"xxoo".getBytes());

    注:Zookeeper规定所有的非叶子节点必须为持久化节点,所以上述API,path为临时节点,父节点为持久化节点;

删除节点;

                client.delete().forPath(path);          

          client.delete().deletingChildrenIfNeeded().forPath(path); //递归删除子节点

   client.delete().withVersion(11); //指定版本号删除

client.delete().guaranteed().forPath(path); //强制删除,保障措施;(只要会话有效,其会在后台反复重试,直到节点删除成功);

读取节点:

client.getData().forPath(path);

Stat stat=new Stat();

client.getData().storingStatIn(stat).forPath(path); //读取节点内容,并同时获取该节点的State信息;

更新节点:

Stat stat2=client.setData().forPath(path);

Stat stat3=client.setData().withVersion(11).forPath(path);

Stat stat4=client.setData().withVersion(11).forPath(path,"xxoo".getBytes());

curator的异步操作:

ExecutorService tp = Executors.newFixedThreadPool(2);
		client.create().creatingParentsIfNeeded().inBackground(new BackgroundCallback() {
			
			@Override
			public void processResult(CuratorFramework client, CuratorEvent event)
					throws Exception {
				 System.out.println("event[code: " + event.getResultCode() + ", type: " + event.getType() + "]");
	              System.out.println("Thread of processResult: " + Thread.currentThread().getName());
			}
		},tp).forPath(path);

事件监听:

     Zookeeper原声支持通过注册Watcher来进行事件监听,但使用起来不是特别方便,需要反复注册Watcher,比较繁琐。Curator引入了Cache来实现对Zookeeper服务端事件的监听,Cache是Curator中对事件监听的包装,并且自动反复注册监听。Cache分为两类监听类型:节点监听和子节点监听。

NodeCache

    NodeCache用于监听指定Zookeeper数据节点本身的变化。

  private static String path = "/zk-book/nodecache";
	public static void main(String[] args) throws Exception {
		RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
		CuratorFramework client = CuratorFrameworkFactory.builder()
				.connectString("127.0.0.1:2181").sessionTimeoutMs(5000)
				.retryPolicy(policy).build();
		client.start();
		client.create().creatingParentsIfNeeded().forPath(path);
		final NodeCache nodeCache=new NodeCache(client, path);
		nodeCache.start(true);
		nodeCache.getListenable().addListener(new NodeCacheListener() {
			
			@Override
			public void nodeChanged() throws Exception {
				System.out.println("Node data update ,new Data:"+new String(nodeCache.getCurrentData().getData()));
			}
		});
		
		client.setData().forPath(path, "xxoo".getBytes());
		client.setData().forPath(path, "xxoo2".getBytes());
		client.setData().forPath(path, "xxoo3".getBytes());
		Thread.sleep(1000);
		client.delete().deletingChildrenIfNeeded().forPath(path);
		Thread.sleep(Integer.MAX_VALUE);
	}

PathChildrenCache

       PathChildrenCache用于监听指定Zookeeper数据节点的子节点变化情况,无法对二级子节点进行事件监听。主要的事件类型,包括新增子节点(CHILD_ADDED)、子节点变更(CHILD_UPDATED) 、子节点删除(CHILD_REMOVED) 

String path="/zk-child-node"
RetryPolicy policy = new ExponentialBackoffRetry(1000, 3);
		CuratorFramework client = CuratorFrameworkFactory.builder()
				.connectString("127.0.0.1:2181").sessionTimeoutMs(5000)
				.retryPolicy(policy).build();
		client.start();
		client.create().creatingParentsIfNeeded().forPath(path);
		System.out.println("-----");
		PathChildrenCache childrenCache=new PathChildrenCache(client, path, true);
		childrenCache.start();
		childrenCache.getListenable().addListener(new PathChildrenCacheListener() {
			
			@Override
			public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
					throws Exception {
				switch(event.getType()) {
					case CHILD_ADDED:
						System.out.println("CHILD_ADDED,"+ event.getData().getPath());
					break;
					case CHILD_UPDATED:
						System.out.println("CHILD_UPDATED,"+ event.getData().getPath());
					break;
					case CHILD_REMOVED:
						System.out.println("CHILD_REMOVED,"+ event.getData().getPath());
					break;
					default:
					 System.out.println(event.getType());
					break;
		     }
			}
		});
		
		//新增子节点会触发PathChildrenCacheListener
		 client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1");
		 Thread.sleep(1000);
		
		 client.setData().forPath(path + "/c1" , "xxoo".getBytes());
		 Thread.sleep(1000);
		
		 client.create().withMode(CreateMode.PERSISTENT).forPath(path + "/c1/c2" , "xxoo".getBytes());
		 Thread.sleep(1000);
		 
		 client.setData().forPath(path + "/c1/c2" , "xxoo".getBytes()); 
		 #无法对二级子节点进行事件监听,对于节点的自身变更也是不会通知;
		 Thread.sleep(1000);
		 	 client.delete().deletingChildrenIfNeeded().forPath(path);
		 Thread.sleep(Integer.MAX_VALUE);

发表评论