当前位置: 首页>JAVA>正文

java多線程面試題及答案,JAVA8線程池THREADPOOLEXECUTOR底層原理及其源碼解析

java多線程面試題及答案,JAVA8線程池THREADPOOLEXECUTOR底層原理及其源碼解析

  • 小侃一下
  • 1. 使用線程池的好處. 為什么要使用線程池?
  • 2. 線程池核心參數介紹
  • 3. 提交任務到線程池中的流程
    • 3.1 ThreadPoolExecutor#execute方法整體流程
    • 3.2 排隊恰火鍋的場景
  • 4. ThreadPoolExecutor線程池源碼及其原理
    • 4.1 從創建ThreadPoolExecutor開始: 線程池構造函數的源碼
    • 4.2 ThreadPoolExecutor中的一些重要的屬性
      • 4.2.1 線程池的運行狀態
      • 4.2.2 核心屬性ctl源碼(線程池狀態和有效線程數)
      • 4.2.3 線程池中的mainLock鎖
      • 4.2.4 線程池中的線程集合
      • 4.2.5 mainLock的Condition()對象
      • 4.2.6 線程池中曾經達到的最大線程數
      • 4.2.7 線程池中已完成的任務數
      • 4.2.8 核心線程池中的空閑線程
  • 5. ThreadPoolExecutor一些重要的方法源碼及其原理解析
    • 5.1 execute方法源碼: 提交task到線程池
    • 5.2 addWorker方法源碼: 創建線程并啟動, 執行提交的task
    • 5.3 Worker類源碼: 線程是如何執行提交到線程池中的task?
      • 5.3.1 Worker 的定義
      • 5.3.2 Worker中的屬性
      • 5.3.3 Worker的構造方法
      • 5.3.4 Worker中的run方法
      • 5.3.5 Worker中的重寫AQS的方法tryAcquire, tryRelease, isHeldExclusively
      • 5.3.6 lock方法
      • 5.3.7 tryLock方法
      • 5.3.8 isLocked方法
      • 5.3.9 interruptIfStarted方法
      • 5.3.10 unlock方法
    • 5.4 runWorker方法源碼: 線程池中線程被復用的關鍵
    • 5.5 getTask方法源碼: 從BlockingQueue中獲取task
    • 5.6 shutdown方法源碼: 中斷所有空閑的線程
    • 5.7 shutdownNow方法源碼: 中斷所有空閑的線程, 刪除并返回BlockingQueue中所有的task
  • 6. ThreadPoolExecutor一些其他的方法和屬性介紹
    • 6.1 默認的線程工廠Executors#defaultThreadFactory
    • 6.2 ThreadPoolExecutor默認實現的4種拒絕策略
      • 6.2.1 CallerRunsPolicy
      • 6.2.2 AbortPolicy
      • 6.2.3 DiscardPolicy
      • 6.2.4 DiscardOldestPolicy
    • 6.3 addWorkerFailed方法源碼: 移除啟動線程失敗的worker
    • 6.4 tryTerminate方法源碼: 嘗試更改runState, workerCount, 嘗試關閉線程池
    • 6.5 awaitTermination方法源碼: 等待指定時間后, 線程池是否已經關閉
    • 6.6 prestartCoreThread方法源碼: 預啟動一個核心線程
    • 6.7 prestartAllCoreThreads方法源碼: 預先啟動線程池中的所有核心線程
    • 6.8 getActiveCount方法源碼: 獲得當前線程池中活躍的線程
  • 總結

?

小侃一下

日常開發中, 或許不會直接new線程或線程池, 但這些線程相關的基礎或思想是非常重要的, 參考林迪效應;

就算沒有直接用到, 可能間接也用到了類似的思想或原理, 例如tomcat, jetty, 數據庫連接池, MQ;

本文不會對線程的基礎知識進行介紹, 所以最好已"進食"關于線程的基礎知識, 再"食用"本文更佳;

由于在下的工作及其它原因, 前后花費了數月的時間才完成這篇博客, 希望能幫助到想要了解ThreadPoolExecutor線程池源碼和原理的同學.
?

1. 使用線程池的好處. 為什么要使用線程池?

  1. 避免頻繁創建、銷毀線程的開銷; 復用創建的線程.

  2. 及時響應提交的任務; 提交一個任務,不再是每次都需要創建新的線程.

  3. 避免每次提交的任務都新建線程, 造成服務器資源耗盡,?線程頻繁上下文切換等服務器資源開銷.

  4. 更容易監控、管理線程; 可以統計出已完成的任務數, 活躍的線程數, 等待的任務數等, 可以重寫hook方法beforeExecute,?afterExecute,?terminated?, 重寫之后, 結合具體的業務進行處理.
    ?

