zookeeper典型应用场景之一:master选举

1、使用场景及结构

  现在很多时候我们的服务需要7*24小时工作,假如一台机器挂了,我们希望能有其它机器顶替它继续工作。此类问题现在多采用master-salve模式,也就是常说的主从模式,正常情况下主机提供服务,备机负责监听主机状态,当主机异常时,可以自动切换到备机继续提供服务(这里有点儿类似于数据库主库跟备库,备机正常情况下只监听,不工作),这个切换过程中选出下一个主机的过程就是master选举。

  对于以上提到的场景,传统的解决方式是采用一个备用节点,这个备用节点定期给当前主节点发送ping包,主节点收到ping包后会向备用节点发送应答ack,当备用节点收到应答,就认为主节点还活着,让它继续提供服务,否则就认为主节点挂掉了,自己将开始行使主节点职责。RocketMQ 的底层: broker和所有nameserver保持长连接就是通过发送心跳包机制来实现保持长链接(心跳间隔、心跳超时);

    如图1所示:blob.png

    但这种方式会存在一个隐患,就是网络故障问题,可能会出现脑裂或者机器假死的问题

    看一下图2:

blob.png 

     小说了一下Zookeeper,接下来讨论一下脑裂,假死等等问题以及解决方法吧。

 脑裂:

       我们的主节点并没有挂掉,只是在备用节点ping主节点,请求应答的时候发生网络故障,这样我们的备用节点同样收不到应答,就会认为主节点挂掉,然后备机会启动自己的master实例。这样就会导致系统中有两个主节点,也就是双master。出现双master以后,我们的从节点会将它做的事情一部分汇报给主节点,一部分汇报给备用节点,这样服务数据不统一。这种现象就是脑裂现象:

    题外话:

     LVS在配置的时候不处理好,也会常有脑裂问题:  http://blog.csdn.net/u013694670/article/details/60580175

      网上很多说法是基于Shell脚本,检测正常情况下keepalived的VIP地址是在主节点上的,如果在从节点发现了VIP,就设置报警信息;

      Zookeeper在集群环境的时候也会出现脑裂问题:    http://blog.csdn.net/u010185262/article/details/49910301

  

    那我们怎么保持上述服务在同一时刻只存在一个主节点。我们来看zookeeper是怎么实现的:

    在此处,抢主程序是包含在服务程序中,需要程序员来手动写抢主逻辑的,比如当当开源框架elastic-job中,就有关于选主的部分,参见:        https://github.com/dangdangdotcom/elastic-job/tree/7dc099541a16de49f024fc59e46377a726be7f6b/elastic-job-lite/elastic-job-lite-core/src/main/java/com/dangdang/ddframe/job/lite/internal/election

master选举的架构图:

blob.png

        选主原理介绍:zookeeper的节点有两种类型,持久节点跟临时节点。临时节点有个特性,就是如果注册这个节点的机器失去连接(通常是宕机),那么这个节点会被zookeeper删除。选主过程就是利用这个特性,在服务器启动的时候,去zookeeper特定的一个目录下注册一个临时节点(这个节点作为master,谁注册了这个节点谁就是master),注册的时候,如果发现该节点已经存在,则说明已经有别的服务器注册了(也就是有别的服务器已经抢主成功),那么当前服务器只能放弃抢主,作为从机存在。同时,抢主失败的当前服务器需要订阅该临时节点的删除事件,以便该节点删除时(也就是注册该节点的服务器宕机了或者网络断了之类的)进行再次抢主操作。从机具体需要去哪里注册服务器列表的临时节点,节点保存什么信息,根据具体的业务不同自行约定。选主的过程,其实就是简单的争抢在zookeeper注册临时节点的操作,谁注册了约定的临时节点,谁就是master。


        ps:本文的例子中,并未用到结构图server节点下的数据。但换一种算法或者业务场景就会用到,算法比如提到的最小编号,主要逻辑是主节点挂掉后,从节点里边编号最小的成为主节点,此时会用到该节点内容。换一种业务场景:集群环境中,有很多任务要处理, 主节点负责接收任务,并根据一定算法将任务分配到不同的机器上执行;这种情况下,主节点跟从节点的职责也是不同的,主节点挂掉也会涉及到从节点进行master选举的问题。这种情况下,很显然,作为主节点需要知道当前有多少个从节点还活着,那么此时也会需要用到servers节点下的数据了。


2、编码实现

   主要有两个类,WorkServer为主服务类,RunningData用于记录运行数据。因为是简单的demo,我们只做抢master节点的编码,对于从节点应该去哪里注册服务列表信息,不作编码。

 采用zkClient实现,代码如下:

 WorkServer类:

public class WorkServer {
 
    /**
     * 是否运行
     */
    private volatile boolean running = false;
   
