zookeeper典型应用场景之二:worker 任务分发

 传统的worker实现,可以基于队列的形式,通过轮询扫描来实现,任务的派发动作;

 如果想实现分布式的形式,来实现多worker的任务派发,怎么实现了??

 本章主要简单的通过Zookeeper来实现多任务的派发;

  分布式任务(Worker)处理方案 这篇文章也给出了分布式任务派发的不同解决方案;

  基于Zookeeper的 worker 任务分发简单实现原理 如图:  blob.png

  

  竟供于思路参考:

  

  Master 实现代码:

public class Master extends Thread implements Watcher, AsyncCallback.ChildrenCallback {

	public static String WAITING = "WAITING";

	private ZooKeeperUtils zkUtils;

	public Master(String hostPort) throws IOException {
		zkUtils = new ZooKeeperUtils(new ZooKeeper(hostPort, 15000, this));
	}

	@Override
	public void process(WatchedEvent event) {
		// 实现重复注册监听服务
		zkUtils.asyncGetChildren("/tasks", true, this, null);
	}

	@Override
	public void processResult(int rc, String path, Object ctx,
			List<String> children) {
		if (children == null || children.size() == 0) {
			return;
		}
		for (String task : children) {
			String taskStatus = zkUtils.getData("/tasks/" + task);
			if ("RUNNING".equals(taskStatus)) {
				continue;
			}else if("DONE".equals(taskStatus)){
				System.out.println(task+"=task--已经处理成功过了");
				continue;
			}
			String availableWorker = getAvailableWorkers();

			zkUtils.asyncCreate("/assign/" + availableWorker, task.getBytes(), 
			Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, null, null);
			
			//zkUtils.asyncGetChildren("/tasks", true, this, null);
		}

	}

	private String getAvailableWorkers() {
		while (true) {
			List<String> workers = zkUtils.getChildren("/workers", false, null);
			for (String worker : workers) {
				String workerStatus = zkUtils.getData("/workers/" + worker);
				if ("IDLE".equals(workerStatus)) {
					return worker;
				}
			}
			try {
				Thread.sleep(1000);
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
	}
	
	@Override
	public void run() {
		zkUtils.asyncGetChildren("/tasks", true, this, null);
		try {
			synchronized (this) {
				while (true) {
					wait();
				}
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}

}

 Worker 实现代码:

public class Worker implements Runnable , Watcher, AsyncCallback.ChildrenCallback {


	private ZooKeeperUtils zkUtils;
	
	private String workerName;
	
	private StringCallback createZNodeCallback = new StringCallback() {
		public void processResult(int rc, String path, Object ctx, String name) {
			switch (Code.get(rc)) {
				case CONNECTIONLOSS:
					createWorkerZNode((byte[])ctx);
					break;
				case OK:
					workerName = 
					name.substring(name.lastIndexOf("/") + 1);
					break;
				case NODEEXISTS:
					break;
				default:
					break;
			}
		}
	};
	
	public Worker(String hostPort) throws IOException {
		zkUtils = new ZooKeeperUtils(new ZooKeeper(hostPort, 15000, this));
	}

	@Override
	public void run() {
		createWorkerZNode("IDLE".getBytes());
		zkUtils.asyncGetChildren("/assign", true, this, null);
		try {
			synchronized (this) {
				while (true) {
					wait();
				}
			}
		} catch (InterruptedException e) {
			
		}
	}

	@Override
	public void processResult(int rc, String path, Object ctx,
			List<String> children) {
		for (String child : children) {
			if (child.equals(workerName)) {
				doTasksProcess();
			}
		}
	}

	private void doTasksProcess() {
		String task = zkUtils.getData("/assign/" + workerName);
		System.out.println("task--->"+task +"; workerName -----> "+workerName);
		zkUtils.setData("/tasks/" + task, "RUNNING"); //在运行服务中
		zkUtils.setData("/workers/" + workerName, "BUSY");  //忙碌状态
		System.out.println("Worker处理业务中--------------");
		for (int i = 0; i < 10; i++) {
			try {
				
				Thread.sleep(1000);
			} catch (InterruptedException e) {					
			}
		}
		System.out.println("Worker处理完毕--------------");
		zkUtils.setData("/tasks/" + task, "DONE"); //该节点服务已经处理成功了
		zkUtils.setData("/workers/" + workerName, "IDLE"); //空闲
		zkUtils.delete("/assign/" + workerName);
		zkUtils.delete("/tasks/" + task);
	}

	@Override
	public void process(WatchedEvent event) {
		zkUtils.asyncGetChildren("/assign", true,this, null);		
	}
	
	private void createWorkerZNode(byte[] data) {
		zkUtils.asyncCreate("/workers/worker-", data, 
		Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL, createZNodeCallback, null);
	}

}

  

Bootstrap 实现代码:

public class Bootstrap {
	
	private static ZooKeeper zooKeeper;
	
	 private static CountDownLatch countDownLatch=new CountDownLatch(1);
	 
	 private static String hostPort="127.0.0.1:2181";
	 
	 private static StringCallback createZNodeCallback = new StringCallback() {

			public void processResult(int rc, String path, Object ctx, String name) {
				switch (Code.get(rc)) {
					case CONNECTIONLOSS:
						createZNode(path, (byte[])ctx);
						break;
					case OK:
						break;
					case NODEEXISTS:
						break;
					default:
						break;
				}
			}
		};
		
	
	public static void main(String[] args) throws Exception {
		
		zooKeeper = new ZooKeeper(hostPort, 15000, new Watcher(){

			@Override
			public void process(WatchedEvent event) {
				System.out.println("---------------receive watch event :"+event);
				if(KeeperState.SyncConnected== event.getState()){
					countDownLatch.countDown();
				}
			}
			
		});
		countDownLatch.await();
		initZNodes();
		Master master = new Master(hostPort);
		master.start();
		
		int workerCount =3;
       ThreadPoolExecutor executor = 
       new ThreadPoolExecutor(10, 20, 60, TimeUnit.SECONDS, new SynchronousQueue<Runnable>());
		for (int i = 0; i < workerCount; i++) {
			executor.execute(new Worker(hostPort));
		}
		
	}
	
	private static void initZNodes() {
		createZNode("/workers", new byte[0]);
		createZNode("/assign", new byte[0]);
		createZNode("/tasks", new byte[0]);
	}

	private static void createZNode(String path, byte[] data) {
		zooKeeper.create(path, data, Ids.OPEN_ACL_UNSAFE, 
		CreateMode.PERSISTENT, createZNodeCallback, data);
	}
	
}

发表评论