基于JAVA操作ZookeeperAPI

 本章节简单的介绍一下基于Java 操作Zookeeper的增删改查,下章节将基于Curator操作Zookeeper讲解;

系列目录:

    1. Zookeeper介绍  

    2.Zookeeper运维管理

    3. Zookeeper的Zab协议

1. 创建会话:

Zookeeper重载了几个构造函数,

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)  

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher,  boolean canBeReadOnly)  

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)  


参数说明:

    connectString:服务器地址,多台机器使用","隔开;

    sessionTimeout:会话超时时间,毫秒为单位;Zookeeper客户端与服务端以心跳机制来维持会话的有效性,假设在sessionTimeout时间内没有进行有效的心跳检测,则认为会话失效.

    watcher: 事件通知处理器。

    canBeReadOnly:用于标识当前会话是否支持“read-only”模式。默认情况下,在ZK集群中,一个机器如果和集群中的半数以及半数以上的机器失去连接,那么这个机器将不再处理客户端请求(读请求+写请求均不处理);但是有时候我们希望在发生此类故障时不影响读取请求的处理,这个就是zk的read-only 模式;

   sessionId和sessionPasswd,分别代表会话Id和会话密钥。这个参数可以唯一确定一个会话。

 代码:

/**
 * 基本的回话实例
 * Zookeeper客户端与服务端会话建立是一个异步的过程,
 * 并没有真正建立一个可用的会话,在会话周期中处于“CONNECTING”状态,所以使用CountDownLatch
 * 
 */
public class ZooKeeperCreateSession implements Watcher {

	private static CountDownLatch countDownLatch=new CountDownLatch(1);
	
	public static void main(String[] args) {
		try {
			ZooKeeper zooKeeper=new ZooKeeper("127.0.0.1:2181", 500, new ZooKeeperCreateSession());
		    System.out.println(zooKeeper.getState());
		    countDownLatch.await();
		    System.out.println("Zookeeper session established.");
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} 
	}
	
	@Override
	public void process(WatchedEvent event) {
		System.out.println("receive watch event :"+event);
		if(KeeperState.SyncConnected== event.getState()){
			countDownLatch.countDown();
		}
	}

}

使用:

      long sessionId=zooKeeper.getSessionId();

      byte[]pwd=zooKeeper.getSessionPasswd();

     可以获取Zookeeper session 提高会话复用,维持会话的有效性;

2.创建节点:

public void create(final String path, byte data[], List<ACL> acl, CreateMode createMode,  StringCallback cb, Object ctx)  #异步方式

public String create(final String path, byte data[], List<ACL> acl, CreateMode createMode) #同步方式;

1.	/** 
2.	     *  创建节点 
3.	     * @param path 节点path 
4.	     * @param data 初始数据内容 
5.	     * @return 
6.	     */ 
7.	    public boolean createPath( String path, String data ) { 
8.	        try { 
9.	            System.out.println( "节点创建成功, Path: " 
10.	                    + this.zk.create( path, // 
11.	                                              data.getBytes(), // 
12.	                                              Ids.OPEN_ACL_UNSAFE, // 
13.	                                              CreateMode.EPHEMERAL ) 
14.	                    + ", content: " + data ); 
15.	        } catch ( KeeperException e ) { 
16.	            System.out.println( "节点创建失败,发生KeeperException" ); 
17.	            e.printStackTrace(); 
18.	        } catch ( InterruptedException e ) { 
19.	            System.out.println( "节点创建失败,发生 InterruptedException" ); 
20.	            e.printStackTrace(); 
21.	        } 
22.	        return true; 
23.	    }

   异步方式:

/**
 *  异步创建节点接口
 */