    private static final String MASTER_PATH = "/master";
   
    private ZkClient zkClient;
   
    private RunningData serverData;
   
    private RunningData masterData;
   
    private IZkDataListener dataListener;  //节点监听
   
    private IZkChildListener childListener; //子节点列表变更
   
    private ScheduledExecutorService executorService=Executors.newScheduledThreadPool(1); //启动一个定时线程服务
 
    public WorkServer(RunningData rd) {
        this.serverData=rd;
        this.dataListener=new IZkDataListener() {
            @Override
            public void handleDataDeleted(String dataPath) throws Exception {
                takeMaster();  //重新选举
               
                //为了防止网络抖动情况,Zookeeper认为该服务器失效而造成重新选举,如果选择的Master不是上一台Master机器,会照成资源迁移,
                //损耗一定的性能,所有优化: 重新选举以上一台Master机器优先权高高一点。
                /*if (masterData!=null && masterData.getName().equals(serverData.getName())){
                    takeMaster();
                   
                }else{
                    executorService.schedule(new Runnable(){
                        public void run(){
                            takeMaster();
                        }
                    }, delayTime, TimeUnit.SECONDS);
                   
                }*/
               
            }
           
            @Override
            public void handleDataChange(String dataPath, Object data) throws Exception {
               
            }
           
        };
       
        this.childListener=new IZkChildListener() {
            @Override
            public void handleChildChange(String parentPath, List<String> currentChilds)
                    throws Exception {
                System.out.println(parentPath + " 's child changed, currentChilds:" + currentChilds);
                if(currentChilds!=null && currentChilds.size()>0){
                    for (String node  : currentChilds) {
                         String path = parentPath + "/" + node; 
                         System.out.println(path);
                    }
                }
            }
        };
       
    }
   
    public void start() throws Exception {
        if (running) {
            throw new Exception("server has startup...");
        }
        running=true;
        zkClient.subscribeDataChanges(MASTER_PATH, dataListener);
        zkClient.subscribeChildChanges(MASTER_PATH, childListener);
        takeMaster();
    }
   
    public void stop() throws Exception {
        if (!running) {
            throw new Exception("server has stoped");
        }
        running = false;
        zkClient.unsubscribeDataChanges(MASTER_PATH, dataListener);
        zkClient.unsubscribeChildChanges(MASTER_PATH, childListener);
        releaseMaster();
    }
   
    /**
     * 释放选举
     */
    private void releaseMaster() {
        if(checkMaster()){ //如果自己是Master
            zkClient.delete(MASTER_PATH);
        }
    }
   
   
 
    /**
     * 重新选举
     */
    private void takeMaster() {
        if (!running)
            return;
        //
        try {
            zkClient.createEphemeral(MASTER_PATH, serverData);
            masterData = serverData;
            System.out.println(serverData.getName()+" is master");
           
            /**
             * 为了测试选举目的
             */
            executorService.schedule(new Runnable() {        
                public void run() {
                    if (checkMaster()){
                        System.out.println("释放选举");
                        releaseMaster();
                    }
                }
            }, 5, TimeUnit.SECONDS);
        } catch (ZkNodeExistsException e) {
            System.out.println("master node exist ");
            RunningData runningData=zkClient.readData(MASTER_PATH);
            if(runningData==null){
                takeMaster();
            }else{
                masterData=runningData;
            }
        }
    }
   
    private boolean checkMaster() {
        try{
            RunningData runningData=zkClient.readData(MASTER_PATH);
            masterData = runningData;
            if (masterData.getName().equals(serverData.getName())) {
                return true;
            }
        }catch (ZkNoNodeException e) {
            return false;
        }catch (ZkInterruptedException e) {  //网络中断情况
            checkMaster();
        }catch (ZkException e) {
            return false;
        }
        return false;
    }
   
   
    public ZkClient getZkClient() {
        return zkClient;
    }
 
    public void setZkClient(ZkClient zkClient) {
        this.zkClient = zkClient;
    }
   
}

 RunningData类:

public class RunningData implements Serializable {
 
    /**
     *
     */
    private static final long serialVersionUID = 4260577459043203630L;
 
    private Long cid;  //客户端ID
   
    private String name; //客户端名称
 
    public Long getCid() {
        return cid;
    }
 
    public void setCid(Long cid) {
        this.cid = cid;
    }
 
    public String getName() {
        return name;
    }
 
    public void setName(String name) {
        this.name = name;
    }
 
}

     说明:在实际生产环境中,可能会由于插拔网线等导致网络短时的不稳定,也就是网络抖动。由于正式生产环境中可能server在zk上注册的信息是比较多的,而且server的数量也是比较多的,那么每一次切换主机,每台server要同步的数据量(比如要获取谁是master,当前有哪些salve等信息,具体视业务不同而定)也是比较大的。那么我们希望,这种短时间的网络抖动最好不要影响我们的系统稳定,也就是最好选出来的master还是原来的机器,那么就可以避免发现master更换后,各个salve因为要同步数据等导致的zk数据网络风暴。所以在WorkServer中,

