Java并发编程

Java并发编程之ConcurrentHashMap

1

本篇文章已经发表在GoldenDoc上,欢迎大家访问GoldenDoc,在自己的博客这里仅仅做一次备份。

ConcurrentHashMap

ConcurrentHashMap是一个线程安全的Hash Table,它的主要功能是提供了一组和HashTable功能相同但是线程安全的方法。ConcurrentHashMap可以做到读取数据不加锁,并且其内部的结构可以让其在进行写操作的时候能够将锁的粒度保持地尽量地小,不用对整个ConcurrentHashMap加锁。

ConcurrentHashMap的内部结构

ConcurrentHashMap为了提高本身的并发能力,在内部采用了一个叫做Segment的结构,一个Segment其实就是一个类Hash Table的结构,Segment内部维护了一个链表数组,我们用下面这一幅图来看下ConcurrentHashMap的内部结构:
图表1
从上面的结构我们可以了解到,ConcurrentHashMap定位一个元素的过程需要进行两次Hash操作,第一次Hash定位到Segment,第二次Hash定位到元素所在的链表的头部,因此,这一种结构的带来的副作用是Hash的过程要比普通的HashMap要长,但是带来的好处是写操作的时候可以只对元素所在的Segment进行加锁即可,不会影响到其他的Segment,这样,在最理想的情况下,ConcurrentHashMap可以最高同时支持Segment数量大小的写操作(刚好这些写操作都非常平均地分布在所有的Segment上),所以,通过这一种结构,ConcurrentHashMap的并发能力可以大大的提高。

Segment

我们再来具体了解一下Segment的数据结构:

    static final class Segment<K,V> extends ReentrantLock implements Serializable {
        transient volatile int count;
        transient int modCount;
        transient int threshold;
        transient volatile HashEntry<K,V>[] table;
        final float loadFactor;
    }

详细解释一下Segment里面的成员变量的意义:

  • count:Segment中元素的数量
  • modCount:对table的大小造成影响的操作的数量(比如put或者remove操作)
  • threshold:阈值,Segment里面元素的数量超过这个值依旧就会对Segment进行扩容
  • table:链表数组,数组中的每一个元素代表了一个链表的头部
  • loadFactor:负载因子,用于确定threshold

HashEntry

Segment中的元素是以HashEntry的形式存放在链表数组中的,看一下HashEntry的结构:

    static final class HashEntry<K,V> {
        final K key;
        final int hash;
        volatile V value;
        final HashEntry<K,V> next;
    }

可以看到HashEntry的一个特点,除了value以外,其他的几个变量都是final的,这样做是为了防止链表结构被破坏,出现ConcurrentModification的情况。

ConcurrentHashMap的初始化

下面我们来结合源代码来具体分析一下ConcurrentHashMap的实现,先看下初始化方法:

    public ConcurrentHashMap(int initialCapacity,
                             float loadFactor, int concurrencyLevel) {
        if (!(loadFactor > 0) || initialCapacity < 0 || concurrencyLevel <= 0)
            throw new IllegalArgumentException();

        if (concurrencyLevel > MAX_SEGMENTS)
            concurrencyLevel = MAX_SEGMENTS;

        // Find power-of-two sizes best matching arguments
        int sshift = 0;
        int ssize = 1;
        while (ssize < concurrencyLevel) {
            ++sshift;
            ssize <<= 1;
        }
        segmentShift = 32 - sshift;
        segmentMask = ssize - 1;
        this.segments = Segment.newArray(ssize);

        if (initialCapacity > MAXIMUM_CAPACITY)
            initialCapacity = MAXIMUM_CAPACITY;
        int c = initialCapacity / ssize;
        if (c * ssize < initialCapacity)
            ++c;
        int cap = 1;
        while (cap < c)
            cap <<= 1;

        for (int i = 0; i < this.segments.length; ++i)
            this.segments[i] = new Segment<K,V>(cap, loadFactor);
    }

CurrentHashMap的初始化一共有三个参数,一个initialCapacity,表示初始的容量,一个loadFactor,表示负载参数,最后一个是concurrentLevel,代表ConcurrentHashMap内部的Segment的数量,ConcurrentLevel一经指定,不可改变,后续如果ConcurrentHashMap的元素数量增加导致ConrruentHashMap需要扩容,ConcurrentHashMap不会增加Segment的数量,而只会增加Segment中链表数组的容量大小,这样的好处是扩容过程不需要对整个ConcurrentHashMap做rehash,而只需要对Segment里面的元素做一次rehash就可以了。

