kd__8301 2014-09-23 11:18
浏览 371
已采纳

关于zooKeeper,下面这个例子能保证数据安全吗

package zooKeeper.zooKeeperLock;

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;
/**
DistributedLock lock = null;
try {
lock = new DistributedLock("127.0.0.1:2182","test");
lock.lock();
//do something...
} catch (Exception e) {
e.printStackTrace();
}
finally {
if(lock != null)
lock.unlock();
}

  • @author xueliang
    *
    */
    public class DistributedLock implements Lock, Watcher{
    private ZooKeeper zk;
    private String root = "/build";//根
    private String lockName;//竞争资源的标志
    private String waitNode;//等待前一个锁
    private String myZnode;//当前锁
    private CountDownLatch latch;//计数器
    private int sessionTimeout = 30000;
    private List exception = new ArrayList();

    /**

    • 创建分布式锁,使用前请确认config配置的zookeeper服务可用
    • @param config 127.0.0.1:2181
    • @param lockName 竞争资源标志,lockName中不能包含单词lock / public DistributedLock(String config, String lockName){ this.lockName = lockName; // 创建一个与服务器的连接 try { zk = new ZooKeeper(config, sessionTimeout, this); Stat stat = zk.exists(root, false); if(stat == null){ // 创建根节点 zk.create(root, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.PERSISTENT); } } catch (IOException e) { exception.add(e); } catch (KeeperException e) { exception.add(e); } catch (InterruptedException e) { exception.add(e); } } /*
    • zookeeper节点的监视器 */ public void process(WatchedEvent event) { if(this.latch != null) {
      this.latch.countDown();
      } }

    public void lock() {
    if(exception.size() > 0){
    throw new LockException(exception.get(0));
    }
    try {
    if(this.tryLock()){
    System.out.println("Thread " + Thread.currentThread().getId() + " " +myZnode + " get lock true");
    return;
    }
    else{
    waitForLock(waitNode, sessionTimeout);//等待锁
    }
    } catch (KeeperException e) {
    throw new LockException(e);
    } catch (InterruptedException e) {
    throw new LockException(e);
    }
    }
    public boolean tryLock() {
    try {
    String splitStr = "_lock_";
    if(lockName.contains(splitStr))
    throw new LockException("lockName can not contains \u000B");
    //创建临时子节点
    myZnode = zk.create(root + "/" + lockName + splitStr, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,CreateMode.EPHEMERAL_SEQUENTIAL);
    System.out.println(myZnode + " is created ");
    //取出所有子节点
    List subNodes = zk.getChildren(root, false);
    //取出所有lockName的锁
    List lockObjNodes = new ArrayList();
    for (String node : subNodes) {
    String _node = node.split(splitStr)[0];
    if(_node.equals(lockName)){
    lockObjNodes.add(node);
    }
    }
    Collections.sort(lockObjNodes);
    System.out.println(myZnode + "==" + lockObjNodes.get(0));
    if(myZnode.equals(root+"/"+lockObjNodes.get(0))){
    //如果是最小的节点,则表示取得锁
    return true;
    }
    //如果不是最小的节点,找到比自己小1的节点
    String subMyZnode = myZnode.substring(myZnode.lastIndexOf("/") + 1);
    waitNode = lockObjNodes.get(Collections.binarySearch(lockObjNodes, subMyZnode) - 1);
    } catch (KeeperException e) {
    throw new LockException(e);
    } catch (InterruptedException e) {
    throw new LockException(e);
    }
    return false;
    }
    public boolean tryLock(long time, TimeUnit unit) {
    try {
    if(this.tryLock()){
    return true;
    }
    return waitForLock(waitNode,time);
    } catch (Exception e) {
    e.printStackTrace();
    }
    return false;
    }
    private boolean waitForLock(String lower, long waitTime) throws InterruptedException, KeeperException {
    Stat stat = zk.exists(root + "/" + lower,true);
    //判断比自己小一个数的节点是否存在,如果不存在则无需等待锁,同时注册监听
    if(stat != null){
    System.out.println("Thread " + Thread.currentThread().getId() + " waiting for " + root + "/" + lower);
    this.latch = new CountDownLatch(1);
    this.latch.await(waitTime, TimeUnit.MILLISECONDS);
    this.latch = null;
    }
    return true;
    }
    public void unlock() {
    try {
    System.out.println("unlock " + myZnode);
    zk.delete(myZnode,-1);
    myZnode = null;
    zk.close();
    } catch (InterruptedException e) {
    e.printStackTrace();
    } catch (KeeperException e) {
    e.printStackTrace();
    }
    }
    public void lockInterruptibly() throws InterruptedException {
    this.lock();
    }
    public Condition newCondition() {
    return null;
    }

    public class LockException extends RuntimeException {
    private static final long serialVersionUID = 1L;
    public LockException(String e){
    super(e);
    }
    public LockException(Exception e){
    super(e);
    }
    }
    }

  • 写回答

2条回答 默认 最新

  • ll89308839 2014-09-23 23:36
    关注

    楼主是自己在用zookeeper做分布式的锁么,官方文档里,recipes有queue、lock、election,这三个例子,直接使用即可

    本回答被题主选为最佳回答 , 对您是否有帮助呢?
    评论
查看更多回答(1条)

报告相同问题?

悬赏问题

  • ¥50 永磁型步进电机PID算法
  • ¥15 sqlite 附加(attach database)加密数据库时,返回26是什么原因呢?
  • ¥88 找成都本地经验丰富懂小程序开发的技术大咖
  • ¥15 如何处理复杂数据表格的除法运算
  • ¥15 如何用stc8h1k08的片子做485数据透传的功能?(关键词-串口)
  • ¥15 有兄弟姐妹会用word插图功能制作类似citespace的图片吗?
  • ¥200 uniapp长期运行卡死问题解决
  • ¥15 latex怎么处理论文引理引用参考文献
  • ¥15 请教:如何用postman调用本地虚拟机区块链接上的合约?
  • ¥15 为什么使用javacv转封装rtsp为rtmp时出现如下问题:[h264 @ 000000004faf7500]no frame?