2. 線程池核心參數介紹

參數意義
corePoolSize線程池中的核心線程數
workQueue存放提交的task
maximumPoolSize線程池中允許的最大線程數
threadFactory線程工廠, 用來創建線程, 由Executors#defaultThreadFactory實現
keepAliveTime空閑線程存活時間(默認是臨時線程, 也可設置為核心線程)
unit空閑線程存活時間單位枚舉

下面將結合線程池中的任務提交流程加深理解.
?

3. 提交任務到線程池中的流程

3.1 ThreadPoolExecutor#execute方法整體流程

這里以java.util.concurrent.ThreadPoolExecutor#execute方法為例, 畫一個簡單的圖:


上圖中的worker可簡單理解為線程池中的一個線程,?workers.size()即使線程池中的線程數;

  1. workers.size()小于corePoolSize時, 創建新的線程執行提交的task.
  2. workers.size()大于corePoolSize時, 并且workQueue沒有滿, 將task添加到workQueue.
  3. workers.size()大于corePoolSize時, 并且workQueue已經滿了, 但是workers.size()<maximumPoolSize, 就創建一個臨時線程處理task.
  4. workers.size()大于corePoolSize時, 并且workQueue已經滿了, 但是workers.size()>=maximumPoolSize, 執行拒絕策略.

后續會有對ThreadPoolExecutor#execute方法的詳細解讀:?execute方法源碼: 提交task到線程池.

4種默認的拒絕策略:?ThreadPoolExecutor默認實現的4種拒絕策略.
?

3.2 排隊恰火鍋的場景

這里我們可以想像一個場景: 去海底撈吃火鍋;

下午4點晚市正式開始排隊, 假如店內一共有16張桌子, 陸續光臨的16組客人將店內坐滿;

店外一共有20組客人座位, 則第17~36組客人坐在店外排隊;

第37組客人來了, 啟動臨時餐桌供客人吃飯.

所以, 這里的店內16張桌子則是corePoolSize, 店外一共有20組座位則為BlockingQueue, 而臨時餐桌數量即maximumPoolSize-corePoolSize.

上面的例子并非絕對完美, 僅僅是為了便于我們理解線程池的各個參數, 以及加深印象.

?

4. ThreadPoolExecutor線程池源碼及其原理

有了上面對線程池的總體了解后, 下面結合源碼來看看線程池的底層原理吧!
?

4.1 從創建ThreadPoolExecutor開始: 線程池構造函數的源碼

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue) {this(corePoolSize, maximumPoolSize, keepAliveTime, unit, workQueue,Executors.defaultThreadFactory(), defaultHandler);
}

上面是ThreadPoolExecutor參數最少的一個構造方法, 默認的ThreadFactoryExecutors.defaultThreadFactory(), 默認的?RejectedExecutionHandlerdefaultHandler = new AbortPolicy();

public ThreadPoolExecutor(int corePoolSize,int maximumPoolSize,long keepAliveTime,TimeUnit unit,BlockingQueue<Runnable> workQueue,ThreadFactory threadFactory,RejectedExecutionHandler handler) {if (corePoolSize < 0 ||maximumPoolSize <= 0 ||maximumPoolSize < corePoolSize ||keepAliveTime < 0)throw new IllegalArgumentException();if (workQueue == null || threadFactory == null || handler == null)throw new NullPointerException();this.acc = System.getSecurityManager() == null ?null :AccessController.getContext();this.corePoolSize = corePoolSize;this.maximumPoolSize = maximumPoolSize;this.workQueue = workQueue;this.keepAliveTime = unit.toNanos(keepAliveTime);this.threadFactory = threadFactory;this.handler = handler;
}

上面是ThreadPoolExecutor參數最多的一個構造方法, 其他構造方法都是傳入參數調用這個構造方法, 默認的線程工廠見默認的線程工廠Executors#defaultThreadFactory, 各個參數在線程池核心參數介紹已經介紹.
?

4.2 ThreadPoolExecutor中的一些重要的屬性

對一些重要屬性有基礎的認知, 有助于后面我們更容易看懂源碼流程.

4.2.1 線程池的運行狀態

private static final int COUNT_BITS = Integer.SIZE - 3;
private static final int CAPACITY   = (1 << COUNT_BITS) - 1;// runState is stored in the high-order bits
private static final int RUNNING    = -1 << COUNT_BITS;
private static final int SHUTDOWN   =  0 << COUNT_BITS;
private static final int STOP       =  1 << COUNT_BITS;
private static final int TIDYING    =  2 << COUNT_BITS;
private static final int TERMINATED =  3 << COUNT_BITS;