整个ConcurrentHashMap的初始化方法还是非常简单的,先是根据concurrentLevel来new出Segment,这里Segment的数量是不大于concurrentLevel的最大的2的指数,就是说Segment的数量永远是2的指数个,这样的好处是方便采用移位操作来进行hash,加快hash的过程。接下来就是根据intialCapacity确定Segment的容量的大小,每一个Segment的容量大小也是2的指数,同样使为了加快hash的过程。

这边需要特别注意一下两个变量,分别是segmentShift和segmentMask,这两个变量在后面将会起到很大的作用,假设构造函数确定了Segment的数量是2的n次方,那么segmentShift就等于32减去n,而segmentMask就等于2的n次方减一。

ConcurrentHashMap的get操作

前面提到过ConcurrentHashMap的get操作是不用加锁的,我们这里看一下其实现:

    public V get(Object key) {
        int hash = hash(key.hashCode());
        return segmentFor(hash).get(key, hash);
    }

看第三行,segmentFor这个函数用于确定操作应该在哪一个segment中进行,几乎对ConcurrentHashMap的所有操作都需要用到这个函数,我们看下这个函数的实现:

    final Segment<K,V> segmentFor(int hash) {
        return segments[(hash >>> segmentShift) & segmentMask];
    }

这个函数用了位操作来确定Segment,根据传入的hash值向右无符号右移segmentShift位,然后和segmentMask进行与操作,结合我们之前说的segmentShift和segmentMask的值,就可以得出以下结论:假设Segment的数量是2的n次方,根据元素的hash值的高n位就可以确定元素到底在哪一个Segment中。

在确定了需要在哪一个segment中进行操作以后,接下来的事情就是调用对应的Segment的get方法:

        V get(Object key, int hash) {
            if (count != 0) { // read-volatile
                HashEntry<K,V> e = getFirst(hash);
                while (e != null) {
                    if (e.hash == hash && key.equals(e.key)) {
                        V v = e.value;
                        if (v != null)
                            return v;
                        return readValueUnderLock(e); // recheck
                    }
                    e = e.next;
                }
            }
            return null;
        }

先看第二行代码,这里对count进行了一次判断,其中count表示Segment中元素的数量,我们可以来看一下count的定义:

transient volatile int count;

可以看到count是volatile的,实际上这里里面利用了volatile的语义:

对volatile字段的写入操作happens-before于每一个后续的同一个字段的读操作。

因为实际上put、remove等操作也会更新count的值,所以当竞争发生的时候,volatile的语义可以保证写操作在读操作之前,也就保证了写操作对后续的读操作都是可见的,这样后面get的后续操作就可以拿到完整的元素内容。

然后,在第三行,调用了getFirst()来取得链表的头部:

        HashEntry<K,V> getFirst(int hash) {
            HashEntry<K,V>[] tab = table;
            return tab[hash & (tab.length - 1)];
        }

同样,这里也是用位操作来确定链表的头部,hash值和HashTable的长度减一做与操作,最后的结果就是hash值的低n位,其中n是HashTable的长度以2为底的结果。

在确定了链表的头部以后,就可以对整个链表进行遍历,看第4行,取出key对应的value的值,如果拿出的value的值是null,则可能这个key,value对正在put的过程中,如果出现这种情况,那么就加锁来保证取出的value是完整的,如果不是null,则直接返回value。

ConcurrentHashMap的put操作

看完了get操作,再看下put操作,put操作的前面也是确定Segment的过程,这里不再赘述,直接看关键的segment的put方法:

        V put(K key, int hash, V value, boolean onlyIfAbsent) {
            lock();
            try {
                int c = count;
                if (c++ > threshold) // ensure capacity
                    rehash();
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue;
                if (e != null) {
                    oldValue = e.value;
                    if (!onlyIfAbsent)
                        e.value = value;
                }
                else {
                    oldValue = null;
                    ++modCount;
                    tab[index] = new HashEntry<K,V>(key, hash, first, value);
                    count = c; // write-volatile
                }
                return oldValue;
            } finally {
                unlock();
            }
        }

