ReentrantLock工作原理及源码分析

使用 synchronized 来做同步处理时,锁的获取和释放都是隐式的,实现的原理是通过编译后加上不同的机器指令来实现。

而 ReentrantLock 就是一个普通的类,它是基于 AQS(AbstractQueuedSynchronizer)来实现的。

是一个重入锁:一个线程获得了锁之后仍然可以反复的加锁,不会出现自己阻塞自己的情况。

AQS 是 Java 并发包里实现锁、同步的一个重要的基础框架。

锁类型

排它锁

ReentrantLock继承自父类Lock,然后有3个内部类,其中Sync内部类继承自AQS,另外的两个内部类继承自Sync分别实现公平锁和非公平锁。

    abstract static class Sync extends AbstractQueuedSynchronizer {...}

    static final class NonfairSync extends Sync {...}

    static final class FairSync extends Sync {...}

ReentrantLock通过Sync实现的是AQS的独占模式,也就是独占锁,这个锁是悲观锁。

公平锁与非公平锁

ReentrantLock 分为公平锁和非公平锁,可以通过构造方法来指定具体类型:

    public ReentrantLock() {//默认非公平
        sync = new NonfairSync();
    }
    public ReentrantLock(boolean fair) {
        sync = fair ? new FairSync() : new NonfairSync();//可指定为公平锁
    }

默认一般使用非公平锁,它的效率和吞吐量都比公平锁高的多。

获取锁

使用方式

通常的使用方式如下:

    private ReentrantLock lock = new ReentrantLock();
    public void run() {
        lock.lock();
        try {
            //do bussiness
        } catch (InterruptedException e) {
            e.printStackTrace();
        } finally {
            lock.unlock();//为保证一定释放锁,unlock一定要在finally
        }
    }

非公平锁获取锁

首先看下获取锁的过程:

    public void lock() {
        sync.lock();
    }

可以看到是使用 sync的方法,而这个方法是一个抽象方法,具体是由其子类(FairSync)来实现的,以下是公平锁的实现:

        final void lock() {
            acquire(1);
        }
    //同步队列中有线程 且 锁的所有者不是当前线程那么将线程加入到同步队列的尾部,
    //保证了公平性,也就是先来的线程先获得锁,后来的不能抢先获取。
    public final void acquire(int arg) {//AbstractQueuedSynchronizer 中的 acquire()
        if (!tryAcquire(arg) &&
            acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
            selfInterrupt();
    }
  1. tryAcquire()尝试直接去获取资源,如果成功则直接返回。
  2. addWaiter()将该线程加入等待队列的尾部,并标记为独占模式。
  3. acquireQueued()使线程阻塞在等待队列中获取资源,一直获取到资源后才返回。如果在整个等待过程中被中断过,则返回true,否则返回false。
  4. 如果线程在等待过程中被中断过,它是不响应的。只是获取资源后才再进行自我中断selfInterrupt(),将中断补上。

第一步是尝试获取锁(tryAcquire(arg)),这个也是由其子类实现:

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {//判断状态state是否等于0,等于0代表锁没有被占用,不等于0则代表锁被占用着
                //调用hasQueuedPredecessors方法判断同步队列中是否有线程在等待,如果同步队列中没有
                //线程在等待 则当前线程成为锁的所有者,如果同步队列中有线程在等待,则继续往下执行
                //这个机制就是公平锁的机制,也就是先让先来的线程获取锁,后来的不能抢先获取
                if (!hasQueuedPredecessors() &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {//判断当前线程是否为锁的所有者,如果是,那么直接更新状态state,然后返回true
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false; //如果同步队列中有线程存在 且 锁的所有者不是当前线程,则返回false
        }

首先会判断 AQS 中的 state 是否等于 0,0 表示目前没有其他线程获得锁,当前线程就可以尝试获取锁。

注意:尝试之前会利用 hasQueuedPredecessors() 方法来判断 AQS 的队列中中是否有其他线程,如果有则不会尝试获取锁。这实现了锁获取的公平性,即锁的获取按照先来先得的顺序,后来的不能抢先获取锁。

如果队列中没有线程就利用 CAS 来将 AQS 中的 state 修改为1,也就是获取锁,获取成功则将当前线程置为获得锁的独占线程(setExclusiveOwnerThread(current))。

如果 state 大于 0 时,说明锁已经被获取了,则需要判断获取锁的线程是否为当前线程(ReentrantLock 支持重入),是则需要将 state + 1,并将值更新。
流程图如下:

写入队列

如果 tryAcquire(arg) 获取锁失败,则需要用 addWaiter(Node.EXCLUSIVE) 将当前线程写入队列中。

写入之前需要将当前线程包装为一个 Node 对象(addWaiter(Node.EXCLUSIVE))。

AQS 中的队列是由 Node 节点组成的双向链表实现的。

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);//以给定模式构造结点。mode有两种:EXCLUSIVE(独占)和SHARED(共享)
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);//上一步失败则通过enq入队
        return node;
    }

