Disruptor学习

在学习并发的时候,关注了一下Disruptor这个框架,只是简单性的了解,并没有深入分析代码结构,自己在工作上没有用到过。

本章章节目录:

 1、Disruptor介绍

 2、Disruptor 适用场景

 3、Disruptor 实现原理;

 4、Disruptor 编程实战;

1、Disruptor介绍:

    官网说Disruptor是一个很吊的并发框架,它号称"能够在一个线程里每秒处理6百万订单"是一个基于事件源驱动机制的业务逻辑处理器,整个业务逻辑处理器完全运行在内存中,disruptor在无锁的网络情况下,实现了Queue的并发。

2.Disruptor的适用场景

    disruptor适用于大规模低延迟的并发场景。可用于读写操作分离、数据缓存,速度匹配(因为其实现了生产者-消费者模型)、或者是基于内存的事件流处理机制的场景。  

3.Disruptor的实现原理

Disruptor怎么高效法:

简单的总结:

   1,Disruptor使用了一个RingBuffer替代队列,用生产者消费者指针替代锁。

  2,生产者消费者指针使用CPU支持的整数自增,无需加锁并且速度很快。Java的实现在Unsafe package中。

ringbuffer到底是什么?

   它是一个环(首尾相接的环),你可以把它用做在不同上下文(线程)间传递数据的buffer。

 blob.png

基本来说,ringbuffer拥有一个序号,这个序号指向数组中下一个可用元素。

 blob.png

随着你不停地填充这个buffer(可能也会有相应的读取),这个序号会一直增长,直到绕过这个环。

 blob.png

要找到数组中当前序号指向的元素,可以通过mod操作:sequence mod array length = array index(取模操作)以上面的ringbuffer为例(java的mod语法):12 % 10 = 2,重新覆盖原来的槽位值。槽的个数是2的N次方更有利于基于二进制的计算机进行计算。

注:

    很多人都会有疑问,这样基于环形存储,不会有造成Ring Buffer重叠:

   原理很简单,当生产者或者消费者遇到槽位备占用的时候就会自旋等待。 当生产者结束向 Entry 写入数据后,它会要求 ProducerBarrier 提交。ProducerBarrier 先等待 Ring Buffer 的游标追上当前的位置(对于单生产者这毫无意义,比如,我们已经知道游标到了12 ,而且没有其他人正在写入 Ring Buffer)。然后 ProducerBarrier更新Ring Buffer的游标到刚才写入的Entry 序号13。接下来,ProducerBarrier 会让消费者知道 buffer 中有新东西了。它戳一下 ConsumerBarrier 上的 WaitStrategy 对象说-“喂,醒醒!有事情发生了!”(注意-不同的 WaitStrategy 实现以不同的方式来实现提醒,取决于它是否采用阻塞模式。)

优点:

    环形数组结构

    为了避免垃圾回收,采用数组而非链表。同时,数组对处理器的缓存机制更加友好。

    元素位置定位

        数组长度2^n,通过位运算,加快定位的速度。下标采取递增的形式。不用担心index溢出的问题。index是long类型,即使100万QPS的处理速度,也需要30万年才能用完。

     无锁设计

        每个生产者或者消费者线程,会先申请可以操作的元素在数组中的位置,申请到之后,直接在该位置写入或者读取数据。

4、Disruptor的编程实践

 -disruptor的主要编程部件 

   1.Disruptor:用于控制整个消费者-生产者模型的处理器 

   2.RingBuffer:用于存放数据 

   3.EventHandler:一个用于处理事件的接口(可以当做生产者,也可以当做消费者)。 

   4.EventFactory:事件工厂类。 

   5.WaitStrategy:用于实现事件处理等待RingBuffer游标策略的接口。 

   6.SequeueBarrier:队列屏障,用于处理访问RingBuffer的序列。 

   7.用于运行disruptor的线程或者线程池。

  disruptor编程主要的编程流程 

   1.定义事件 

   2.定义事件工厂 

   3.定义事件处理类 

   4.定义事件处理的线程或者线程池 

   5.指定等待策略 

   6.通过disruptor处理器组装生产者和消费者 

   7.发布事件 

   8.关闭disruptor业务逻辑处理器 

/********************场景********************************/

       1.一个生产者—一个消费者的场景 

   2.一个生产者—多个消费者的场景 

   3.多个生产者—一个消费者的场景 

   4.多个生产者—多个消费者的场景 

   5.生产者-消费流(这个是我自己取的名字,因为disruptor可以实现菱形的事件处理模型,这种结构有点像storm里面的实时计算数据处理模型。但这个是面向线程的,storm是面向进程的,有点像,但不是一个层次上的东西,不过两者可以结合起来使用。)  

   更为详细的代码参看:

                   https://github.com/LMAX-Exchange/disruptor/tree/master/src/perftest/java/com/lmax/disruptor/sequenced

   具体的代码实现如下:

