Lock


LOCK接口

锁是用来控制多个线程访问共享资源的方式,一般来说,一个锁能够防止多个线程同时
访问共享资源(但是有些锁可以允许多个线程并发的访问共享资源,比如读写锁)。在Lock接
口出现之前,Java程序是靠synchronized关键字实现锁功能的,而Java SE 5之后,并发包中新增
了Lock接口(以及相关实现类)用来实现锁功能,它提供了与synchronized关键字类似的同步功
能,只是在使用时需要显式地获取和释放锁。虽然它缺少了(通过synchronized块或者方法所提
供的)隐式获取释放锁的便捷性,但是却拥有了锁获取与释放的可操作性、可中断的获取锁以
及超时获取锁等多种synchronized关键字所不具备的同步特性。

Lock接口有三个实现类,一个是ReentrantLock,另两个是ReentrantReadWriteLock类中的两个静态内部类ReadLock和WriteLock。

  • 与互斥锁定相比,读-写锁定允许对共享数据进行更高级别的并发访问。虽然一次只有一个线程(writer 线程)可以修改共享数据,但在许多情况下,任何数量的线程可以同时读取共享数据(reader 线程)。从理论上讲,与互斥锁定相比,使用读-写锁定所允许的并发性增强将带来更大的性能提高。
  • 在实践中,只有在多处理器上并且只在访问模式适用于共享数据时,才能完全实现并发性增强。——例如,某个最初用数据填充并且之后不经常对其进行
    修改的 collection,因为经常对其进行搜索(比如搜索某种目录),所以这样的 collection 是使用读-写锁定的理想候选者。

队列同步器

队列同步器AbstractQueuedSynchronizer(以下简称同步器),是用来构建锁或者其他同步组
件的基础框架,它使用了一个int成员变量表示同步状态,通过内置的FIFO队列来完成资源获
取线程的排队工作。

同步器是实现锁(也可以是任意同步组件)的关键,在锁的实现中聚合同步器,利用同步
器实现锁的语义。可以这样理解二者之间的关系:锁是面向使用者的,它定义了使用者与锁交
互的接口(比如可以允许两个线程并行访问),隐藏了实现细节;同步器面向的是锁的实现者,
它简化了锁的实现方式,屏蔽了同步状态管理、线程的排队、等待与唤醒等底层操作。

同步器的主要使用方式是继承,子类通过继承同步器并实现它的抽象方法来管理同步状
态,在抽象方法的实现过程中免不了要对同步状态进行更改,这时就需要使用同步器提供的3
个方法(getState()、setState(int newState)和compareAndSetState(int expect,int update))来进行操作,因为它们能够保证状态的改变是安全的。子类推荐被定义为自定义同步组件的静态内部
类,同步器自身没有实现任何同步接口,它仅仅是定义了若干同步状态获取和释放的方法来
供自定义同步组件使用,同步器既可以支持独占式地获取同步状态,也可以支持共享式地获
取同步状态,这样就可以方便实现不同类型的同步组件(ReentrantLock、
ReentrantReadWriteLock和CountDownLatch等)。

实现一个lock–通过队列同步器

同步器AQS的设计是基于模板方法模式的,也就是说,使用者需要继承同步器并重写指定的
方法,随后将同步器组合在自定义同步组件的实现中,并调用同步器提供的模板方法,而这些
模板方法将会调用使用者重写的方法。

同步器提供的模板方法基本上分为3类:独占式获取与释放同步状态、共享式获取与释放
同步状态和查询同步队列中的等待线程情况。自定义同步组件将使用同步器提供的模板方法
来实现自己的同步语义。

下面为自定义的一个lock锁:

import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.AbstractQueuedSynchronizer;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;

public class Mutex implements Lock {
    // 静态内部类,自定义同步器,随后模板方法会调用这些方法
    private static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = -4387327721959839431L;

        // 是否处于占用状态
        protected boolean isHeldExclusively() {
            return getState() == 1;
        }

        // 当状态为0的时候获取锁
        public boolean tryAcquire(int acquires) {
            assert acquires == 1; // Otherwise unused
            if (compareAndSetState(0, 1)) {
                setExclusiveOwnerThread(Thread.currentThread());
                return true;
            }
            return false;
        }

        // 释放锁,将状态设置为0
        protected boolean tryRelease(int releases) {
            assert releases == 1; // Otherwise unused
            if (getState() == 0)
                throw new IllegalMonitorStateException();
            setExclusiveOwnerThread(null);
            setState(0);
            return true;
        }