首先对Segment的put操作是加锁完成的,然后在第五行,如果Segment中元素的数量超过了阈值(由构造函数中的loadFactor算出)这需要进行对Segment扩容,并且要进行rehash,关于rehash的过程大家可以自己去了解,这里不详细讲了。

第8和第9行的操作就是getFirst的过程,确定链表头部的位置。

第11行这里的这个while循环是在链表中寻找和要put的元素相同key的元素,如果找到,就直接更新更新key的value,如果没有找到,则进入21行这里,生成一个新的HashEntry并且把它加到整个Segment的头部,然后再更新count的值。

ConcurrentHashMap的remove操作

Remove操作的前面一部分和前面的get和put操作一样,都是定位Segment的过程,然后再调用Segment的remove方法:

        V remove(Object key, int hash, Object value) {
            lock();
            try {
                int c = count - 1;
                HashEntry<K,V>[] tab = table;
                int index = hash & (tab.length - 1);
                HashEntry<K,V> first = tab[index];
                HashEntry<K,V> e = first;
                while (e != null && (e.hash != hash || !key.equals(e.key)))
                    e = e.next;

                V oldValue = null;
                if (e != null) {
                    V v = e.value;
                    if (value == null || value.equals(v)) {
                        oldValue = v;
                        // All entries following removed node can stay
                        // in list, but all preceding ones need to be
                        // cloned.
                        ++modCount;
                        HashEntry<K,V> newFirst = e.next;
                        for (HashEntry<K,V> p = first; p != e; p = p.next)
                            newFirst = new HashEntry<K,V>(p.key, p.hash,
                                                          newFirst, p.value);
                        tab[index] = newFirst;
                        count = c; // write-volatile
                    }
                }
                return oldValue;
            } finally {
                unlock();
            }
        }

首先remove操作也是确定需要删除的元素的位置,不过这里删除元素的方法不是简单地把待删除元素的前面的一个元素的next指向后面一个就完事了,我们之前已经说过HashEntry中的next是final的,一经赋值以后就不可修改,在定位到待删除元素的位置以后,程序就将待删除元素前面的那一些元素全部复制一遍,然后再一个一个重新接到链表上去,看一下下面这一幅图来了解这个过程:
1
假设链表中原来的元素如上图所示,现在要删除元素3,那么删除元素3以后的链表就如下图所示:
2

ConcurrentHashMap的size操作

在前面的章节中,我们涉及到的操作都是在单个Segment中进行的,但是ConcurrentHashMap有一些操作是在多个Segment中进行,比如size操作,ConcurrentHashMap的size操作也采用了一种比较巧的方式,来尽量避免对所有的Segment都加锁。

前面我们提到了一个Segment中的有一个modCount变量,代表的是对Segment中元素的数量造成影响的操作的次数,这个值只增不减,size操作就是遍历了两次Segment,每次记录Segment的modCount值,然后将两次的modCount进行比较,如果相同,则表示期间没有发生过写入操作,就将原先遍历的结果返回,如果不相同,则把这个过程再重复做一次,如果再不相同,则就需要将所有的Segment都锁住,然后一个一个遍历了,具体的实现大家可以看ConcurrentHashMap的源码,这里就不贴了。

Java并发编程J.U.C之Condition

0

本篇文章已经发表在GoldenDoc上,欢迎大家访问GoldenDoc,在自己的博客这里仅仅做一次备份。

在上一篇中,我们了解了下J.U.C的锁的获取与释放的过程,这个过程主要通过在A.Q.S中维持一个等待队列来实现,其中我们也提到了,在A.Q.S中除了一个等待队列之外,还有一个Condition队列,在了解Condition队列之前,先来看一下Condition是怎么回事:

The synchronizer framework provides a ConditionObject class for use by synchronizers that maintain exclusive synchronization and conform to the Lock interface. Any number of condition objects may be attached to a lock object, providing classic monitor-style await, signal, and signalAll operations, including those with timeouts, along with some inspection and monitoring methods.

