切換語言為:簡體
透過原始碼分析執行緒池是如何處理異常執行緒和執行緒保活的?

透過原始碼分析執行緒池是如何處理異常執行緒和執行緒保活的?

  • 爱糖宝
  • 2024-10-09
  • 2044
  • 0
  • 0

這個問題對於看過執行緒池原始碼的同學應該已經知道答案了,沒有看過原始碼的也不要慌,現在我們就一起看一下執行緒池執行緒的保活策略。

一、執行緒池中在哪執行任務

首先進入ThreadPoolExecutorexecute方法。

透過原始碼分析執行緒池是如何處理異常執行緒和執行緒保活的?

  • 首先檢查當前工作執行緒數量,workerCountOf(c) 是否小於核心執行緒數量corePoolSize,如果小於就建立一個新執行緒addWorker()執行這個任務。

  • 如果執行緒池在執行且任務加入佇列成功,但是當前工作執行緒數量為0,也會建立一個新執行緒addWorker()

  • 如果任務不能被加入任務佇列(workQueue.offer(command)返回false), 也會建立一個新執行緒addWorker()

execute方法內部有三處呼叫addWorker()方法的位置,進入到addWorker() 方法中,可以看到有這樣一行程式碼new Worker(firstTask)

透過原始碼分析執行緒池是如何處理異常執行緒和執行緒保活的?

Worker類又實現了 Runnable,所以執行緒池中的執行緒也就是Worker,而執行就在run()方法內部,我們只需要找到Worker類中的run方法即可。

Worker(Runnable firstTask) {
            setState(-1); // inhibit interrupts until runWorker
            this.firstTask = firstTask;
            this.thread = getThreadFactory().newThread(this);
        }
        /** Delegates main run loop to outer runWorker  */
        public void run() {
            runWorker(this);
        }

run 方法內部呼叫了 runWorker,秘密就在runWorker方法內部。

所以執行緒池中執行緒最終就在runWorker方法中執行的。

二、getTask 獲取任務方法

在上面ThreadPoolExecutorexecute方法中,有一步是往阻塞佇列中放入任務(workQueue.offer(command))。

透過原始碼分析執行緒池是如何處理異常執行緒和執行緒保活的?

上面我們找到了執行緒池中執行緒執行任務的地方,那麼我們看看是從哪讀取的任務?

runWorker方法中有一行程式碼task = getTask(),此處就是從阻塞佇列中獲取任務的程式碼,讓我們看一下 getTask() 的內部實現。

透過原始碼分析執行緒池是如何處理異常執行緒和執行緒保活的?

核心程式碼就是

Runnable r = timed ?
                    workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :
                    workQueue.take();

根據timed 的狀態判斷,當工作執行緒數量大於核心執行緒數量時呼叫poll方法獲取任務,當工作執行緒數量小於核心執行緒數量時呼叫take方法。

對於阻塞佇列的介紹如下:

  • poll 方法如果佇列不為空,返回頭部元素。如果佇列為空會將執行緒阻塞在此處,阻塞時間是keepAliveTime。當時間到了獲取不到任務時返回null

  • take方法如果佇列不為空返回頭部元素。如果佇列為空會將執行緒阻塞在此處,直到佇列中有元素可供使用。

對於非核心執行緒,當執行緒池中的執行緒數量超過核心執行緒數量且空閒時間超過keepAliveTime時,非核心執行緒會被回收。

透過這種方式,執行緒在沒有任務時就阻塞在佇列上,從而實現保活。

上面我們知道了非核心執行緒的保活策略,那麼對於核心執行緒又是如何保活的呢?

三、核心執行緒如何實現的保活

線上程池中核心執行緒預設是不會被回收的。

不過我們可以透過設定allowCoreThreadTimeOut來實現,getTask方法中timed的校驗。

boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;

所以當allowCoreThreadTimeOut 設定為true時,核心執行緒在沒有任務可以執行時會使用take方法進行阻塞,直到獲取到任務位置,而不是因為沒有任務就被銷燬,從而實現的執行緒保活。

任務執行的過程是無法保證不出問題的,不管是核心執行緒還是非核心執行緒,當執行緒中出現異常之後,執行緒池是如何處理的呢?

四、執行緒異常之後如何保活

繼續回到runWorker方法中,其中task.run()是真正執行任務的地方,當此處發生異常之後,有try catch

透過原始碼分析執行緒池是如何處理異常執行緒和執行緒保活的?

所以當任務執行異常之後,會依次執三個finally塊中的程式碼。

再看processWorkerExit() 方法之前,我們先看一個引數completedAbruptly

completedAbruptly 代表是否是異常退出的,預設是true代表異常退出。

runWorker方法中,如果執行緒任務是正常執行完成的,會在最後修改該值。

透過原始碼分析執行緒池是如何處理異常執行緒和執行緒保活的?

由於我們是異常執行緒,所以程式碼肯定不會走到這,直接走到finally中的processWorkerExit方法。

processWorkerExit方法中,它會根據completedAbruptly的值來調整執行緒池中的工作執行緒數量,從工作執行緒集合中移除該執行緒,並根據執行緒池的狀態和工作執行緒數量決定是否需要新增新執行緒。

透過原始碼分析執行緒池是如何處理異常執行緒和執行緒保活的?

!completedAbruptly 判斷工作執行緒是不是異常退出的,如果不是異常退出的計算最小執行緒數量。

如果允許核心執行緒回收allowCoreThreadTimeOut=truemin0

如果min0 且工作佇列不為空! workQueue.isEmpty()min1

如果當前工作執行緒workerCountOf(c)大於等於這個最小的執行緒數量min,直接返回。

如果小於這個最小的工作執行緒數量min,呼叫addWorker

此處addWorker 的觸發條件就是當執行緒池的狀態小於STOP 也就是執行緒池還在執行runStateLessThan(c, STOP)時且不滿足上述不需要新增新執行緒的判斷。

當上述條件滿足的時候,則呼叫addWorker(null,false)新增一個新的工作執行緒,因為傳入的引數Runnablenull,所以這個新執行緒會從任務佇列中繼續讀取任務來執行。

最後總結一下,當執行緒異常之後,按照正常情況來說執行緒就直接消失了,但是透過processWorkerExit方法的補救,增加了一個新的執行緒,保證執行緒池的執行。

五、總結

執行緒池中的執行緒分為核心執行緒與非核心執行緒。

核心執行緒預設不回收,可以透過設定allowCoreThreadTimeOuttrue 來回收。

非核心執行緒在獲取任務為空且空閒時間超過一定時間之後進行回收。

執行緒池的保活策略透過阻塞佇列的阻塞特性實現,poll 方法實現可以指定超時時間的阻塞,take 方法實現阻塞直到獲取到任務。

當執行緒異常之後,透過新增執行緒的方式實現執行緒的補救,保證執行緒池的執行。

0則評論

您的電子郵件等資訊不會被公開,以下所有項目均必填

OK! You can skip this field.