        // 返回一个Condition,每个condition都包含了一个condition队列
        Condition newCondition() {
            return new ConditionObject();
        }
    }

    // 仅需要将操作代理到Sync上即可
    private final Sync sync = new Sync();

    public void lock() {
        sync.acquire(1);
    }

    public boolean tryLock() {
        return sync.tryAcquire(1);
    }

    public void unlock() {
        sync.release(1);
    }

    public Condition newCondition() {
        return sync.newCondition();
    }

    public boolean isLocked() {
        return sync.isHeldExclusively();
    }

    public boolean hasQueuedThreads() {
        return sync.hasQueuedThreads();
    }

    public void lockInterruptibly() throws InterruptedException {
        sync.acquireInterruptibly(1);
    }

    public boolean tryLock(long timeout, TimeUnit unit) throws InterruptedException {
        return sync.tryAcquireNanos(1, unit.toNanos(timeout));
    }
}

重入锁(ReentrantLock)

重入锁就是该锁支持一个线程对资源的重复可利用。除此之外,该锁还支持获取锁的公平与非公平选择。

ReentrantLock中有3个内部类,分别是Sync、FairSync和NonfairSync。

Sync是一个继承AQS的抽象类,使用独占锁,复写了tryRelease方法。tryAcquire方法由它的两个FairSync(公平锁)和NonfairSync(非公平锁)实现。

ReentrantLock的lock方法使用sync的lock方法,Sync的lock方法是个抽象方法,由公平锁和非公平锁去实现。unlock方法直接使用AQS的release方法。所以说公平锁和非公平锁的释放锁过程是一样的,不一样的是获取锁过程。

先看一下公平锁nonfair的lock方法:

final void lock() {
// acquire方法内部调用tryAcquire方法
// 公平锁的获取锁方法,对于没有获取到的线程,会按照队列的方式挂起线程
acquire(1);
}

protected final boolean tryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        // 公平锁这里多了一个!hasQueuedPredecessors()判断,表示是否有线程在队列里等待的时间比当前线程要长,如果有等待时间更长的线程,那么放弃获取锁
        if (!hasQueuedPredecessors() &&
            compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0)
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

该方法与nonfairTryAcquire(int acquires)比较,唯一不同的位置为判断条件多了hasQueuedPredecessors()方法,即加入了同步队列中当前节点是否有前驱节点的判断,如果该
方法返回true,则表示有线程比当前线程更早地请求获取锁,因此需要等待前驱线程获取并释
放锁之后才能继续获取锁。

非公平锁lock方法:

final void lock() {
    // 非公平锁的获取锁
    // 跟公平锁的区别就在这里。直接对状态位state进行cas操作,成功就获取锁,这是一种抢占式的方式。不成功跟公平锁一样进入队列挂起线程
    if (compareAndSetState(0, 1))
        setExclusiveOwnerThread(Thread.currentThread());
    else
        acquire(1);
}
// 调用Sync的nonfairTryAcquire方法
protected final boolean tryAcquire(int acquires) {
    return nonfairTryAcquire(acquires);
}