上面的这一段内容摘自Doug Lea的AQS论文,从上面这一段话可以看出,Condition主要是为了在J.U.C框架中提供和Java传统的监视器风格的wait,notify和notifyAll方法类似的功能,那么先来解释一下这三个方法的作用:

  • Object.wait()方法:使当前线程释放Object上的监视器并且挂起,直到有另外的线程调用Object.notify()方法或者Object.notifyAll()方法唤醒当前线程,当被唤醒后,Object.wait()方法会尝试重新获取监视器,成功获取后继续往下执行。注意Object.wait()方法只有在当前线程持有Object的监视器的时候才能够调用,不然会抛出异常。
  • Object.notify()方法:用于唤醒另外一个调用了Object.wait()方法的线程,如果有多个都调用了Object.wait()方法,那么就会选择一个线程去notify(),具体选择哪一个和具体的实现有关,当前线程在调用Object.notify()方法以后会就释放Object的监视器,和wait()方法一样,Object.notify()方法只有在当前线程只有Object的监视器的时候才能够调用,不然就会抛出异常。
  • Object.notifyAll()方法:唤醒所有调用了Object.wait()方法的线程,如果有多个线程调用了Object.wait()方法,那么就会引发这些线程之间的竞争,最后谁成功获取到Object的监视器和具体的实现有关,当前线程在调用Object.notifyAll()方法以后会就释放Object的监视器,和wait()方法一样,Object.notifyAll()方法只有在当前线程只有Object的监视器的时候才能够调用,不然就会抛出异常。

那么Condition是如何实现wait,notify和notifyAll方法的功能呢?我们接下来看:

在Condition中,wait,notify和notifyAll方法分别对应了await,signal和signalAll方法,当然Condition也提供了超时的、不可被中断的await()方法,不过我们主要还是看一看await,notify和notifyAll的实现,先看await:

await方法:

        public final void await() throws InterruptedException {
            if (Thread.interrupted())
                throw new InterruptedException();
            Node node = addConditionWaiter();
            int savedState = fullyRelease(node);
            int interruptMode = 0;
            while (!isOnSyncQueue(node)) {
                LockSupport.park(this);
                if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
                    break;
            }
            if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
                interruptMode = REINTERRUPT;
            if (node.nextWaiter != null)
                unlinkCancelledWaiters();
            if (interruptMode != 0)
                reportInterruptAfterWait(interruptMode);
        }

整个await的过程如下:

  1. 在第2行处,如果当前线程被中断,则抛出中断异常。
  2. 在第4行处,将节点加入到Condition队列中去,这里如果lastWaiter是cancel状态,那么会把它踢出Condition队列。
  3. 在第5行处,调用tryRelease,释放当前线程的锁
  4. 在第7行处,判断节点是否在等待队列中(signal操作会将Node从Condition队列中拿出并且放入到等待队列中去),如果不在等待队列中了,就park当前线程,如果在,就退出循环,这个时候如果被中断,那么就退出循环
  5. 在第12行处,这个时候线程已经被signal()或者signalAll()操作给唤醒了,退出了4中的while循环,尝试再次获取锁,调用acquireQueued方法。

可以看到,这个await的操作过程和Object.wait()方法是一样,只不过await()采用了Condition队列的方式实现了Object.wait()的功能。

signal和signalAll方法:

在了解了await方法的实现以后,signal和signalAll方法的实现就相对简单了,先看看signal方法:

        public final void signal() {
            if (!isHeldExclusively())
                throw new IllegalMonitorStateException();
            Node first = firstWaiter;
            if (first != null)
                doSignal(first);
        }

这里先判断当前线程是否持有锁,如果没有持有,则抛出异常,然后判断整个condition队列是否为空,不为空则调用doSignal方法来唤醒线程,看看doSignal方法都干了一些什么:

        private void doSignal(Node first) {
            do {
                if ( (firstWaiter = first.nextWaiter) == null)
                    lastWaiter = null;
                first.nextWaiter = null;
            } while (!transferForSignal(first) &&
                     (first = firstWaiter) != null);
        }

这个while循环的作用就是将firstWaiter往Condition队列的后面移一位,并且唤醒first,看看while循环中tranferForSignal:

    final boolean transferForSignal(Node node) {
        /*
         * If cannot change waitStatus, the node has been cancelled.
         */
        if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))
            return false;

        /*
         * Splice onto queue and try to set waitStatus of predecessor to
         * indicate that thread is (probably) waiting. If cancelled or
         * attempt to set waitStatus fails, wake up to resync (in which
         * case the waitStatus can be transiently and harmlessly wrong).
         */
        Node p = enq(node);
        int c = p.waitStatus;
        if (c > 0 || !compareAndSetWaitStatus(p, c, Node.SIGNAL))
            LockSupport.unpark(node.thread);
        return true;
    }