if (masterData!=null && masterData.getName().equals(serverData.getName())){
                    takeMaster();
                   
                }else{
                    executorService.schedule(new Runnable(){
                        public void run(){
                            takeMaster();
                        }
                    }, delayTime, TimeUnit.SECONDS);
                   
}

,我们抢主的时候,如果之前主机是本机,则立即抢主,否则延迟5s抢主。这样就给原来主机预留出一定时间让其在新一轮选主中占据优势,从而利于环境稳定。

public class LeaderSelectorZkClient {
 
    //启动的服务个数
    private static final int        CLIENT_QTY = 10;
    //zookeeper服务器的地址
    private static final String     ZOOKEEPER_SERVER = "127.0.0.1:2181";
   
    public static void main(String[] args) {
        //保存所有zkClient的列表
        List<ZkClient>  clients = new ArrayList<ZkClient>();
        //保存所有服务的列表
        List<WorkServer>  workServers = new ArrayList<WorkServer>();
       
        try {
            for ( int i = 0; i < CLIENT_QTY; ++i )
            {
            //创建zkClient
                ZkClient client = new ZkClient(ZOOKEEPER_SERVER, 5000, 5000, new SerializableSerializer());
                clients.add(client);
               
                //创建serverData
                RunningData runningData = new RunningData();
                runningData.setCid(Long.valueOf(i));
                runningData.setName("Client #" + i);
               
                //创建服务
                WorkServer  workServer = new WorkServer(runningData);
                workServer.setZkClient(client);
               
               
                workServers.add(workServer);
                workServer.start();
            }
            System.out.println("敲回车键退出!\n");
            new BufferedReader(new InputStreamReader(System.in)).readLine();
        } catch (Exception e) {
           
             System.out.println("Shutting down...");
             
             for ( WorkServer workServer : workServers )
                {
                    try {
                        workServer.stop();
                    } catch (Exception er) {
                        er.printStackTrace();
                    }            
                }
                for ( ZkClient client : clients )
                {
                    try {
                        client.close();
                    } catch (Exception er) {
                        er.printStackTrace();
                    }            
                }
        }
       
    }
}

   两次测试,本地模拟10台server,分别不启用防止网络抖动跟启动防抖动两次测试结果如下:

 未启动防抖动:

blob.png

 启用防抖动:

blob.png

基于Apache Curator实现master选举的demo

 第一种方式: LeaderLatch

        这种是有阻塞的,就是大家一起上,谁先上了,就一直阻塞着,直到方法执行完成。如果执行结束,那么其他的兄弟就选一个出来。我觉得这种适合主备,比如开2 个 job,一个挂了另一个就上。

public class LeaderLatchDemo {  
  
    public static void main(String[] args) throws Exception {  
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
        CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").sessionTimeoutMs(2000).connectionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("text").build();  
        client.start();  
        // 选举Leader 启动  
        LeaderLatch latch = new LeaderLatch(client,"/path");  
        latch.start();  
        latch.await();  
        System.err.println("我启动了");  
        Thread.currentThread().sleep(1000000);  
        latch.close();  
        client.close();  
    }  
}

 第二种方式  LeaderSelector:

      这种复杂一点,他有个leaderSelector.autoRequeue();就是自动抢,比如打印一个helloworld,第一个打印完,第2个打印,然后第3个打印。然后第一个再打印,大家一起抢。有点动态选举的味道。

     代码:

public class LeaderSelectorDemo {  
  
    public static void main(String[] args) throws Exception {  
        RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000, 3);  
        final CuratorFramework client = CuratorFrameworkFactory.builder().connectString("127.0.0.1:2181").sessionTimeoutMs(5000).connectionTimeoutMs(10000).retryPolicy(retryPolicy).namespace("text").build();  
        client.start();  
          
        final LeaderSelector leaderSelector = new LeaderSelector(client, "/led", new LeaderSelectorListenerAdapter(){  
  
            @Override  
            public void takeLeadership(CuratorFramework client) throws Exception {  
                System.err.println("work ing...");  
                Thread.currentThread().sleep(3000);  
                System.err.println("end");  
            }  
              
        });  
        leaderSelector.autoRequeue();  
        leaderSelector.start();  
        System.in.read();  
    }  
}

该文章的主要参考:http://www.cnblogs.com/nevermorewang/p/5611807.html

其他资料:

    http://blog.csdn.net/massivestars/article/details/53894551

    https://github.com/wbj0110/zookeeper-master-slave-example

    https://github.com/JuPyoHong/zookeeper-master-worker

发表评论