public class Handler1 implements EventHandler<Trade>,WorkHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
    	System.out.println("handler1: set name");
    	event.setName("h1");
    	Thread.sleep(1000);
    }  
}  

public class Handler2 implements EventHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
    	System.out.println("handler2: set price");
    	event.setPrice(17.0);
    	Thread.sleep(1000);
    }  
      
}  


public class Handler3 implements EventHandler<Trade> {
    @Override  
    public void onEvent(Trade event, long sequence,  boolean endOfBatch) throws Exception {  
    	System.out.println("handler3: name: " + event.getName() + " , price: " + event.getPrice() + ";  instance: " + event.toString());
    }  
}



public class Handler4 implements EventHandler<Trade>,WorkHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
    	System.out.println("handler4: get name : " + event.getName());
    	event.setName(event.getName() + "h4");
    }  
}


public class Handler5 implements EventHandler<Trade>,WorkHandler<Trade> {  
	  
    @Override  
    public void onEvent(Trade event, long sequence, boolean endOfBatch) throws Exception {  
        this.onEvent(event);  
    }  
  
    @Override  
    public void onEvent(Trade event) throws Exception {  
    	System.out.println("handler5: get price : " + event.getPrice());
    	event.setPrice(event.getPrice() + 3.0);
    }  
}  




public class Main {  
    public static void main(String[] args) throws InterruptedException {  
       
    	long beginTime=System.currentTimeMillis();  
        int bufferSize=1024;  
        ExecutorService executor=Executors.newFixedThreadPool(8);  

        Disruptor<Trade> disruptor = new Disruptor<Trade>(new EventFactory<Trade>() {  
            @Override  
            public Trade newInstance() {  
                return new Trade();  
            }  
        }, bufferSize, executor, ProducerType.SINGLE, new BusySpinWaitStrategy());  
        
        //菱形操作
        /**
        //使用disruptor创建消费者组C1,C2  
        EventHandlerGroup<Trade> handlerGroup = 
        		disruptor.handleEventsWith(new Handler1(), new Handler2());
        //声明在C1,C2完事之后执行JMS消息发送操作 也就是流程走到C3 
        handlerGroup.then(new Handler3());
        */
        
        //顺序操作
        /**
        disruptor.handleEventsWith(new Handler1()).
        	handleEventsWith(new Handler2()).
        	handleEventsWith(new Handler3());
        */
        
        //六边形操作. 
        /**
        Handler1 h1 = new Handler1();
        Handler2 h2 = new Handler2();
        Handler3 h3 = new Handler3();
        Handler4 h4 = new Handler4();
        Handler5 h5 = new Handler5();
        disruptor.handleEventsWith(h1, h2);
        disruptor.after(h1).handleEventsWith(h4);
        disruptor.after(h2).handleEventsWith(h5);
        disruptor.after(h4, h5).handleEventsWith(h3);
        */
        
        
        
        disruptor.start();//启动  
        CountDownLatch latch=new CountDownLatch(1);  
        //生产者准备  
        executor.submit(new TradePublisher(latch, disruptor));
        
        latch.await();//等待生产者完事. 
       
        disruptor.shutdown();  
        executor.shutdown();  
        System.out.println("总耗时:"+(System.currentTimeMillis()-beginTime));  
    }  
}  


public class TradePublisher implements Runnable {  
	
    Disruptor<Trade> disruptor;  
    private CountDownLatch latch;  
    
    private static int LOOP=10;//模拟百万次交易的发生  
  
    public TradePublisher(CountDownLatch latch,Disruptor<Trade> disruptor) {  
        this.disruptor=disruptor;  
        this.latch=latch;  
    }  
  
    @Override  
    public void run() {  
    	TradeEventTranslator tradeTransloator = new TradeEventTranslator();  
        for(int i=0;i<LOOP;i++){  
            disruptor.publishEvent(tradeTransloator);  
        }  
        latch.countDown();  
    }  
      
}  
  
class TradeEventTranslator implements EventTranslator<Trade>{  
    
	private Random random=new Random();  
    
	@Override  
    public void translateTo(Trade event, long sequence) {  
        this.generateTrade(event);  
    }  
    
	private Trade generateTrade(Trade trade){  
        trade.setPrice(random.nextDouble()*9999);  
        return trade;  
    }  
	
}

参考资料:

   http://www.360doc.com/content/15/0324/17/11962419_457721378.shtml

   http://blog.csdn.net/jeffsmish/article/details/53572043

   http://developer.51cto.com/art/201306/399370.htm

发表评论