这段代码的作用就是修改Node的waitStatus为0,然后将Node插入到等待队列中,并且唤醒Node。

signalAll和signal方法类似,主要的不同在于它不是调用doSignal方法,而是调用doSignalAll方法:

        private void doSignalAll(Node first) {
            lastWaiter = firstWaiter  = null;
            do {
                Node next = first.nextWaiter;
                first.nextWaiter = null;
                transferForSignal(first);
                first = next;
            } while (first != null);
        }

这个方法就相当于把Condition队列中的所有Node全部取出插入到等待队列中去。

总结:

在了解了await(),signal()和signalAll方法的实现以后,我们再来通过一副gif动画来看一看这一个整体的过程:

Java并发编程J.U.C之锁的获取与释放

0

上一篇文章中,我们了解了一下J.U.C的AQS内部的一些实现细节,在这一篇文章我们将来以ReentrantLock为例,来分析一下锁的获取和释放的过程,让大家能够对锁的获取和释放的整体过程有一个了解。

一、锁的获取

先看下ReentrantLock的lock()方法,整个方法只有一行,调用acquire方法,看看acquire方法的实现:

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        selfInterrupt();
}

这段代码的实现也是比较简洁,先尝试一次tryAcquire操作,如果失败,则把当前线程加入到同步队列中去,这个时候可能会反复的阻塞与唤醒这个线程,直到后续的tryAcquire(看acquireQueued的实现)操作成功。

在看看tryAcquire的实现:

        protected final boolean tryAcquire(int acquires) {
            final Thread current = Thread.currentThread();
            int c = getState();
            if (c == 0) {
                if (isFirst(current) &&
                    compareAndSetState(0, acquires)) {
                    setExclusiveOwnerThread(current);
                    return true;
                }
            }
            else if (current == getExclusiveOwnerThread()) {
                int nextc = c + acquires;
                if (nextc < 0)
                    throw new Error("Maximum lock count exceeded");
                setState(nextc);
                return true;
            }
            return false;
        }

这段代码是尝试获取锁的过程,它先判断当前的AQS的state值,如果为0,则表示该锁没有被持有过,如果这个时候同步队列是空的或者当前线程就是在同步队列的头部,那么修改state的值,并且设置排他锁的持有线程为当前线程

如果大于0,则判断当前线程是否是排他锁的持有线程,如果是,那么把state值加1(注意state是int类型的,所以state的最大值是就是int的最大值)
如果第一次tryAcquire()操作失败,那么就把当前线程加入到等待队列中去,看addWaiter()方法:

    private Node addWaiter(Node mode) {
        Node node = new Node(Thread.currentThread(), mode);
        // Try the fast path of enq; backup to full enq on failure
        Node pred = tail;
        if (pred != null) {
            node.prev = pred;
            if (compareAndSetTail(pred, node)) {
                pred.next = node;
                return node;
            }
        }
        enq(node);
        return node;
    }

这段代码中先尝试了一下了下enq()方法中等待队列不为空的情况,如果失败,再调用enq()方法将当前线程加入等待队列,enq()的过程我们已经在上一篇文章中讲过了,不再赘述。

最后在当前线程被加入到等待队列中去以后,再调用acquireQueued去获取锁,看看acquireQueued的代码:

    final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

这段代码中拿到当前线程在同步队列中的前面一个节点,如果这个节点是是头部,那么马上进行一次tryAcquire操作,如果操作成功,那么把当前线程弹出队列,整个操作就此结束。如果这个节点不是头部或者说tryAcquire操作失败的话,那么就判断是不是要将当前线程给阻塞掉(shouldParkAfterFailedAcquire)方法:判断当前线程是否应该被阻塞掉,实际上判断的是当前线程的前一个节点的状态,如果前一个节点的状态小于0(condition或者signal),那么返回true,阻塞当前线程;如果前一个节点的状态大于0(cancelled),则向前遍历,直到找到一个节点状态不大于0的节点,并且将中间的cancelled状态的节点全部踢出队列;如果前一个节点的状态等于0,那么将其状态置为-1(signal),并且返回false,等待下一次循环的时候再阻塞。