根據上面源碼可知,?COUNT_BITS的值為29,?CAPACITY的值為2的29次方-1, 二進制表示為: "00011111111111111111111111111111"(明顯29個1);

上面的源碼中線程池的運行狀態的二進制表示:

狀態二進制意義
RUNNING11100000000000000000000000000000接受新execute的task, 執行已入隊的task
SHUTDOWN0不接受新execute的task, 但執行已入隊的task, 中斷所有空閑的線程
STOP00100000000000000000000000000000不接受新execute的task, 不執行已入隊的task, 中斷所有的線程
TIDYING01000000000000000000000000000000所有線程停止,?workerCount數量為0, 將執行hook方法: terminated()
TERMINATED01100000000000000000000000000000terminated()方法執行完畢

可以看出, 線程池的狀態由32位int整型的二進制的前三位表示.

下圖根據Javadoc所畫:


?

4.2.2 核心屬性ctl源碼(線程池狀態和有效線程數)

private final AtomicInteger ctl = new AtomicInteger(ctlOf(RUNNING, 0));

核心屬性ctl, 數據類型是AtomicInteger, 表示了兩個含義:

  1. 線程池運行狀態(runState)
  2. 線程池中的有效線程數(workerCount)

那是如何做到一個屬性表示兩個含義的呢? 那就要看看ctlOf方法

private static int ctlOf(int rs, int wc) { return rs | wc; }

ctlOf方法在線程池內部用來更新線程池的ctl屬性,比如ctl初始化的時候:?ctl = new AtomicInteger(ctlOf(RUNNING, 0)), 調用ThreadPoolExecutor#shutdown方法等;

rs表示runState,?wc表示workerCount;

將?runStateworkerCount按位或運算得到ctl的值;

runStateworkerCount的值由下面兩個方法packing和unpacking, 這里的形參c就是ctl.get()的值;

// Packing and unpacking ctl
private static int runStateOf(int c)     { return c & ~CAPACITY; }
private static int workerCountOf(int c)  { return c & CAPACITY; }

下面用表格更清晰理解:

方法方法體帶入CAPACITY的值
runStateOfc & ~CAPACITYc & 11100000000000000000000000000000
workerCountOfc & CAPACITYc & 00011111111111111111111111111111

按位與運算, 相同位置, 同1才為1, 其余為0;

結合表格看,?runStateOf方法取ctl前3位表示runState,?workerCountOf方法取第4~32位的值表示workerCount;

相信大家已經明白runStateworkerCount如何被packing和unpacking, 這就是為什么ctl能即表示runState又能表示wokerCount.

Note: 眾所周知, 與2的整數次冪-1進行按位與運算結果等于取余運算的結果, 而位運算效率高于取余運算, 與Java8及其之后的HashMap的散列方式有同曲同工之妙, 見:https://www.cnblogs.com/theRhyme/p/9404082.html#_lab2_1_16.
?

4.2.3 線程池中的mainLock鎖

private final ReentrantLock mainLock = new ReentrantLock();

這把可重入鎖, 在線程池的很多地方會被用到;

比如要對workers(線程池中的線程集合)操作的時候(如添加一個worker到工作中), interrupt所有的?workers, 調用shutdown方法等.
?

4.2.4 線程池中的線程集合

private final HashSet<Worker> workers = new HashSet<Worker>();

用來保存當前線程池中的所有線程;

可通過該集合對線程池中的線程進行中斷,?遍歷等;

創建新的線程時, 要添加到該集合, 移除線程, 也要從該集合中移除對應的線程;

對該集合操作都需要mainLock鎖.
?

4.2.5 mainLock的Condition()對象

private final Condition termination = mainLock.newCondition();

主要是為了讓tryTerminate方法與awaitTermination方法結合使用;

tryTerminate又被shutdownshutdownNowprocessWorkerExit等方法調用;

Condition對象termination的作用就是當線程池中的狀態表示的值小于TERMINATED的值3時, 當前調用了awaitTermination方法的線程就會wait對應的時間;

等到過了指定的wait時間, 或者線程池狀態等于或大于TERMINATED, wait的線程被喚醒, 就繼續執行;

如果不清楚wait(long)wait()的區別可參考:?Object#wait()與Object#wait(long)的區別.
?

4.2.6 線程池中曾經達到的最大線程數

private int largestPoolSize;

用作監控, 查看當前線程池, 線程數最多的時候的數量是多少, 見方法ThreadPoolExecutor#getLargestPoolSize;

mainLock保證其可見性和原子性.
?

