zookeeper典型应用场景之四:分布式原子与分布式Barrier

一、分布式原子计数介绍

         分布式锁主要用于在分布式环境中保护跨进程、跨主机、跨网络的共享资源实现互斥访问,以达到保证数据的值的原子性,单个JVM是基于AtomicInteger来实现数据的原子性, AtomicInteger 是基于CAS的乐观锁实现机制保证变量的原子性: 

       Zookeeper 来实现分布式的计数也是差不多方式,建立一个持久化节点,每次修改的时候,带上Node 的原始版本, 通过Version 实现CAS乐观锁操作;

二、基于Curator实现原子计数器:

DistributedAtomicInteger atomicInteger = new DistributedAtomicInteger(client,"/counter",new RetryNTimes(32,1000));
AtomicValue<Integer> rc = atomicInteger.increment();

三、分布式屏障 :

    Barrier是一种控制线程同步的经典方式,在JDK 并发包中 通过CyclicBarrier就能实现所有线程都处于就绪的状态后才同时执行其他业务逻辑,那么如果在分布式的环境下是如何实现的了??

  原理:

    所谓的屏障就是达到了某个条件,只有这个条件的满足,就能推动自己的任务操作,Zookeeper 中实现屏障时指定一个屏障节点(barrier node),如果屏障节点存在,屏障就会生效,下面是伪代码:

  1、客户端在屏障节点上调用 ZooKeeper API  exists(),watch 设置为 true.

  2、如果 exists() 返回 false,屏障消失,客户端可以推进的自己的工作。

  3、否则, exists() 返回 true,客户端等待屏障节点上监听事件的到来。

  4、如果监听事件被触发,客户端重新执行 exists( ), 再一次重复上述 1-3 步,直到屏障节点被移除。

https://github.com/apache/curator/blob/master/curator-recipes/src/main/

      java/org/apache/curator/framework/recipes/barriers/DistributedBarrier.java

基于Curator实现:

public static void main(String[] args) throws Exception {  
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,3);  
        CuratorFramework client = CuratorFrameworkFactory  
                .newClient("127.0.0.1:2181", retryPolicy);  
        client.start();  
          
        //创建屏障类 不同JVM需要使用相同的目录 即/DistributedBarrier  
        final DistributedBarrier barrier = new DistributedBarrier(client, "/DistributedBarrier");  
        //创建屏障节点  
        barrier.setBarrier();  
        
        ExecutorService executors=Executors.newFixedThreadPool(10);
        for (int i = 0; i < 20; i++) {
        	executors.submit(new Runnable() {
				@Override
				public void run() {
					  //等待屏障移除  
					 try {
						barrier.waitOnBarrier();
					} catch (Exception e) {
						e.printStackTrace();
					} 
					 System.out.println("-------冲啊 兄弟们------------");
				}
			});
		}
        Thread.sleep(5000);  
        //等待屏障移除  
        barrier.removeBarrier();
        System.out.println("======屏障已经移除======");  
    }

DistributedDoubleBarrier

final DistributedDoubleBarrier doubleBarrier = new DistributedDoubleBarrier(
client, "/DistributedBarrier", 5);
doubleBarrier.enter();
doubleBarrier.leave();

    上述“5”表示,每个Barrier的参与都会在调用enter方法之后进行等待,此时处于准备进入状态,一旦准备进入的Barrier的成员个数达到5个后,所有的成员会被同时触发进入,之后调用DistributedDoubleBarrier.leave方法会再次等待,此时处于准备退出状态,一旦准备退出的Barrier达到成员数5个后,所有的成员同样会同触发退出。

发表评论