public class ZooKeeper_Create_Sync  implements Watcher{
    private static CountDownLatch countDownLatch=new CountDownLatch(1);
	public static void main(String[] args) {
		try {
			ZooKeeper zooKeeper=new ZooKeeper("127.0.0.1:2181", 500, new ZooKeeper_Create_Sync());
		    System.out.println(zooKeeper.getState());
		    countDownLatch.await();
		    
		    //临时节点
		    zooKeeper.create("/zk-test-1", "xxoo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,new IStringCallback(),"Hello word");
		    
		    zooKeeper.create("/zk-test-1", "xxoo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT,new IStringCallback(),"Hello word");
		    
		    zooKeeper.create("/zk-test-1", "xxoo".getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT_SEQUENTIAL,new IStringCallback(),"Hello word");
		    
		    System.out.println("-----------------");
		} catch (IOException e) {
			e.printStackTrace();
		} catch (InterruptedException e) {
			e.printStackTrace();
		}  
	}
	
	@Override
	public void process(WatchedEvent event) {
		System.out.println("---------------receive watch event :"+event);
		if(KeeperState.SyncConnected== event.getState()){
			countDownLatch.countDown();
		}
	}
}

class IStringCallback implements    AsyncCallback.StringCallback{
	@Override
	public void processResult(int rc, String path, Object ctx, String name) {
		System.out.println("Create path result: 服务响应码="+rc + "
		 节点参数值:"+path+" ctx参数值:"+ctx+" 节点名="+name);
	}
}

注:

  rc是result code 服务端响应结果码。客户端可以从这个结果码中识别出API的调用结果,常见的结果码有:

       0(OK),接口调用成功

       -4(ConnectionLoss),客户端和服务器连接断开

       -110(NodeExists) 节点已存在

       -112(SessionExpired)会话已过期

  path: 接口调用传入的数据节点的节点路径

   ctx: 接口调用传入的ctx参数

  name: 实际在服务器端创建的节点名

3.删除节点:

   void delete(final String path,int version);

   void delete(final String path,int version,VoidCallback cb,Object ctx);


  如果删除的节点不存在则会抛出一个NoNodeException,如果删除数据的版本号不正确则抛出一个BadVersionException

代码如上雷同;

1.	/** 
2.	     * 删除指定节点 
3.	     * @param path 节点path 
4.	     */ 
5.	    public void deleteNode( String path ) { 
6.	        try { 
7.	            this.zk.delete( path, -1 ); 
8.	            System.out.println( "删除节点成功,path:" + path ); 
9.	        } catch ( KeeperException e ) { 
10.	            System.out.println( "删除节点失败,发生KeeperException,path: " + path  ); 
11.	            e.printStackTrace(); 
12.	        } catch ( InterruptedException e ) { 
13.	            System.out.println( "删除节点失败,发生 InterruptedException,path: " + path  ); 
14.	            e.printStackTrace(); 
15.	        } 
16.	    }

4.读取数据:

 获取子节点列表的接口:getChildren

public class Zookeeper_GetChildren_Sync implements Watcher {

	private static CountDownLatch connectedSemaphore = new CountDownLatch(1);
	private static ZooKeeper zk = null;

	public static void main(String[] args) throws Exception {
		String path = "/zk-book";
		zk = new ZooKeeper("127.0.0.1:2181", 5000, //
				new Zookeeper_GetChildren_Sync());
		connectedSemaphore.await(); // awaint方法,调用此方法会一直阻塞当前线程,直到计时器的值为0
		List<String> childrenList = null;

		zk.create(path, "".getBytes(), Ids.OPEN_ACL_UNSAFE,
				CreateMode.PERSISTENT);
		zk.create(path + "/c1", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
				CreateMode.EPHEMERAL);

		childrenList = zk.getChildren(path, true);
		System.out.println(childrenList);

		zk.create(path + "/c2", "".getBytes(), Ids.OPEN_ACL_UNSAFE,
				CreateMode.EPHEMERAL);

		Thread.sleep(Integer.MAX_VALUE);

	}

	@Override
	public void process(WatchedEvent event) {
		if (KeeperState.SyncConnected == event.getState()) {
			if (EventType.None == event.getType() && null == event.getPath()) {
				connectedSemaphore.countDown();
			} else if (event.getType() == EventType.NodeChildrenChanged) {
				try {
					System.out.println("ReGet Child:"
							+ zk.getChildren(event.getPath(), true));
				} catch (Exception e) {
				}
			}
		}
	}
}

获取节点数据的接口: getData 与 getChildren 雷同,getData返回的是字节结果类型

1.	/**
2.	     * 读取指定节点数据内容 
3.	     * @param path 节点path 
4.	     * @return 
5.	     */ 
6.	    public String readData( String path ) { 
7.	        try { 
8.	            System.out.println( "获取数据成功,path:" + path ); 
9.	            return new String( this.zk.getData( path, false, null ) ); 
10.	        } catch ( KeeperException e ) { 
11.	            System.out.println( "读取数据失败,发生KeeperException,path: " + path  ); 
12.	            e.printStackTrace(); 
13.	            return ""; 
14.	        } catch ( InterruptedException e ) { 
15.	            System.out.println( "读取数据失败,发生 InterruptedException,path: " + path  ); 
16.	            e.printStackTrace(); 
17.	            return ""; 
18.	        } 
19.	    }

5.更新节点:setData

   更新节点的内容可以带上内容版本号,CAS 原理;

  /** 
2.	     * 更新指定节点数据内容 
3.	     * @param path 节点path 
4.	     * @param data  数据内容 
5.	     * @return 
6.	     */ 
7.	    public boolean writeData( String path, String data ) { 
8.	        try { 
9.	            System.out.println( "更新数据成功,path:" + path + ", stat: " + 
10.	                                                        this.zk.setData( path, data.getBytes(), -1 ) ); 
11.	        } catch ( KeeperException e ) { 
12.	            System.out.println( "更新数据失败,发生KeeperException,path: " + path  ); 
13.	            e.printStackTrace(); 
14.	        } catch ( InterruptedException e ) { 
15.	            System.out.println( "更新数据失败,发生 InterruptedException,path: " + path  ); 
16.	            e.printStackTrace(); 
17.	        } 
18.	        return false; 
19.	    }

Zookeeper权限控制:

    http://blog.csdn.net/yueyedeai/article/details/17106147

发表评论