整个锁的获取过程就是这样,我们再来总结一下整个过程:acquire()方法会先调用一次tryAcquire方法获取一次锁,如果失败,则把当前线程加入到等待队列中去,然后再调用acquireQueued获取锁,acquireQueued在当前节点不在头部的时候会把当前线程的前一个结点的状态置为SIGNAL,然后阻塞当前线程。当当前线程到了队列的头部的时候,那么获取锁的操作就会成功返回。

二、锁的释放

首先,我们知道在acquireQueued方法中,如果一个线程成功获取到了锁,那么它就应该是整个等待队列的head节点,然后,我们再来看一看unlock()方法,和lock()方法一样,unlock()方法也是只有一行代码,直接调用release()方法,我们看看release()方法的实现:

    public final boolean release(int arg) {
        if (tryRelease(arg)) {
            Node h = head;
            if (h != null && h.waitStatus != 0)
                unparkSuccessor(h);
            return true;
        }
        return false;
    }

这个过程首先调用tryRelease方法,如果锁已经完全释放,那么就唤醒下一个节点,先来看看tryRelease方法:

        protected final boolean tryRelease(int releases) {
            int c = getState() - releases;
            if (Thread.currentThread() != getExclusiveOwnerThread())
                throw new IllegalMonitorStateException();
            boolean free = false;
            if (c == 0) {
                free = true;
                setExclusiveOwnerThread(null);
            }
            setState(c);
            return free;
        }

这段代码首先获取当前AQS的state状态并且将其值减一,如果结果等于0(锁已经被完全释放),那么将排他锁的持有线程置为null。将AQS的state状态置为减一后的结果。

然后再看看唤醒继任节点的代码:

    private void unparkSuccessor(Node node) {
        compareAndSetWaitStatus(node, Node.SIGNAL, 0);
        Node s = node.next;
        if (s == null || s.waitStatus > 0) {
            s = null;
            for (Node t = tail; t != null && t != node; t = t.prev)
                if (t.waitStatus <= 0)
                    s = t;
        }
        if (s != null)
            LockSupport.unpark(s.thread);
    }

这段代码先清除当前节点的waitStatus为0,然后判断下一个节点是不是null或者cancelled的状态,如果是,则从队列的尾部往前开始找,找到一个非cancelled状态的节点,最后唤醒这个节点。

最后,总结一下释放操作的整个过程:其实整个释放过程就做了两件事情,一个是将state值减1,然后就是判断锁是否被完全释放,如果被完全释放,则唤醒继任节点。

三、整体过程描述

看了上面的锁的获取与释放操作以后,整体过程还是比较清晰的,在文章的最后,我们把获取与释放操作串在一起在简单看一下:

  • 获取锁的时候将当前线程放入同步队列,并且将前一个节点的状态置为signal状态,然后阻塞
  • 当这个节点的前一个节点成功获取到锁,前一个节点就成了整个同步队列的head。
  • 当前一个节点释放锁的时候,它就唤醒当前线程的这个节点,然后当前线程的节点就可以成功获取到锁了
  • 这个时候它就到整个队列的头部了,然后release操作的时候又可以唤醒下一个。

Java并发编程之AQS源码分析

0

A.Q.S全名是AbstractQueuedSynchronizer,它是java.util.concurrent包的核心类,J.U.C里面的很多类都是基于A.Q.S实现的。A.Q.S设计的时候主要为了实现三个目标:

  1. 同步状态的原子操作
  2. 对线程的阻塞和唤醒操作
  3. 维持一个阻塞线程的等待队列

下面,我们就来看看A.Q.S是怎么实现这三个目标的:

一、同步状态的原子操作

A.Q.S里面包含了一个存储同步状态的变量,它的声明如下:

private volatile int state;

这里采用了volatile修饰符的原因是为了保证对state变量的写对所有的线程都是可见的。但是大家都知道,volatile只能保证变量的可见性,不能保证对变量操作的原子性,所以A.Q.S里面就采用了CAS(Compare And Swap)操作来更新state变量的值,代码如下:

protected final boolean compareAndSetState(int expect, int update) {
    // See below for intrinsics setup to support this
    return unsafe.compareAndSwapInt(this, stateOffset, expect, update);
}

通过CAS操作,只有当state和期望的值expect相同的时候,才会被更新为update值,整个CAS操作由操作系统保证是原子的,这样对state变量的修改就即保证了原子性又无需加锁,大大提高了效率。