final boolean nonfairTryAcquire(int acquires) {
    final Thread current = Thread.currentThread();
    int c = getState();
    if (c == 0) {
        if (compareAndSetState(0, acquires)) {
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    else if (current == getExclusiveOwnerThread()) {
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    return false;
}

该方法增加了再次获取同步状态的处理逻辑:通过判断当前线程是否为获取锁的线程来
决定获取操作是否成功,如果是获取锁的线程再次请求,则将同步状态值进行增加并返回
true,表示获取同步状态成功。

对于unlock方法直接使用AQS的release方法。公平锁和非公平锁的释放锁过程是一样的,不一样的是获取锁过程。
protected final boolean tryRelease(int releases) {
int c = getState() - releases; // 释放
if (Thread.currentThread() != getExclusiveOwnerThread()) // 如果当前线程不是独占线程,直接抛出异常
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) { // 由于是可重入锁,需要判断是否全部释放了
free = true;
setExclusiveOwnerThread(null); // 全部释放的话直接把独占线程设置为null
}
setState(c);
return free;
}

// 恢复线程
public final boolean release(int arg) {
    if (tryRelease(arg)) {
        Node h = head;  // 恢复第一个挂起的线程
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

ReentrantLock的默认构造函数使用的是NonfairSync,如果想使用FairSync,使用带有boolean参数的构造函数,传入true表示FairSync,否则是NonfairSync。

读写锁 ReentrantReadWriteLock

读写锁在同一时刻可以允许多个读线程访问,但是在写线程访问时,所有的读
线程和其他写线程均被阻塞。读写锁维护了一对锁,一个读锁和一个写锁,通过分离读锁和写
锁,使得并发性相比一般的排他锁有了很大提升。

  • 使用范例

    public class Cache {
        static Map<String, Object> map = new HashMap<String, Object>();
        static ReentrantReadWriteLock rwl = new ReentrantReadWriteLock();
        static Lock r = rwl.readLock();
        static Lock w = rwl.writeLock();
        // 获取一个key对应的value
        public static final Object get(String key) {
            r.lock();
            try {
                return map.get(key);
            } finally {
                r.unlock();
            }
        }
        // 设置key对应的value,并返回旧的value
        public static final Object put(String key, Object value) {
            w.lock();
            try {
                return map.put(key, value);
            } finally {
                w.unlock();
            }
            }// 清空所有的内容
            public static final void clear() {
                w.lock();
                try {
                    map.clear();
                } finally {
                    w.unlock();
                }
        }
    }    
    
  • 读写锁实现分析
  • 1 - 读写状态的设计

读写锁同样依赖自定义同步器来实现同步功能,而读写状态就是其同步器的同步状态。回想ReentrantLock中自定义同步器的实现,同步状态表示锁被一个线程重复获取的次数,而读写锁的自定义同步器需要在同步状态(一个整型变量)上维护多个读线程和一个写线程的状态,使得该状态的设计成为读写锁实现的关键。
如果在一个整型变量上维护多种状态,就一定需要“按位切割使用”这个变量,读写锁将变量切分成了两个部分,高16位表示读,低16位表示写。

  • 2 - 写锁的获取与释放

ReentrantReadWriteLock的tryAcquire方法

protected final boolean tryAcquire(int acquires) {
    Thread current = Thread.currentThread();
    int c = getState();
    int w = exclusiveCount(c);
    if (c != 0) {
        // 存在读锁或者当前获取线程不是已经获取写锁的线程
        if (w == 0 || current != getExclusiveOwnerThread())
        return false;
        if (w + exclusiveCount(acquires) > MAX_COUNT)
        throw new Error("Maximum lock count exceeded");
        setState(c + acquires);
        return true;
    }
    if (writerShouldBlock() || !compareAndSetState(c, c + acquires)) {
        return false;
    }
    setExclusiveOwnerThread(current);
    return true;
}

写锁的释放与ReentrantLock的释放过程基本类似,每次释放均减少写状态,当写状态为0
时表示写锁已被释放,从而等待的读写线程能够继续访问读写锁,同时前次写线程的修改对
后续读写线程可见。

  • 3- 读锁的获取的释放

ReentrantReadWriteLock的tryAcquireShared方法

protected final int tryAcquireShared(int unused) {
    for (;;) {
        int c = getState();
        int nextc = c + (1 << 16);
        if (nextc < c)
            throw new Error("Maximum lock count exceeded");
        if (exclusiveCount(c) != 0 && owner != Thread.currentThread())
            return -1;
        if (compareAndSetState(c, nextc))
            return 1;
    }
}

在tryAcquireShared(int unused)方法中,如果其他线程已经获取了写锁,则当前线程获取读
锁失败,进入等待状态。如果当前线程获取了写锁或者写锁未被获取,则当前线程(线程安全,
依靠CAS保证)增加读状态,成功获取读锁。
读锁的每次释放(线程安全的,可能有多个读线程同时释放读锁)均减少读状态,减少的
值是(1<<16)

Condition接口


简单使用:

public class BoundedQueue<T> {
    private Object[] items;
    // 添加的下标,删除的下标和数组当前
    private int addIndex, removeI
    private Lock lock = new R
    private Condition notEmpty
    private Condition notFull
    public BoundedQueue(int size)
        items = new Object[si
    }
    // 添加一个元素,如果数组满,则添加线
    public void add(T t) throws I
        lock.lock();
        try {
            while (count == items.length)
            notFull.await();
            items[addIndex] = t;
            if (++addIndex == items.length)
            addIndex = 0;
            ++count;
            notEmpty.signal();
        } finally {
            ock.unlock();
        }
    }
    // 由头部删除一个元素,如果数组空,则删除线程进入等待状态,直到有新添加
    @SuppressWarnings("unchecked")
    public T remove() throws InterruptedException {
        lock.lock();
        try {
            while (count == 0)
            notEmpty.await();
            Object x = items[removeIndex];
            if (++removeIndex == items.length)
            removeIndex = 0;
            --count;
            notFull.signal();
            return (T) x;
        } finally {
            lock.unlock();
        }
    }
}

全文来自 java并发编程的艺术

-------------本文结束感谢您的阅读-------------