4.2.7 線程池中已完成的任務數

private long completedTaskCount;

通過方法ThreadPoolExecutor#getCompletedTaskCount獲取.
?

4.2.8 核心線程池中的空閑線程

private volatile boolean allowCoreThreadTimeOut;

默認情況下, 只有臨時線程超過了keepAliveTime的時間會被回收;

allowCoreThreadTimeOut默認為false, 如果設置為true, 則會通過中斷或getTask的結果為null的方式停止超過keepAliveTime核心線程, 具體見getTask方法, 后續會詳細介紹.
?

5. ThreadPoolExecutor一些重要的方法源碼及其原理解析

5.1 execute方法源碼: 提交task到線程池

public void execute(Runnable command) {// 如果task為null, 拋出NPEif (command == null)throw new NullPointerException();// 獲得ctl的int值int c = ctl.get();// workerCount小于corePoolSizeif (workerCountOf(c) < corePoolSize) {// 添加一個新的worker, 作為核心線程池的線程if (addWorker(command, true))// 添加worker作為核心線程成功, execute方法退出return;// 添加worker作為核心線程失敗, 重新獲取ctl的int值c = ctl.get();}// 線程池是RUNNING狀態并且task入阻塞隊列成功if (isRunning(c) && workQueue.offer(command)) {// double-check, 再次獲取ctl的值int recheck = ctl.get();// 線程池不是RUNNING狀態并且當前task從workerQueue被移除成功if (! isRunning(recheck) && remove(command))// 執行拒絕策略reject(command);// 線程池中的workerCount為0else if (workerCountOf(recheck) == 0)// 啟動一個非核心線程, 由于這里的task參數為null, 該線程會從workerQueue拉去任務addWorker(null, false);}// 添加一個非核心線程執行提交的taskelse if (!addWorker(command, false))// 添加一個非核心線程失敗, 執行拒絕策略reject(command);
}

結合上面代碼中的注釋和提交任務到線程池中的流程, 相信我們已經對這個execute方法提交task到線程池的流程的源碼更加清晰了.
?

5.2 addWorker方法源碼: 創建線程并啟動, 執行提交的task

