zookeeper distribute read write lock
By
yuanxiaolong
2015-01-13
更新日期:2016-01-16
利用zookeeper实现分布式读写锁,来协调业务一致
zookeeper实现读写锁,读并发,写等待其他进程释放,并获取锁,执行写逻辑
环境准备 zookeeper实例,单机或伪分布式,全分布式任选,可以参照我上篇文章搭建个伪分布式的。
利用如下命令,利用zk客户端连接到zk实例,其中2182是 zoo.cfg
的clientPort
1 /home/web/xiaolong.yuanxl/zookeeper-3.4 .6 -server-1 //bin/zkCli.sh -server 127.0 .0.1 :2182
新建读锁
和写锁
的路径,/lock/read/
和 /lock/write/
1 2 3 4 5 [zk: 127.0 .0.1 :2182 (CONNECTED) 8 ] create /lock/read read Lock Created /lock/read [zk: 127.0 .0.1 :2182 (CONNECTED) 8 ] create /lock/write writeLock Created /lock/write
由于互斥锁,只需要在不同进程上,create 相同路径的 znode 即可由zk保证,并发下只有1个进程获得锁,十分简单。
这里实现的是,读写锁(读并发、写等待)。
思想
引用 http://www.wuzesheng.com/?p=2609
6.读写锁(Read/Write Lock)
我们知道,读写锁跟互斥锁相比不同的地方是,它分成了读和写两种模式,多个读可以并发执行,但写和读、写都互斥,不能同时执行行。利用ZooKeeper,在上面的基础上,稍做修改也可以实现传统的读写锁的语义,下面是基本的步骤:
每个进程都在ZooKeeper上创建一个临时的顺序结点(Ephemeral Sequential) /locks/lock_${seq}
${seq}最小的一个或多个结点为当前的持锁者,多个是因为多个读可以并发
需要写锁的进程,Watch比它次小的进程对应的结点
需要读锁的进程,Watch比它小的最后一个写进程对应的结点
当前结点释放锁后,所有Watch该结点的进程都会被通知到,他们成为新的持锁者,如此循环反复
解释一下:
1.写并发 时:zk保证了 Ephemeral Sequential
序号自增,且应该根据时序,让节点依次得到 写锁
.
例如有write-01、write-02、write-03,那么 write-02 需要等 write-01 写完后才能执行写动作, write-03 在 write-02后写,这样保持了业务的并发,类似“九连环” 。 场景,对订单付款,产生3个并发,那么 write-01 ,先向账户扣款,同时标记订单状态为已完成。那么 write-02 执行时候,就不会产生重复扣了。
2.读并发 时:由于每个进程并不关心当前其他进程在读什么,相反需要关心最后一个写的进程,不然并发时其他进程未写完时就读,就产生了脏读。
所以,这里要获取 write-MaxNo 并所有读进程,都对这个 write-MaxNo znode 设置 watch,当此 znode 写动作执行完后,触发 deleteNode
事件,回调到 read上的 watcher。
代码 Step 1.入口我们模拟 3个读锁 3个写锁的并发,利用线程池
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 public static final int corePoolSize = 5 ;public static final int maximumPoolSize = 10 ;public static final int keepAliveTime = 3 ;public static final int maximumLinkedQueueSize = 200000 ;private static final ThreadPoolExecutor pool = new ThreadPoolExecutor( corePoolSize, maximumPoolSize, keepAliveTime, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(maximumLinkedQueueSize), new ThreadPoolExecutor.AbortPolicy()); public static void main (String[] args) throws IOException { for (int i = 0 ; i < 3 ; i++) { WriteLockThread thread = new WriteLockThread("Thread-Write-" + (i+1 )); pool.execute(thread); } for (int i = 0 ; i < 3 ; i++) { ReadLockThread thread = new ReadLockThread("Thread-Read-" + (i+1 )); pool.execute(thread); } }
Step 2.线程产生读锁
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 public class ReadLockThread implements Runnable { private static final int TIMEOUT = 3000 ; private static final String WRITE_LOCK_PATH = "/lock/write" ; private static final String WRITE_SIGNAL = "write-" ; private String name; public ReadLockThread (String name) { this .name = name; } @Override public void run () { ZooKeeper zkp = null ; try { zkp = new ZooKeeper("116.211.20.207:2182" , TIMEOUT, null ); zkp.create("/lock/read/read-" , this .name.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); Thread.sleep(1000 ); List<String> children = zkp.getChildren(WRITE_LOCK_PATH, null ); String maxWriteLockName = maxSeq(children); String writeLock = WRITE_LOCK_PATH + "/" + maxWriteLockName; Stat stat = zkp.exists(writeLock , new MyReadBusinessLogic(this .name,zkp)); if (stat != null ) { System.out.println("[read theard] register watcher in " + writeLock + " success!" ); }else { System.out.println("lastest writeLock node has been delete, now I can direct execute read bussiness" ); doReadBussiness(this .name); zkp.close(); } } catch (Exception e) { e.printStackTrace(); } finally { } } private static synchronized String maxSeq (List<String> list) { String maxSeq = "" ; int max = -1 ; if (list != null && !list.isEmpty()) { if (StringUtils.containsIgnoreCase(list.get(0 ), WRITE_SIGNAL)) { String m = StringUtils.remove(list.get(0 ), WRITE_SIGNAL); max = Integer.parseInt(m); maxSeq = list.get(0 ); } for (int i = 1 ; i < list.size(); i++) { if (StringUtils.containsIgnoreCase(list.get(i), WRITE_SIGNAL)) { int temp = Integer.parseInt(StringUtils.remove(list.get(i), WRITE_SIGNAL)); if (max < temp){ max = temp; maxSeq = list.get(i); } } } } return maxSeq; } private static synchronized void doReadBussiness (String bussinessNo) { System.out.println("do read My Bussiness! " + bussinessNo); } }
Step 3.产生写锁线程
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70 71 72 73 74 75 76 77 78 79 80 package com.yxl.lock;import java.util.Collections;import java.util.List;import org.apache.commons.lang.StringUtils;import org.apache.zookeeper.CreateMode;import org.apache.zookeeper.ZooDefs.Ids;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.data.Stat; * 创建线程,产生并发写 * * @author xiaolong.yuanxl */ public class WriteLockThread implements Runnable { private static final int TIMEOUT = 3000 ; private static final String WRITE_LOCK_PATH = "/lock/write" ; private String name; public WriteLockThread (String name) { this .name = name; } @Override public void run () { ZooKeeper zkp = null ; try { zkp = new ZooKeeper("116.211.20.207:2182" , TIMEOUT, null ); String currentPath = zkp.create(WRITE_LOCK_PATH + "/write-" , this .name.getBytes(), Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL); System.out.println("current: " + currentPath); Thread.sleep(6000 ); List<String> children = zkp.getChildren(WRITE_LOCK_PATH, null ); Collections.sort(children); String current = StringUtils.remove(currentPath, WRITE_LOCK_PATH + "/" ); int index = children.indexOf(current); if (-1 != index && index != 0 ) { System.out.println(this .name + " mine: " + current + " preview: " + children.get(index-1 )); String previewNodePath = WRITE_LOCK_PATH + "/" + children.get(index-1 ); Stat stat = zkp.exists(previewNodePath , new MyWriteBusinessLogic(this .name,zkp)); if (stat != null ) { System.out.println("[write thread] register watcher in " + previewNodePath + " success!" ); }else { System.out.println("preview node has been delete, now I can direct execute write bussiness" ); doWriteBussiness(this .name); zkp.close(); } }else { Thread.sleep(3000 ); zkp.close(); } } catch (Exception e) { e.printStackTrace(); } finally { } } private static synchronized void doWriteBussiness (String bussinessNo) { System.out.println("do write My Bussiness! " + bussinessNo); } }
Step 4.读锁watcher回调的业务逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 package com.yxl.lock;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.Watcher.Event.EventType; * 读锁watch回调 * * @author xiaolong.yuanxl */ public class MyReadBusinessLogic implements Watcher { private ZooKeeper zkp; private String bussinessNo; public MyReadBusinessLogic (String bussinessNo,ZooKeeper zkp) { this .bussinessNo = bussinessNo; this .zkp = zkp; } @Override public void process (WatchedEvent event) { if (EventType.NodeDeleted.getIntValue() == event.getType().getIntValue()) { System.out.println("[callback read thread] max write node has been deleted, path: " + event.getPath()); doReadBussiness(bussinessNo,event.getPath()); try { zkp.close(); } catch (Exception e) { e.printStackTrace(); } } } private static void doReadBussiness (String bussinessNo,String path) { System.out.println("path deleted! " + path + " do My read Bussiness! " + bussinessNo); } }
Step 5.写锁watcher回调的业务逻辑
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 package com.yxl.lock;import org.apache.zookeeper.WatchedEvent;import org.apache.zookeeper.Watcher;import org.apache.zookeeper.ZooKeeper;import org.apache.zookeeper.Watcher.Event.EventType; * 写锁watch回调 * * @author xiaolong.yuanxl */ public class MyWriteBusinessLogic implements Watcher { private ZooKeeper zkp; private String bussinessNo; public MyWriteBusinessLogic (String bussinessNo,ZooKeeper zkp) { this .bussinessNo = bussinessNo; this .zkp = zkp; } @Override public void process (WatchedEvent event) { if (EventType.NodeDeleted.getIntValue() == event.getType().getIntValue()) { System.out.println("[callback write thread] preview node has been deleted, path: " + event.getPath()); doBussiness(bussinessNo,event.getPath()); try { zkp.close(); } catch (Exception e) { e.printStackTrace(); } } } private static void doBussiness (String bussinessNo,String path) { System.out.println("path deleted! " + path + " do My write Bussiness! " + bussinessNo); } }
Step 6.执行后输出类似如下,可见 read thread 同时对最后一个写锁znode write-0000000040
添加 watcher,等待最大写锁znode写完毕close后,触发watcher执行逻辑 ,同时写锁 write-0000000040
向write-0000000039
加watcher,write-0000000039
向write-0000000038
加watcher,write-0000000038
由于没有 前一个节点,不加wathcer
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 current: /lock/write/write-0000000038 current: /lock/write/write-0000000039 current: /lock/write/write-0000000040 [read theard] register watcher in /lock/write/write-0000000040 success! [read theard] register watcher in /lock/write/write-0000000040 success! [read theard] register watcher in /lock/write/write-0000000040 success! Thread-Write-2 mine: write-0000000040 preview: write-0000000039 Thread-Write-3 mine: write-0000000039 preview: write-0000000038 [write thread] register watcher in /lock/write/write-0000000038 success! [write thread] register watcher in /lock/write/write-0000000039 success! [callback write thread] preview node has been deleted, path: /lock/write/write-0000000038 path deleted! /lock/write/write-0000000038 do My write Bussiness! Thread-Write-3 [callback write thread] preview node has been deleted, path: /lock/write/write-0000000039 path deleted! /lock/write/write-0000000039 do My write Bussiness! Thread-Write-2 [callback read thread] max write node has been deleted, path: /lock/write/write-0000000040 [callback read thread] max write node has been deleted, path: /lock/write/write-0000000040 [callback read thread] max write node has been deleted, path: /lock/write/write-0000000040 path deleted! /lock/write/write-0000000040 do My read Bussiness! Thread-Read-3 path deleted! /lock/write/write-0000000040 do My read Bussiness! Thread-Read-2 path deleted! /lock/write/write-0000000040 do My read Bussiness! Thread-Read-1
PS: 所有代码在 Github 上,clone后 运行 mvn eclipse:eclipse
获取依赖