二、对线程的阻塞和唤醒操作

Java的Thread类已经提供了对线程的阻塞和唤醒操作的方法:Thread.suspend()和Thread.resume(),但是因为Thread.resume()如果在Thread.suspend()之前被调用,那么就没有任何效果,在多线程环境下,这可能导致死锁。所以J.U.C通过在LockSupport类里面实现了两个方法:LockSupport.park()和LockSupport.unpark(),如果LockSupport.unpark()方法在LockSupport.park()调用,那么还是有效的,就相当于LockSupport.park()后马上调用了LockSupport.unpark()方法。这里需要注意的一点是,LockSupport.unpark()方法不会叠加,如果在LockSupport.park()方法之前调用了多次LockSupport.unpark()方法,那么就和调用了一次一样。

三、维持一个阻塞线程的等待队列

A.Q.S采用了一个叫做C.L.H(Craig,Landin,Hagersten)的队列实现了一个阻塞线程的等待队列,在A.Q.S类里面保持了这个队列的头部和尾部:

private transient volatile Node head;
private transient volatile Node tail;

所有入队和出队的操作都是通过CAS来操作head和tail来实现的,再来看一看Node节点的实现:

static final class Node {
    volatile int waitStatus;
    volatile Node prev;
    volatile Node next;
    volatile Thread thread;
    Node nextWaiter;
}

其中:

  • waitStatus:代表节点的等待状态:一共有四个值:
    • -1(SIGNAL):当前节点的后续节点被阻塞了,所以当当前节点被移出队列的时候(包括释放锁或者是被取消了),应当唤醒它的后续节点。
    • -2(CONDITION):当前节点正在Condition队列中,并不在等待队列中
    • 1(CANCELLED):当前节点因为超时或者中断的原因被取消掉了。
    • 0:非以上状态,所有的新生节点都是这个状态
  • prev:当前节点的前一个节点
  • next:当前节点的下一个节点
  • nextWaiter:指向下一个等待条件的节点
  • thread:尝试获取锁的那个线程

在看了等待队列的基本情况以后,我们接下来来了解下A.Q.S等待队列的入队和出队操作是如何进行的:

入队操作

看下入队操作的代码:

    private Node enq(final Node node) {
        for (;;) {                // ------ 1
            Node t = tail;
            if (t == null) { // Must initialize   -----------2
                Node h = new Node(); // Dummy header
                h.next = node;
                node.prev = h;
                if (compareAndSetHead(h)) {
                    tail = node;
                    return h;
                }
            }
            else {                  //----------------3
                node.prev = t;
                if (compareAndSetTail(t, node)) {
                    t.next = node;
                    return t;
                }
            }
        }
    }

这段代码看上去非常简单,首先看1处,一个无限循环,采用乐观锁的方式进行入队操作;再看2处,当整个等待队列为空的时候,就生成一个头部节点,并把当前节点放到这个头部节点后面,并且把当前节点置为尾部节点。最后看3处,如果等待队列不为空,那么就将节点放入到等待队列的尾部去,其中节点的置换全部采用CAS操作。

出队操作

出队操作的Java代码:

final boolean acquireQueued(final Node node, int arg) {
        try {
            boolean interrupted = false;
            for (;;) {
                final Node p = node.predecessor();
                if (p == head && tryAcquire(arg)) {
                    setHead(node);
                    p.next = null; // help GC
                    return interrupted;
                }
                if (shouldParkAfterFailedAcquire(p, node) &&
                    parkAndCheckInterrupt())
                    interrupted = true;
            }
        } catch (RuntimeException ex) {
            cancelAcquire(node);
            throw ex;
        }
    }

A.Q.S中线程在成功获取到锁的时候便出队了,主要看for无限循环中的第一个if条件里面的代码,如果当前节点的前一个节点是头部节点并且当前节点成功获取到了锁,那么就把当前节点设置成头部节点(出队),然后将原来的头部节点的下一个节点指向null,以帮助GC。

四、Condition队列

A.Q.S里面还有一个特殊的队列Condition队列,用于保存一些等待其他条件或者线程唤醒的阻塞线程,类似于Object.wait()和Object.notify()方法,其实现和A.Q.S的等待队列类似,我将于后面的一些章节详细分析Condition的使用、实现以及和Object.wait()和Object.notify()方法进行比较。

Go to Top