private boolean addWorker(Runnable firstTask, boolean core) {retry:for (;;) {int c = ctl.get();// 線程池運行狀態int rs = runStateOf(c);// 如果線程池運行狀態大于等于SHUTDOWN, 提交的firstTask為null, workQueue為null,返回falseif (rs >= SHUTDOWN &&! (rs == SHUTDOWN &&firstTask == null &&! workQueue.isEmpty()))return false;for (;;) {// workerCountint wc = workerCountOf(c);// 線程數大于了2的29次方-1, 或者想要添加為核心線程但是核心線程池滿, 或者想要添加為臨時線程, 但是workerCount等于或大于了最大的線程池線程數maximumPoolSize, 返回falseif (wc >= CAPACITY ||wc >= (core ? corePoolSize : maximumPoolSize))return false;// CAS的方式讓workerCount數量增加1,如果成功, 終止循環if (compareAndIncrementWorkerCount(c))break retry;c = ctl.get();// 再次檢查runState, 如果被更改, 重頭執行retry代碼if (runStateOf(c) != rs)continue retry;// 其他的, 上面的CAS如果由于workerCount被其他線程改變而失敗, 繼續內部的for循環}}// 標志位workerStarted, workerAddedboolean workerStarted = false;boolean workerAdded = false;Worker w = null;try {// 傳入task對象, 創建Worker對象w = new Worker(firstTask);// 從worker對象中回去Thread對象final Thread t = w.thread;if (t != null) {final ReentrantLock mainLock = this.mainLock;// 獲取mainLock鎖mainLock.lock();try {// 獲取mainLock鎖之后, 再次檢查runStateint rs = runStateOf(ctl.get());// 如果是RUNNING狀態, 或者是SHUTDOWN狀態并且傳入的task為null(執行workQueue中的task)if (rs < SHUTDOWN ||(rs == SHUTDOWN && firstTask == null)) {// 線程已經被啟動, 拋出IllegalThreadStateExceptionif (t.isAlive()) throw new IllegalThreadStateException();// 將worker對象添加到HashSetworkers.add(w);int s = workers.size();// 線程池中曾經達到的最大線程數(上面4.2.6提到過)if (s > largestPoolSize)largestPoolSize = s;// worker被添加成功workerAdded = true;}} finally {// 釋放mainLock鎖mainLock.unlock();}// 如果worker被添加成功, 啟動線程, 執行對應的taskif (workerAdded) {t.start();workerStarted = true;}}} finally {// 如果線程啟動失敗, 執行addWorkerFailed方法if (! workerStarted)addWorkerFailed(w);}return workerStarted;
}

每行代碼都有詳細的對應的注釋, 相信我們已經明白了addWorker方法的過程.
?

5.3 Worker類源碼: 線程是如何執行提交到線程池中的task?

上面的addWorker方法中, 獲得Worker對象中的Thread對象(final Thread t = w.thread;), 并調用線程的start方法啟動線程執行Worker中的run方法.
?

5.3.1 Worker 的定義

繼承了AQS(AbstractQueuedSynchronizer), 重寫了部分方法, 這里的主要作用主要是通過tryLock或isLocked方法判斷當前線程是否正在執行Worker中的run方法, 如果返回false, 則線程沒有正在執行或沒有處于active, 反之, 處于;

結合getActiveCount方法源碼理解;

實現了Runnable接口, 是一個線程可執行的任務.

private final class Workerextends AbstractQueuedSynchronizerimplements Runnable{...
}

?

5.3.2 Worker中的屬性

屬性意義
final Thread thread線程對象, worker會被提交到該線程
Runnable firstTask提交到線程池中的task, 可能為null, 比如方法ThreadPoolExecutor#prestartCoreThread
volatile long completedTasks每個線程完成的任務數

?

5.3.3 Worker的構造方法

首先設置初始狀態state為-1, 這里的setState方法是AQS中的方法;

提交的task賦值給firstTask屬性;

利用ThreadFactory, 傳入當前Worker對象(為了執行當前Worker中的run方法), 創建Thread對象.

Worker(Runnable firstTask) {setState(-1); // inhibit interrupts until runWorkerthis.firstTask = firstTask;this.thread = getThreadFactory().newThread(this);
}

?

5.3.4 Worker中的run方法

Worker對象的run方法, 直接調用了ThreadPoolExecutor的runWorker方法.

public void run() {runWorker(this);
}

?

5.3.5 Worker中的重寫AQS的方法tryAcquire, tryRelease, isHeldExclusively

5.3.5.1 tryAcquire方法

嘗試將state從0設置為1, 成功后把當前持有鎖的線程設置為當前線程;

形參unused沒有用到.

protected boolean tryAcquire(int unused) {if (compareAndSetState(0, 1)) {setExclusiveOwnerThread(Thread.currentThread());return true;}return false;
}

?

5.3.5.2 tryRelease方法

直接將當前持有鎖的線程設置為null, 將state設置為1;

形參unused沒有用到.

protected boolean tryRelease(int unused) {setExclusiveOwnerThread(null);setState(0);return true;
}

?

5.3.5.3 isHeldExclusively方法

判斷當前線程是否已經獲取了Worker的鎖;
如果getState() == 0, 則沒有線程獲取了該鎖, 可以嘗試獲取鎖, 將state設置為1;
如果getState() == 1, 已經有線程獲取了該鎖, 互斥, 此時無法獲取該鎖.

protected boolean isHeldExclusively() {return getState() != 0;
}

?

5.3.6 lock方法

獲取鎖, 直到獲取到鎖為止(具體見AbstractQueuedSynchronizer#acquireQueued方法);

public void lock()        { acquire(1); }

?

5.3.7 tryLock方法

tryLock, 嘗試獲取鎖, 獲取到返回true, 否則返回false.

public boolean tryLock()  { return tryAcquire(1); }

?

5.3.8 isLocked方法

isLocked方法, 如果當前有線程持有該鎖, 則返回true, 否則返回false.

public boolean isLocked() { return isHeldExclusively(); }

?

5.3.9 interruptIfStarted方法

線程啟動會調用unlock方法(ThreadPoolExecutor.java第1131行), 將state設置為0;

如果線程已經啟動, 并且沒有被中斷, 調用線程的中斷方法.

void interruptIfStarted() {Thread t;if (getState() >= 0 && (t = thread) != null && !t.isInterrupted()) {try {t.interrupt();} catch (SecurityException ignore) {}}
}

?

5.3.10 unlock方法

底層調用worker的tryRelease方法, 設置state為0.

public void unlock()      { release(1); }

?

5.4 runWorker方法源碼: 線程池中線程被復用的關鍵

執行提交的task或死循環從BlockingQueue獲取task.

final void runWorker(Worker w) {Thread wt = Thread.currentThread();Runnable task = w.firstTask;w.firstTask = null;w.unlock();boolean completedAbruptly = true;try {// 當傳入的task不為null, 或者task為null但是從BlockingQueue中獲取的task不為nullwhile (task != null || (task = getTask()) != null) {// 執行任務之前先獲取鎖w.lock();// 線程池狀態如果為STOP, 或者當前線程是被中斷并且線程池是STOP狀態, 或者當前線程不是被中斷;// 則調用interrupt方法中斷當前線程if ((runStateAtLeast(ctl.get(), STOP) ||(Thread.interrupted() &&runStateAtLeast(ctl.get(), STOP))) &&!wt.isInterrupted())wt.interrupt();try {// beforeExecute hook方法beforeExecute(wt, task);Throwable thrown = null;try {// 真正執行提交的task的run方法task.run();} catch (RuntimeException x) {thrown = x; throw x;} catch (Error x) {thrown = x; throw x;} catch (Throwable x) {thrown = x; throw new Error(x);} finally {// afterExecute hook方法afterExecute(task, thrown);}} finally {// task賦值為null, 下次從BlockingQueue中獲取tasktask = null;w.completedTasks++;w.unlock();}}completedAbruptly = false;} finally {processWorkerExit(w, completedAbruptly);}

?

5.5 getTask方法源碼: 從BlockingQueue中獲取task

private Runnable getTask() {// BlockingQueue的poll方法是否已經超時boolean timedOut = false; for (;;) {int c = ctl.get();int rs = runStateOf(c);// 如果線程池狀態>=SHUTDOWN,并且BlockingQueue為null;// 或者線程池狀態>=STOP// 以上兩種情況都減少工作線程的數量, 返回的task為nullif (rs >= SHUTDOWN && (rs >= STOP || workQueue.isEmpty())) {decrementWorkerCount();return null;}int wc = workerCountOf(c);// 當前線程是否需要被淘汰boolean timed = allowCoreThreadTimeOut || wc > corePoolSize;if ((wc > maximumPoolSize || (timed && timedOut))&& (wc > 1 || workQueue.isEmpty())) {if (compareAndDecrementWorkerCount(c))return null;continue;}try {// BlockingQueue的poll方法超時會直接返回null// BlockingQueue的take方法, 如果隊列中沒有元素, 當前線程會wait, 直到其他線程提交任務入隊喚醒當前線程.Runnable r = timed ?workQueue.poll(keepAliveTime, TimeUnit.NANOSECONDS) :workQueue.take();if (r != null)return r;timedOut = true;} catch (InterruptedException retry) {timedOut = false;}}
}

?

5.6 shutdown方法源碼: 中斷所有空閑的線程

public void shutdown() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {checkShutdownAccess();// 死循環將線程池狀態設置為SHUTDOWNadvanceRunState(SHUTDOWN);// 中斷所有空閑的線程interruptIdleWorkers();// hook函數, 比如ScheduledThreadPoolExecutor對該方法的重寫onShutdown();} finally {mainLock.unlock();}tryTerminate();}

?

5.7 shutdownNow方法源碼: 中斷所有空閑的線程, 刪除并返回BlockingQueue中所有的task

public List<Runnable> shutdownNow() {List<Runnable> tasks;final ReentrantLock mainLock = this.mainLock;mainLock.lock(www.xincheng0001.cn);try {checkShutdownAccess(www.huizhongdl.cn);// 死循環將線程池狀態設置為STOPadvanceRunState(STOP);// 中斷所有空閑的線程interruptWorkers(www.shangdu2zc.cn);// 刪除并返回BlockingQueue中所有的tasktasks = drainQueue(www.feihongyul.cn );} finally {mainLock.unlock(www.yuanyangyul.com);}tryTerminate(www.youy2zhuce.cn);// 返回BlockingQueue中所有的taskreturn tasks;}

?

6. ThreadPoolExecutor一些其他的方法和屬性介紹

6.1 默認的線程工廠Executors#defaultThreadFactory

默認的線程工廠的兩個重要作用就是創建線程初始化線程名前綴.

創建DefaultThreadFactory對象.

public static ThreadFactory defaultThreadFactory() {return new DefaultThreadFactory();
}

DefaultThreadFactory默認構造方法, 初始化ThreadGroup和創建出的線程名前綴namePrefix.

static class DefaultThreadFactory implements ThreadFactory {private static final AtomicInteger poolNumber = new AtomicInteger(1);private final ThreadGroup group;private final AtomicInteger threadNumber = new AtomicInteger(1);private final String namePrefix;DefaultThreadFactory(www.yongshiyule178.com) {SecurityManager s = System.getSecurityManager();group = (s www.jucaiyle.cn!= null) ? s.getThreadGroup() :Thread.currentThread().getThreadGroup();namePrefix = "pool-" +poolNumber.getAndIncrement() +"-thread-"www.jujinyule.com;}public Thread newThread(Runnable r) www.tengyao3zc.cn{Thread t = new Thread(group, r,namePrefix + threadNumber.getAndIncrement(),0);if (t.isDaemon(www.jintianxuesha.com))// 非daemon線程, 不會隨父線程的消亡而消亡t.setDaemon(false);if (t.getPriority(www.tyyleapp.com) != Thread.NORM_PRIORITY)t.setPriority(Thread.NORM_PRIORITY);return t;}
}

?

6.2 ThreadPoolExecutor默認實現的4種拒絕策略

6.2.1 CallerRunsPolicy

如果線程池狀態不是SHUTDOWN, 由提交任務到線程池中(如調用ThreadPoolExecutor#execute方法)的線程執行該任務;

如果線程池狀態是SHUTDOWN, 則該任務會被直接丟棄掉,?不會再次入隊被任何線程執行.

public static class CallerRunsPolicy implements RejectedExecutionHandler {public CallerRunsPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown()) {r.run();}}
}

?

6.2.2 AbortPolicy

在調用提交任務到線程池中(如調用ThreadPoolExecutor#execute方法)的線程中直接拋出RejectedExecutionException異常, 當然任務也不會被執行, 提交任務的線程如果未捕獲異常會因此停止.

public static class AbortPolicy implements RejectedExecutionHandler {public AbortPolicy(www.javachenglei.com) { www.zhuyngyule.cn}public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {throw new RejectedExecutionException("Task " + r.toString(www.chuancenpt.com) +" rejected from " +e.toString());}
}

?

6.2.3 DiscardPolicy

直接丟棄掉這個任務, 不做任何事情.

public static class DiscardPolicy implements RejectedExecutionHandler {public DiscardPolicy() { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {}
}

?

6.2.4 DiscardOldestPolicy

線程池如果不是SHUTDOWN狀態, 丟棄最老的任務, 即workQueue隊頭的任務, 將當前任務execute提交到線程池;

CallerRunsPolicy一樣, 如果線程池狀態是SHUTDOWN, 則該任務會被直接丟棄掉, 不會再次入隊或被任何線程執行.

public static class DiscardOldestPolicy implements RejectedExecutionHandler {public DiscardOldestPolicy(www.letianhuanchao.cn) { }public void rejectedExecution(Runnable r, ThreadPoolExecutor e) {if (!e.isShutdown(www.shentuylgw.cn)) {e.getQueue().poll(www.fengminpt.cn);e.execute(r);}}

?

6.3 addWorkerFailed方法源碼: 移除啟動線程失敗的worker

private void addWorkerFailed(Worker w) {final ReentrantLock mainLock = this.mainLock;// 獲取mainLock鎖mainLock.lock();try {// 如果worker不為null, 從HashSet中移除workerif (w != null)workers.remove(w);// 循環執行CAS操作直到讓workerCount數量減少1decrementWorkerCount();// 執行tryTerminate方法tryTerminate();} finally {mainLock.unlock();}
}

?

6.4 tryTerminate方法源碼: 嘗試更改runState, workerCount, 嘗試關閉線程池

final void tryTerminate() {for (;;) {// 獲取ctl, runState和workerCountint c = ctl.get();// 當前線程池狀態是否是RUNNING, 或者是否是TIDYING或TERMINATED狀態, 或者是否是SHUTDOWN狀態并且workQueue不為空(需要被線程執行), return結束方法if (isRunning(c) ||runStateAtLeast(c, TIDYING) ||(runStateOf(c) == SHUTDOWN && ! workQueue.isEmpty()))return;// workerCount如果不為0, 隨機中斷一個空閑的線程, return結束方法if (workerCount如果不為0,(c) != 0) {interruptIdleWorkers(ONLY_ONE);return;}final ReentrantLock mainLock = this.mainLock;// 獲取mainLock鎖mainLock.lock();try {// CAS方式設置當前線程池狀態為TIDYING, workerCount為0if (ctl.compareAndSet(c, ctlOf(TIDYING, 0))) {try {// 執行hook方法terminatedterminated();} finally {// 設置當前線程池狀態為TERMINATED, workerCount為0ctl.set(ctlOf(TERMINATED, 0));// 喚醒調用了awaitTermination方法的線程termination.signalAll();}return;}} finally {mainLock.unlock();}// 當CAS失敗, 循環重試}
}

?

6.5 awaitTermination方法源碼: 等待指定時間后, 線程池是否已經關閉

死循環判斷, 如果當前線程池狀態小于TERMINATED, 則wait對應的時間;

如果過了wait的時間(nanos <= 0), 線程池狀態大于等于TERMINATED則循環終止, 函數返回true, 否則返回false.

public boolean awaitTermination(long timeout, TimeUnit unit)throws InterruptedException {long nanos = unit.toNanos(timeout);final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {for (;;) {if (runStateAtLeast(ctl.get(), TERMINATED))return true;if (nanos <= 0)return false;nanos = termination.awaitNanos(nanos);}} finally {mainLock.unlock();}
}

?

6.6 prestartCoreThread方法源碼: 預啟動一個核心線程

如果當前線程池中的核心線程數小于corePoolSize, 則增加一個核心線程(提交的task為null).

public boolean prestartCoreThread() {return workerCountOf(ctl.get()) < corePoolSize &&addWorker(null, true);
}

?

6.7 prestartAllCoreThreads方法源碼: 預先啟動線程池中的所有核心線程

啟動所有的核心線程.

public int prestartAllCoreThreads() {int n = 0;while (addWorker(null, true))++n;return n;}

?

6.8 getActiveCount方法源碼: 獲得當前線程池中活躍的線程

獲得當前線程池中活躍的線程(即正在執行task沒有wait的線程, [runWorker](#5.4 runWorker方法源碼: 線程池中線程被復用的關鍵)方法中的同步代碼塊).

public int getActiveCount() {final ReentrantLock mainLock = this.mainLock;mainLock.lock();try {int n = 0;for (Worker w : workers)if (w.isLocked())++n;return n;} finally {mainLock.unlock();}
}

?

總結

通過介紹ThreadPoolExecutor的構造方法, 重要屬性,?execute方法, 引出Worker類, 以及真正的線程處理提交到線程池中的task的源碼和流程, 對ThreadPoolExecutor整體結構有了清晰的認知;

線程池ThreadPoolExecutor使用BlockingQueue實現線程間的等待-通知機制, 當然也可以自己手動實現;

復用線程體現在runWorker方法中, 死循環+BlockingQueue的特性.

https://www.nshth.com/java/338903.html
>

相关文章:

  • java多線程面試題及答案
  • JAVA線程池實現原理
  • 多線程CC源碼
  • java實現線程池
  • java Thread
  • ASYNCTASK源碼
  • java線程池源碼深度解析
  • 線程池threadlocal
  • pdf去水印軟件免費版,java批量去除pdf簽名,刪除簽名圖標
  • java多線程面試題及答案,JAVA8線程池THREADPOOLEXECUTOR底層原理及其源碼解析
  • java編程,java.lang.Class:是反射的源頭
  • java基礎面試題及答案,HTML CSS 基礎 面試題
  • java編寫軟件工具,Xson:Java對象序列化和反序列化工具
  • nlp預訓練模型,NLP-D62-nlp比賽D31刷題D15
  • kafka如何使用,kafka javax.management.InstanceAlreadyExistsException: kafka.consumer:
  • ssm畢設項目企業部門報銷管理g9d62(java+VUE+Mybatis+Maven+Mysql+sprnig)
  • java小游戲合集,java 煙花_Java 美麗的煙花
  • table列合并,poi操作excel之列合并
  • 找不到指定模塊怎么辦,在烏版圖安裝軟件包時候報錯:E:無法定位軟件包
  • 學云計算好就業嗎,對不起,云計算技術又走錯路了
  • 數電模電基礎知識總結,數電模電實驗課程
  • java的基礎知識,「JavaSE」-面向對象
  • 擴展內存,Java編程內存分析簡要
  • java多線程面試題及答案,【階段一】java之面向對象上
  • java 工作流框架,Activiti工作流使用之SpringBoot整合Activiti
  • 模型的應用形態包括哪些,模型設計準則
  • c語言程序設計培訓班南寧,南寧從零開始學習編程
  • 服務器,Spring Security oAuth2創建認證服務器模塊
  • Java jdk14.0.1安裝簡單步驟
  • 安裝ug12.0當前頁面的腳本發生錯誤,ug提示找不到html文件,[圖文教程] 以下文件無法加載,導致打開操作失敗: 使用當前搜索選項找不到文件,部件已卸載.
  • java執行cmd命令找不到指定文件,java編譯找不到文件_解決cmd運行java程序“找不到文件”提示的方案
  • 線上學畫畫的機構排名,拍樂云推出業內首個「線上美術教學音視頻方案」,打造極致互動體驗
  • day18-java
  • 取兩者中較小值函數,求兩個數中的較大值
  • java多線程面試題及答案,python中的多任務-多線程和多進程
  • 關于Arthas如何遠程監視Java程序
  • Java8 Stream流中的 collect() 方法,遠比你想象中的強大
  • 劉德華《天若有情》,天若有情