Java中的可重入锁ReentrantLock很常见,可以用它来代替内置锁synchronized,ReentrantLock是语法级别的锁,所以比内置锁更加灵活。

ReentrantLock是Java并发包中互斥锁,它有公平锁和非公平锁两种实现方式,默认构造函数采用非公平锁的方式实现。

lock流程

1
2
3
4
5
6
7
8
9
10
11
12
//默认非公平锁
public ReentrantLock() {
sync = new NonfairSync();
}
//根据fair来初始化使用哪种锁
public ReentrantLock(boolean fair) {
sync = fair ? new FairSync() : new NonfairSync();
}
//调用FairSync或者NonfairSync的lock方法,默认sync=NonfairSync
public void lock() {
sync.lock();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
//1. 首先会通过CAS方法,尝试将当前的java.util.concurrent.locks.AbstractQueuedSynchronizer#state中的state字段改成从0改成1
/*
NonfairSync 继承于 AbstractQueuedSynchronizer, AQS有volatile字段state如下:
private volatile int state;
*/
//2. 如果修改成功,则state锁加锁成功,然后将当前线程通过setExclusiveOwnerThread设置为NonfairSync的独占线程
//3. 否则,就像普通线程一样,acquire(1), 请求加锁
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}
//aquire(1)方法
public final void acquire(int arg) {
if (!tryAcquire(arg) &&
acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
selfInterrupt();
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
//4. 在tryAcquire(1)中调用nonfairTryAcquire(1)
//4.1 获取AQS的 state
//4.1.1 如果c==0,则说明AQS没有加锁,就开始进行CAS操作,成功的化话就setExclusiveOwnerThread,返回true
//4.1.2 如果c!=0, 则检查当前线程释放为AQS的独占线程,如果是,则int nextc = c + acquires;将state + 1,然后setState(nextc),并返回true
//否则返回false
//PS: 这里的CAS操作,是调用Unsafe类的compareAndSwapInt通过native直接郊游系统操作CPU完成的
protected final boolean tryAcquire(int acquires) {
return nonfairTryAcquire(acquires);
}
final boolean nonfairTryAcquire(int acquires) {
final Thread current = Thread.currentThread();
int c = getState();
if (c == 0) {
if (compareAndSetState(0, acquires)) {
setExclusiveOwnerThread(current);
return true;
}
}
else if (current == getExclusiveOwnerThread()) {
int nextc = c + acquires;
if (nextc < 0) // overflow
throw new Error("Maximum lock count exceeded");
setState(nextc);
return true;
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
//5. 如果4的!tryAcquire()成立,即tryAcquire失败,则开始 acquireQueued(addWaiter(Node.EXCLUSIVE), arg)
//5.1 首先 addWaiter(Node.EXCLUSIVE),将当前线程添加到AQS的队列中:
//(1)首先创建一个为独占模式的Node,
//(2)再判断一下队列上有没有结点,没有就创建一个空结点头,然后将Node添加到末尾,创建和添加凑采用的是AQS的CAS操作,保证可见性
private Node addWaiter(Node mode) {
Node node = new Node(Thread.currentThread(), mode);
// Try the fast path of enq; backup to full enq on failure
Node pred = tail;
if (pred != null) {
node.prev = pred;
if (compareAndSetTail(pred, node)) {
pred.next = node;
return node;
}
}
enq(node);
return node;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
//5.2 acquireQueued
//(1)首先判断node是不是队列第一个,如果是、且尝试获取锁成功,则将node设成head,并把此前的head.next=null,帮助gc回收
//(2)如果node不是队列第一个,或者获取锁不成功(其他线程还没释放),则进入shouldParkAfterFailedAcquire方法
final boolean acquireQueued(final Node node, int arg) {
boolean failed = true;
try {
boolean interrupted = false;
for (;;) {
final Node p = node.predecessor();
if (p == head && tryAcquire(arg)) {
setHead(node);
p.next = null; // help GC
failed = false;
return interrupted;
}
if (shouldParkAfterFailedAcquire(p, node) &&
parkAndCheckInterrupt())
interrupted = true;
}
} finally {
if (failed)
cancelAcquire(node);
}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
//5.2.1 shouldParkAfterFailedAcquire
//(1)首次进来,waitStatus肯定为0,那么设置为SIGNAL,并立即返回false。然后返回acquireQueued方法,继续走for循环,再一次尝试获取锁,不成功继续走shouldParkAfterFailedAcquire方法,此时waitStatus=-1,因此这里直接走第一个if,返回true。
//(2)然后开始走parkAndCheckInterrupt方法
private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
int ws = pred.waitStatus;
if (ws == Node.SIGNAL)
/*
* This node has already set status asking a release
* to signal it, so it can safely park.
*/
return true;
if (ws > 0) {
/*
* Predecessor was cancelled. Skip over predecessors and
* indicate retry.
*/
do {
node.prev = pred = pred.prev;
} while (pred.waitStatus > 0);
pred.next = node;
} else {
/*
* waitStatus must be 0 or PROPAGATE. Indicate that we
* need a signal, but don't park yet. Caller will need to
* retry to make sure it cannot acquire before parking.
*/
compareAndSetWaitStatus(pred, ws, Node.SIGNAL);
}
return false;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
//5.2.2 parkAndCheckInterrupt
//(1)调用LockSupport的park方法阻塞住了当前的线程
//(2)至此,使用ReentrantLock让线程1独占锁、线程2进入FIFO队列并阻塞的完整流程已经整理出来了
private final boolean parkAndCheckInterrupt() {
LockSupport.park(this);
return Thread.interrupted();
}
public static void park(Object blocker) {
Thread t = Thread.currentThread();
setBlocker(t, blocker);
UNSAFE.park(false, 0L);
setBlocker(t, null);
}

unlock流程

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
//走AQS(AbstractQueuedSynchronizer)的release流程
public void unlock() {
sync.release(1);
}
public final boolean release(int arg) {
if (tryRelease(arg)) {
Node h = head;
if (h != null && h.waitStatus != 0)
unparkSuccessor(h);
return true;
}
return false;
}
//先调用tryRelease尝试释放锁
//首先,只有当c==0的时候才会让free=true,这和上面一个线程多次调用lock方法累加state是对应的,调用了多少次的lock()方法自然必须调用同样次数的unlock()方法才行,这样才把一个锁给全部解开。
//当一条线程对同一个ReentrantLock全部解锁之后,AQS的state自然就是0了,AbstractOwnableSynchronizer的exclusiveOwnerThread将被设置为null,这样就表示没有线程占有锁,方法返回true。
protected final boolean tryRelease(int releases) {
int c = getState() - releases;
if (Thread.currentThread() != getExclusiveOwnerThread())
throw new IllegalMonitorStateException();
boolean free = false;
if (c == 0) {
free = true;
setExclusiveOwnerThread(null);
}
setState(c);
return free;
}
//代码继续往下走,上面的release方法的第四行,h不为null成立,h的waitStatus为-1,不等于0也成立,所以走第5行的unparkSuccessor方法:
//s即h的下一个Node,这个Node里面的线程就是线程2,由于这个Node不等于null,所以走21行,线程2被unPark了,得以运行。
//有一个很重要的问题是:锁被解了怎样保证整个FIFO队列减少一个Node呢?这是一个很巧妙的设计,又回到了AQS的acquireQueued方法了:
//阻塞完成线程2依然会进行for循环。然后,阻塞完成了,线程2所在的Node的前驱Node是p,线程2尝试tryAcquire,成功,然后线程2就成为了head节点了,把p的next设置为null,这样原头Node里面的所有对象都不指向任何块内存空间,h属于栈内存的内容,方法结束被自动回收,这样随着方法的调用完毕,原头Node也没有任何的引用指向它了,这样它就被GC自动回收了。此时,遇到一个return语句,acquireQueued方法结束,后面的Node也是一样的原理。
private void unparkSuccessor(Node node) {
/*
* If status is negative (i.e., possibly needing signal) try
* to clear in anticipation of signalling. It is OK if this
* fails or if status is changed by waiting thread.
*/
int ws = node.waitStatus;
if (ws < 0)
compareAndSetWaitStatus(node, ws, 0);
/*
* Thread to unpark is held in successor, which is normally
* just the next node. But if cancelled or apparently null,
* traverse backwards from tail to find the actual
* non-cancelled successor.
*/
Node s = node.next;
if (s == null || s.waitStatus > 0) {
s = null;
for (Node t = tail; t != null && t != node; t = t.prev)
if (t.waitStatus <= 0)
s = t;
}
if (s != null)
LockSupport.unpark(s.thread);
}

公平锁 与 非公平锁

公平锁

1
2
3
4
//一上来就进入普通跟拿锁流程:加入队列中,按照lock顺序获得锁。排队,公平
final void lock() {
acquire(1);
}

非公平锁

1
2
3
4
5
6
7
//上来就拿锁,拿不再进入普通拿锁流程,插队,不公平
final void lock() {
if (compareAndSetState(0, 1))
setExclusiveOwnerThread(Thread.currentThread());
else
acquire(1);
}

参考链接

http://www.cnblogs.com/szlbm/p/5505698.html

http://www.cnblogs.com/wanly3643/p/3835839.html