首先判断队列是否为空,不为空时则将封装好的 Node 利用 CAS 写入队尾,如果出现并发写入失败就需要调用 enq(node)来写入了。

    private Node enq(final Node node) {
        for (;;) { //CAS"自旋",直到成功加入队尾
            Node t = tail;
            if (t == null) { // Must initialize 队列为空,创建一个空的标志结点作为head结点,并将tail也指向它
                if (compareAndSetHead(new Node()))
                    tail = head;
            } else {//正常流程,放入队尾
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

这个处理逻辑就相当于自旋加上 CAS 保证一定能写入队列。CAS自旋volatile变量,一种很经典的用法!

挂起等待线程

写入队列之后需要将当前线程挂起(利用acquireQueued(addWaiter(Node.EXCLUSIVE), arg))

    final boolean acquireQueued(final Node node, int arg) {
        boolean failed = true;//标记是否成功拿到资源
        try {
            boolean interrupted = false;//标记等待过程中是否被中断过
            for (;;) {//又是一个“自旋”!
                final Node p = node.predecessor();//拿到前驱
                if (p == head && tryAcquire(arg)) { //如果前驱是head,即该结点已成老二,那么便有资格去尝试获取资源(可能是老大释放完资源唤醒自己的,当然也可能被interrupt了)
                    setHead(node);//拿到资源后,将head指向该结点。所以head所指的标杆结点,就是当前获取到资源的那个结点或null
                    p.next = null; // help GC setHead中node.prev已置为null,此处再将head.next置为null,就是为了方便GC回收以前的head结点。也就意味着之前拿完资源的结点出队了!
                    failed = false; // 成功获取资源
                    return interrupted;//返回等待过程中是否被中断过
                }
                 //如果自己可以休息了,就通过park()进入waiting状态,直到被unpark()。如果不可中断的情况下被中断了,那么会从park()中醒过来,发现拿不到资源,从而继续进入park()等待。
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;//如果等待过程中被中断过,哪怕只有那么一次,就将interrupted标记为true
            }
        } finally {
            if (failed)// 如果等待过程中没有成功获取资源(如timeout,或者可中断的情况下被中断了),那么取消结点在队列中的等待。
                cancelAcquire(node);
        }
    }

首先会根据 node.predecessor() 获取到上一个节点是否为头节点,如果是则尝试获取一次锁,获取成功就万事大吉了。

如果不是头节点,或者获取锁失败,则会根据上一个节点的 waitStatus 状态来处理(shouldParkAfterFailedAcquire(p, node))。

waitStatus 用于记录当前节点的状态,如节点取消、节点等待等。

shouldParkAfterFailedAcquire(p, node) 返回当前线程是否需要挂起,如果需要则调用 parkAndCheckInterrupt():

    private final boolean parkAndCheckInterrupt() {
        LockSupport.park(this);//调用park()使线程进入waiting状态
        return Thread.interrupted();//如果被唤醒,查看自己是不是被中断的
    }

他是利用 LockSupport 的 part 方法来挂起当前线程的,直到被唤醒。

非公平锁获取锁

公平锁与非公平锁的差异主要在获取锁:

公平锁就相当于买票,后来的人需要排到队尾依次买票,不能插队。

而非公平锁则没有这些规则,是抢占模式,每来一个人不会去管队列如何,直接尝试获取锁。

非公平锁:

        final void lock() {
            if (compareAndSetState(0, 1))//直接尝试获取锁
                setExclusiveOwnerThread(Thread.currentThread());
            else
                acquire(1);
        }

还要一个重要的区别是在尝试获取锁时tryAcquire(arg),非公平锁是不需要判断队列中是否还有其他线程,也是直接尝试获取锁:

        final boolean nonfairTryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {//如果状态state=0,即在这段时间内 锁的所有者把锁释放了 那么这里state就为0
                if (compareAndSetState(0, acquires)) {//没有 !hasQueuedPredecessors() 判断
            //操作成功 则将锁的所有者设置成当前线程 且返回true,也就是当前线程不会进入同步队列
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {//如果状态state不等于0,也就是有线程正在占用锁,那么先检查一下这个线程是不是自己
                int nextc = c + acquires;//如果线程就是自己了,那么直接将state+1,返回true,不需要再获取锁 因为锁就在自己身上了
                if (nextc < 0) // overflow
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false; //如果state不等于0,且锁的所有者又不是自己,那么线程就会进入到同步队列
        }

其流程图如下:

释放锁

公平锁和非公平锁的释放流程都是一样的:

    public void unlock() {
        sync.release(1);
    }

    public final boolean release(int arg) {
        if (tryRelease(arg)) { //子类重写的tryRelease方法,需要等锁的state=0,即tryRelease返回true的时候,才会去唤醒其它线程进行尝试获取锁。
            Node h = head;
            if (h != null && h.waitStatus != 0)//唤醒被挂起的线程
                unparkSuccessor(h);
            return true;
        }
        return false;
    }
    protected final boolean tryRelease(int releases) {//尝试释放锁
            int c = getState() - releases;//状态的state减去releases
            if (Thread.currentThread() != getExclusiveOwnerThread()) //判断锁的所有者是不是该线程
                throw new IllegalMonitorStateException();//如果所的所有者不是该线程 则抛出异常 也就是锁释放的前提是线程拥有这个锁,
            boolean free = false;
            if (c == 0) {//如果该线程释放锁之后 状态state=0,即锁没有重入,那么直接将将锁的所有者设置成null并且返回true,即代表可以唤醒其他线程去获取锁了。如果该线程释放锁之后state不等于0,那么代表锁重入了,返回false,代表锁还未正在释放,不用去唤醒其他线程。
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

首先会判断当前线程是否为获得锁的线程,由于是重入锁所以需要将 state 减到 0 才认为完全释放锁。

释放之后需要调用 unparkSuccessor(h) 来唤醒被挂起的线程。
流程图如下:

总结

由于公平锁需要关心队列的情况,得按照队列里的先后顺序来获取锁(会造成大量的线程上下文切换),而非公平锁则没有这个限制。

所以也就能解释非公平锁的效率会被公平锁更高。

# java 

评论

Your browser is out-of-date!

Update your browser to view this website correctly. Update my browser now

×