申明:jdk版本為1.8
AbstractQueuedSynchronizer是jdk中實現鎖的一個抽象類,有排他和共享兩種模式。
我們這里先看排他模式,共享模式后面結合java.util.concurrent.locks.ReentrantReadWriteLock單獨寫一篇隨筆。
后面還會分析可重入鎖java.util.concurrent.locks.ReentrantLock。
aqs源碼分析。言歸正傳,一起來看看吧。
AQS中主要是通過state變量來控制鎖狀態的,子類通過重寫下面的某些方法并控制state值來達到獲取鎖或者解鎖效果:
一般情況下要實現自己的鎖,會實現java.util.concurrent.locks.Lock接口,并通過內部類繼承自java.util.concurrent.locks.AbstractQueuedSynchronizer實現。形如下面這樣:
AbstractQueuedSynchronizer,好了,AQS的擴展就先到這里,后面講具體的鎖時再詳細分析。
下面分析AQS本身的代碼實現。
核心方法是java.util.concurrent.locks.AbstractQueuedSynchronizer.acquire(int)獲取指定數量的鎖(實際就是給state加多少的問題)以及java.util.concurrent.locks.AbstractQueuedSynchronizer.release(int)釋放指定數量的鎖(實際就是給state減多少的問題)
acquire
代碼以及注釋說明如下;
linux源碼分析,看注釋就明白了,這是以排他模式獲取鎖,并忽略線程中斷,與acquireInterruptibly相對應。acquireInterruptibly在遇到線程被中斷時會拋出InterruptedException異常。
這里會先調用一次tryAcquire嘗試獲取鎖,如果獲取失敗,就會新建一個Node并加到雙向鏈表尾部,該Node會在自旋里面重復地嘗試直到獲得鎖為止,過程中線程可能會阻塞或者不阻塞,這點完全取決于Node中的waitStatus狀態值,這一點后面會詳細分析。
tryAcquire是子類實現的,負責操作state值,通過state值的控制,子類可以實現各種各樣的鎖,父類的主要邏輯就是控制好Node的鏈表,線程的阻塞、喚醒等。
下面看下addWaiter方法,方法的邏輯就是用一個Node對象包裝當前線程,并將Node加入到雙向鏈表的尾部:
thinkphp源碼分析、acquire采用的是排他模式,這里的入參是Node.EXCLUSIVE,創建一個排他模式的Node。大部分情況下tail是不為null的,看注釋就知道了,
這里外部加了層判斷嘗試以最快方式將新建的Node掛到鏈表尾部。
因為是并發環境,所以compareAndSetTail有可能失敗,失敗的話就進入enq方法,以自旋方式往鏈表尾部添加。
addWaiter完之后將剛剛新建的Node傳入acquireQueued,自旋嘗試獲得鎖:
netty源碼分析、
通過上面代碼可知,如果node的前任node是head,并且tryAcquire獲得鎖成功,就將當前node設為head并返回是否是因為線程中斷而從阻塞狀態喚醒的。
否則,shouldParkAfterFailedAcquire,先檢查是否要阻塞當前線程,如果需要阻塞,則調用parkAndCheckInterrupt,阻塞線程,
并在喚醒后檢查是否是終端導致的線程喚醒,同時設置interrupted = true
下面分別看下這兩個方法:
grbl源碼分析?shouldParkAfterFailedAcquire:
根據前任節點的狀態決定是否應該阻塞,如果前任節點狀態為Node.SIGNAL就直接阻塞當前節點對應的線程,否則檢查前任節點狀態是否為cancelled,
如果是的話,就將前面所有狀態為cancelled的節點剔除,剩下的邏輯就是前任狀態設置為Node.SIGNAL,總結一句話,在自旋過程中如果沒有成功獲得鎖
的情況下,兜兜轉轉最終會返回true,阻塞當前線程。
ceph源碼分析。node狀態描述如下:
返回true后就會執行parkAndCheckInterrupt方法,如下:
調用LockSupport.park阻塞當前線程,LockSupport的分析見我的另外一篇隨筆。
qt源碼分析?這里喚醒一般通過LockSupport.unpark喚醒,注意,線程的interrupt方法也會喚醒該線程,所以這里調用了Thread.interrupted()靜態方法
判斷喚醒方式。喚醒后就會繼續進入自旋嘗試獲得鎖。
release
釋放指定數量的鎖,調用子類tryRelease,將State減去入參對應的數值,如果為0,則表示鎖徹底釋放,進入unparkSuccessor方法,喚醒head后面節點對應的線程。
aqs詳解。?正常情況下喚醒head后面一個節點的thread,但是,如果nextNode為cancel狀態,就從tail往前找最前面符合條件的node。
至此,鎖的獲取、釋放都講完了,還有一個涉及到共享模式的那一路邏輯后面結合java.util.concurrent.locks.ReentrantReadWriteLock再分析。
ConditionObject
首先,一把AQS鎖可以new多個ConditionObject對象,調用await和signal必須在鎖的lock和unlock中間,也就是必須獲得鎖的情況下才能調用,否則會拋出異常。
類似synchronized中調用監視器的wait和notify,不同的是AQS同一把鎖可以有多個conditionObject。
下面詳細分析下AQS的await和signal。
await
/*** Implements interruptible condition wait.* <ol>* <li> If current thread is interrupted, throw InterruptedException.* <li> Save lock state returned by {@link #getState}.* <li> Invoke {@link #release} with saved state as argument,* throwing IllegalMonitorStateException if it fails.* <li> Block until signalled or interrupted.* <li> Reacquire by invoking specialized version of* {@link #acquire} with saved state as argument.* <li> If interrupted while blocked in step 4, throw InterruptedException.* </ol>*/public final void await() throws InterruptedException {
// 檢查線程是否已經中斷if (Thread.interrupted())throw new InterruptedException();
// 將當前線程包裝進Node,狀態為Node.CONDITION,并加到條件隊列末尾Node node = addConditionWaiter();
// 釋放掉該線程獲取的所有鎖狀態int savedState = fullyRelease(node);int interruptMode = 0;
// 循環等待當前node直到該node從條件隊列轉入等待隊列
// 或者該線程被中斷,也就是checkInterruptWhileWaiting返回0
while (!isOnSyncQueue(node)) {
// 如果還沒有轉移到等待隊列則阻塞當前線程
// 這里的線程喚醒有一下幾種情況:
// 1.調用signal后轉移到等待隊列并排隊到該node正常喚醒
// 2.線程中斷喚醒
// 3.signal時調用transferForSignal入隊后檢查發現前驅節點取消了,或者對前驅節點設置狀態為Node.SIGNAL時CAS失敗
LockSupport.park(this);
// 喚醒后檢查狀態,是否被中斷喚醒,如果不是返回0,如果是,再看是在signal之前還是之后,
// 之前的話返回THROW_IE,await結束后拋出InterruptedException異常
// 之后的話返回REINTERRUPT,需要重新設置中斷標志
// 值得說明的是,線程被中斷喚醒后,不管處在條件隊列的什么位置,都會直接將對應node轉移到等待隊列
if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
break;
}
// 分析checkInterruptWhileWaiting可知,即使線程中斷也會轉移到等待隊列,所以這里acquireQueued自旋排隊獲取鎖
// 如果acquireQueued返回true則說明等待過程中發生了中斷,如果不是signal之前發生的中斷,需要重新設置中斷狀態以便外部邏輯感知到
// 因為checkInterruptWhileWaiting和parkAndCheckInterrupt中都是調用的Thread.interrupted靜態方法,這個方法會清除中斷狀態
if (acquireQueued(node, savedState) && interruptMode != THROW_IE)
interruptMode = REINTERRUPT;
// 正常signal的node會doSignal的時候設置nextWaiter = null,
//所以這里的條件滿足情況就是在直到獲得鎖之前都沒有調用signal(因為signal開始執行就設置了nextWaiter = null)
if (node.nextWaiter != null) // clean up if cancelled
// 在上述情況下,將node從條件隊列中移除
unlinkCancelledWaiters();
// 如果發生中斷,則根據中斷情況向外反饋
if (interruptMode != 0)
reportInterruptAfterWait(interruptMode);
}
jquery源碼分析,整個主線邏輯就是上面分析的那樣,下面詳解下關鍵的方法:
checkInterruptWhileWaiting,檢查是否是中斷導致的線程喚醒并返回后續如何處理的標志:
THROW_IE:拋出異常
REINTERRUPT:重新設置中斷標志
有了源碼該如何使用?從上面代碼可知,如果不是中斷,則返回0,重新進入while循環判斷是否在等待隊列中,如果是中斷,則判斷中斷時機:
這里的中斷時機主要針對的signal之前還是之后,因為在signal中會先將node狀態設置為0再將node轉移到等待隊列,如下圖所示:
因為都是CAS操作,所以這里通過判斷能否將狀態由CONDITION設置為0判斷是在signal之前還是之后,如果設置成功,說明在signal操作之前,則將node加入到等待隊列并返回true,對應的狀態是THROW_IE,如果設置失敗,說明signal中的CAS操作搶先了,那就while循環等待調用signal的線程將該node轉移到等待隊列,對應的狀態是REINTERRUPT。
arraylist源碼分析,備注:ConditionObject在jdk的BlockingQueue中有很好應用,可以結合一起看下效果更佳。