亚洲免费在线-亚洲免费在线播放-亚洲免费在线观看-亚洲免费在线观看视频-亚洲免费在线看-亚洲免费在线视频

JAVA線程池代碼淺析

系統(tǒng) 2085 0

1. ?????? ExecutorService
JAVA線程池代碼淺析
?
Java 1.5 開始正式提供了并發(fā)包 , 而這個(gè)并發(fā)包里面除了原子變量 ,synchronizer, 并發(fā)容器 , 另外一個(gè)非常重要的特性就是線程池 . 對(duì)于線程池的意義 , 我們這邊不再多說 .

上圖是線程池的主體類圖 ,ThreadPoolExecutor 是應(yīng)用最為廣泛的一個(gè)線程池實(shí)現(xiàn) ( 我也將在接下來的文字中詳細(xì)描述我對(duì)這個(gè)類的理解和執(zhí)行機(jī)制 ),ScheduledThreadPoolExecutor 則在 ThreadPoolExecutor 上提供了定時(shí)執(zhí)行的等附加功能 , 這個(gè)可以從 ScheduledExecutorService 接口的定義中看出來 .Executors 則類似工廠方法 , 提供了幾個(gè)非常常用的線程池初始化方法 .

ThreadPoolExecutor

這個(gè)類繼承了 AbstractExecutorService 抽象類 , AbstractExecutorService 主要的職責(zé)有 2 部分 , 一部分定義和實(shí)現(xiàn)提交任務(wù)的方法 (3 個(gè) submit 方法的實(shí)現(xiàn) ) , 實(shí)例化 FutureTask 并且交給子類執(zhí)行 , 另外一部分實(shí)現(xiàn) invokeAny,invokeAll 方法 . 留給子類的方法為 execute 方法 , 也就是 Executor 接口定義的方法 .

// 實(shí)例化一個(gè)FutureTask,交給子類的execute方法執(zhí)行.這種設(shè)計(jì)能夠保證callable和runnable的執(zhí)行接口方法的一致性(FutureTask包裝了這個(gè)差別)
public ? < T > ?Future < T > ?submit(Runnable?task,?T?result)? {
????
if ?(task? == ? null )? throw ? new ?NullPointerException();
????RunnableFuture
< T > ?ftask? = ?newTaskFor(task,?result);
????execute(ftask);
????
return ?ftask;
}


protected ? < T > ?RunnableFuture < T > ?newTaskFor(Runnable?runnable,?T?value)? {
????
return ? new ?FutureTask < T > (runnable,?value);
}

關(guān)于 FutureTask 這個(gè)類的實(shí)現(xiàn) , 我在前面的 JAVA LOCK 代碼淺析有講過其實(shí)現(xiàn)原理 , 主要的思想就是關(guān)注任務(wù)完成與未完成的狀態(tài) , 任務(wù)提交線程 get() 結(jié)果時(shí)被 park , 等待任務(wù)執(zhí)行完成被喚醒 , 任務(wù)執(zhí)行線程在任務(wù)執(zhí)行完畢后設(shè)置結(jié)果 , 并且 unpark 對(duì)應(yīng)線程并且讓其得到執(zhí)行結(jié)果 .

回到 ThreadPoolExecutor .ThreadPoolExecutor 需要實(shí)現(xiàn)除了我們剛才說的 execute(Runnable command) 方法外 , 還得實(shí)現(xiàn) ExecutorService 接口定義的部分方法 . ThreadPoolExecutor 所提供的不光是這些 , 以下根據(jù)我的理解來列一下它所具有的特性
1. ?????? execute 流程
2. ??????
3. ?????? 工作隊(duì)列
4. ?????? 飽和拒絕策略
5. ?????? 線程工廠
6. ?????? beforeExecute afterExecute 擴(kuò)展

execute 方法的實(shí)現(xiàn)有個(gè)機(jī)制非常重要 , 當(dāng)當(dāng)前線程池線程數(shù)量小于 corePoolSize, 那么生成一個(gè)新的 worker 并把提交的任務(wù)置為這個(gè)工作線程的頭一個(gè)執(zhí)行任務(wù) , 如果大于 corePoolSize, 那么會(huì)試著將提交的任務(wù)塞到 workQueue 里面供線程池里面的worker稍后執(zhí)行 , 并不是直接再起一個(gè) worker, 但是當(dāng) workQueue 也滿 , 并且當(dāng)前線程池小于 maxPoolSize, 那么起一個(gè)新的 worker 并將該任務(wù)設(shè)為該 worker 執(zhí)行的第一個(gè)任務(wù)執(zhí)行 , 大于 maxPoolSize,workQueue 也滿負(fù)荷 , 那么調(diào)用飽和策略里面的行為 .

worker 線程在執(zhí)行完一個(gè)任務(wù)之后并不會(huì)立刻關(guān)閉 , 而是嘗試著去 workQueue 里面取任務(wù) , 如果取不到 , 根據(jù)策略關(guān)閉或者保持空閑狀態(tài) . 所以 submit 任務(wù)的時(shí)候 , 提交的順序?yàn)? 核心線程池 ------ 工作隊(duì)列 ------ 擴(kuò)展線程池 .

池包括核心池
, 擴(kuò)展池 (2 者的線程在同一個(gè) hashset 中,這里只是為了方便才這么稱呼,并不是分離的 ), 核心池在池內(nèi) worker 沒有用完的情況下 , 只要有任務(wù)提交都會(huì)創(chuàng)建新的線程 , 其代表線程池正常處理任務(wù)的能力 . 擴(kuò)展池 , 是在核心線程池用完 , 并且工作隊(duì)列也已排滿任務(wù)的情況下才會(huì)開始初始化線程 , 其代表的是線程池超出正常負(fù)載時(shí)的解決方案 , 一旦任務(wù)完成 , 并且試圖從 workQueue 取不到任務(wù) , 那么會(huì)比較當(dāng)前線程池與核心線程池的大小 , 大于核心線程池?cái)?shù)的 worker 將被銷毀 .

Runnable?getTask()? {
????
for ?(;;)? {
????????
try ? {
????????????
int ?state? = ?runState;
????????????
// >SHUTDOWN就是STOP或者TERMINATED
????????????
// 直接返回
???????????? if ?(state? > ?SHUTDOWN)
????????????????
return ? null ;
????????????Runnable?r;
????????????
// 如果是SHUTDOWN狀態(tài),那么取任務(wù),如果有
??????????????
// 將剩余任務(wù)執(zhí)行完畢,否則就結(jié)束了
???????????? if ?(state? == ?SHUTDOWN)?? // ?Help?drain?queue
????????????????r? = ?workQueue.poll();
????????????
// 如果不是以上狀態(tài)的(也就是RUNNING狀態(tài)的),那么如果當(dāng)前池大于核心池?cái)?shù)量,
????????????
// 或者允許核心線程池取任務(wù)超時(shí)就可以關(guān)閉,那么從任務(wù)隊(duì)列取任務(wù),
????????????
// 如果超出keepAliveTime,那么就返回null了,也就意味著這個(gè)worker結(jié)束了
???????????? else ? if ?(poolSize? > ?corePoolSize? || ?allowCoreThreadTimeOut)
????????????????r?
= ?workQueue.poll(keepAliveTime,?TimeUnit.NANOSECONDS);
????????????
// 如果當(dāng)前池小于核心池,并且不允許核心線程池取任務(wù)超時(shí)就關(guān)閉,那么take(),直到拿到任務(wù)或者被interrupt
???????????? else
????????????????r?
= ?workQueue.take();
????????????
// 如果經(jīng)過以上判定,任務(wù)不為空,那么返回任務(wù)
???????????? if ?(r? != ? null )
????????????????
return ?r;
????????????
// 如果取到任務(wù)為空,那么判定是否可以退出
???????????? if ?(workerCanExit())? {
????????????????
// 如果整個(gè)線程池狀態(tài)變?yōu)镾HUTDOWN或者TERMINATED,那么將所有worker?interrupt?(如果正在執(zhí)行,那繼續(xù)讓其執(zhí)行)
???????????????? if ?(runState? >= ?SHUTDOWN)? // ?Wake?up?others
????????????????????interruptIdleWorkers();
????????????????
return ? null ;
????????????}

????????????
// ?Else?retry
????????}
? catch ?(InterruptedException?ie)? {
????????????
// ?On?interruption,?re-check?runState
????????}

}

????}


// worker從workQueue中取不到數(shù)據(jù)的時(shí)候調(diào)用此方法,以決定自己是否跳出取任務(wù)的無限循環(huán),從而結(jié)束此worker的運(yùn)行
private ? boolean ?workerCanExit()? {
????
final ?ReentrantLock?mainLock? = ? this .mainLock;
????mainLock.lock();
????
boolean ?canExit;
????
try ? {
????????
/**/ /*
????????*線程池狀態(tài)為stop或者terminated,
????????*或者任務(wù)隊(duì)列里面任務(wù)已經(jīng)為空,
????????*或者允許線程池線程空閑超時(shí)(實(shí)現(xiàn)方式是從工作隊(duì)列拿最多keepAliveTime的任務(wù),超過這個(gè)時(shí)間就返回null了)并且
?????????*當(dāng)前線程池大于corePoolSize(>1)
????????*那么允許線程結(jié)束
????????*static?final?int?RUNNING????=?0;
????????*static?final?int?SHUTDOWN???=?1;
????????*static?final?int?STOP???????=?2;
????????*static?final?int?TERMINATED?=?3;
????????
*/

????????canExit?
= ?runState? >= ?STOP? ||
????????workQueue.isEmpty()?
||
???????(allowCoreThreadTimeOut?
&&
????????poolSize?
> ?Math.max( 1 ,corePoolSize));
????}
? finally ? {
????????mainLock.unlock();
????}

????
return ?canExit;
}


當(dāng)提交任務(wù)是 , 線程池都已滿 , 并且工作隊(duì)列也無空閑位置的情況下 ,ThreadPoolExecutor 會(huì)執(zhí)行 reject 操作 ,JDK 提供了四種 reject 策略 , 包括 AbortPolicy( 直接拋 RejectedException Exception),CallerRunsPolicy( 提交任務(wù)線程自己執(zhí)行 , 當(dāng)然這時(shí)剩余任務(wù)也將無法提交 ),DiscardOldestPolicy( 將線程池的 workQueue 任務(wù)隊(duì)列里面最老的任務(wù)剔除 , 將新任務(wù)丟入 ),DiscardPolicy( 無視 , 忽略此任務(wù) , 并且立即返回 ). 實(shí)例化 ThreadPoolExecutor 時(shí) , 如果不指定任何飽和策略 , 默認(rèn)將使用 AbortPolicy.

個(gè)人認(rèn)為這些飽和策略并不十分理想
, 特別是在應(yīng)用既要保證快速 , 又要高可用的情況下 , 我的想法是能夠加入超時(shí)等待策略 , 也就是提交線程時(shí)線程池滿 , 能夠 park 住提交任務(wù)的線程 , 一旦有空閑 , 能在第一時(shí)間通知到等待線程 . 這個(gè)實(shí)際上和主線程執(zhí)行相似 , 但是主線程執(zhí)行期間即使線程池有大量空閑也不會(huì)立即可以提交任務(wù) , 效率上后者可能會(huì)比較低 , 特別是執(zhí)行慢速任務(wù) .

實(shí)例化 Worker 的時(shí)候會(huì)調(diào)用 ThreadFactory addThread(Runnable r) 方法返回一個(gè) Thread, 這個(gè)線程工廠是可以在 ThreadPoolExecutor 實(shí)例化的時(shí)候指定的 , 如果不指定 , 那么將會(huì)使用 DefaultThreadFactory, 這個(gè)也就是提供給使用者命名線程 , 線程歸組 , 是否是 demon 等線程相關(guān)屬性設(shè)置的機(jī)會(huì) .

beforeExecute afterExecute 是提供給使用者擴(kuò)展的 , 這兩個(gè)方法會(huì)在 worker runTask 之前和 run 完畢之后分別調(diào)用 .JDK 注釋里 Doug Lea(concurrent 包作者 ) 展示了 beforeExecute 一個(gè)很有趣的示例 . 代碼如下 .

class ?PausableThreadPoolExecutor? extends ?ThreadPoolExecutor? {
????
private ? boolean ?isPaused;
????
private ?ReentrantLock?pauseLock? = ? new ?ReentrantLock();
????
private ?Condition?unpaused? = ?pauseLock.newCondition();
?
public ?PausableThreadPoolExecutor( )? {? super ( );?}

protected ? void ?beforeExecute(Thread?t,?Runnable?r)? {
????
super .beforeExecute(t,?r);
????pauseLock.lock();
????
try ? {
????????
while ?(isPaused)?unpaused.await();
????}
? catch ?(InterruptedException?ie)? {
????????t.interrupt();
????}
? finally ? {
????????pauseLock.unlock();
????}

}

?
public ? void ?pause()? {
????pauseLock.lock();
????
try ? {
????????isPaused?
= ? true ;
????}
? finally ? {
????????pauseLock.unlock();
????}

}


public ? void ?resume()? {
????pauseLock.lock();
????
try ? {
????????isPaused?
= ? false ;
????????unpaused.signalAll();
????}
? finally ? {
????????pauseLock.unlock();
????}

}

??}

使用這個(gè)線程池 , 用戶可以隨時(shí)調(diào)用 pause 中止剩余任務(wù)執(zhí)行 , 當(dāng)然也可以使用 resume 重新開始執(zhí)行剩余任務(wù) .

ScheduledThreadPoolExecutor

ScheduledThreadPoolExecutor
是一個(gè)很實(shí)用的類 , 它的實(shí)現(xiàn)核心是基于 DelayedWorkQueue. ScheduledThreadPoolExecutor 的繼承結(jié)構(gòu)上來看 , 各位應(yīng)該能夠看出些端倪來 , 就是 ScheduledThreadPoolExecutor ThreadPoolExecutor 中的任務(wù)隊(duì)列設(shè)置成了 DelayedWorkQueue, 這也就是說 , 線程池 Worker 從任務(wù)隊(duì)列中取的一個(gè)任務(wù) , 需要等待這個(gè)隊(duì)列中最短超時(shí)任務(wù)的超時(shí) , 也就是實(shí)現(xiàn)定時(shí)的效果 . 所以 ScheduledThreadPoolExecutor 所做的工作其實(shí)是比較少的 . 主要就是實(shí)現(xiàn)任務(wù)的實(shí)例化并加入工作隊(duì)列 , 以及支持 scheduleAtFixedRate scheduleAtFixedDelay 這種周期性任務(wù)執(zhí)行 .

public ?ScheduledThreadPoolExecutor( int ?corePoolSize,ThreadFactory?threadFactory)? {
???????????
super (corePoolSize,?Integer.MAX_VALUE,? 0 ,?TimeUnit.NANOSECONDS, new ?DelayedWorkQueue(),?threadFactory);
}

對(duì)于 scheduleAfFixedRate scheduleAtFiexedDelay 這種周期性任務(wù)支持 , 是由 ScheduledThreadPoolExecutor 內(nèi)部封裝任務(wù)的 ScheduledFutureTask 來實(shí)現(xiàn)的 . 這個(gè)類在執(zhí)行任務(wù)后 , 對(duì)于周期性任務(wù) , 它會(huì)處理周期時(shí)間 , 并將自己再次丟入線程池的工作隊(duì)列 , 從而達(dá)到周期執(zhí)行的目的 .
private ? void ?runPeriodic()? {
?????????
boolean ?ok? = ?ScheduledFutureTask. super .runAndReset();
???????? ?
boolean ?down? = ?isShutdown();
?????????
// ?Reschedule?if?not?cancelled?and?not?shutdown?or?policy?allows
????? if ?(ok? && ?( ! down? || (getContinueExistingPeriodicTasksAfterShutdownPolicy()? && ? ! isStopped())))? {
???????????????
long ?p? = ?period;
???????????????
if ?(p? > ? 0 )
??????????? ????????? time?
+= ?p;
???????????????
else
??????????? ????????? time?
= ?triggerTime( - p);
?????
??????????????? ScheduledThreadPoolExecutor.
super .getQueue().add( this );
?????????}

????????
// ?This?might?have?been?the?final?executed?delayed
???????
// ?task.??Wake?up?threads?to?check.
??????? else ? if ?(down)
???????????? ?interruptIdleWorkers();
}

?

2. ?????? CompletionService

JAVA線程池代碼淺析
ExecutorCompletionService

CompletionService 定義了線程池執(zhí)行任務(wù)集 , 可以依次拿到任務(wù)執(zhí)行完畢的 Future,ExecutorCompletionService 是其實(shí)現(xiàn)類 , 先舉個(gè)例子 , 如下代碼 , 這個(gè)例子中 , 需要注意 ThreadPoolExecutor 核心池一定保證能夠讓任務(wù)提交并且馬上執(zhí)行 , 而不是放到等待隊(duì)列中去 , 那樣次序?qū)?huì)無法控制 ,CompletionService 也將失去效果 ( 其實(shí)核心池中的任務(wù)完成順序還是準(zhǔn)確的 ).

public ? static ? void ?main(String[]?args)? throws ?InterruptedException,?ExecutionException {
????ThreadPoolExecutor?es
= new ?ThreadPoolExecutor( 10 ,? 15 ,? 2000 ,?TimeUnit.MILLISECONDS,? new ?ArrayBlockingQueue < Runnable > ( 10 ), new ?ThreadPoolExecutor.AbortPolicy());
????CompletionService
< String > ?cs = new ?ExecutorCompletionService < String > (es);????
????cs.submit(
new ?Callable < String > ()? {
?????@Override
?????
public ?String?call()? throws ?Exception? Codehi
分享到:
評(píng)論

JAVA線程池代碼淺析


更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主

微信掃碼或搜索:z360901061

微信掃一掃加我為好友

QQ號(hào)聯(lián)系: 360901061

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。

【本文對(duì)您有幫助就好】

您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描上面二維碼支持博主2元、5元、10元、自定義金額等您想捐的金額吧,站長(zhǎng)會(huì)非常 感謝您的哦!!!

發(fā)表我的評(píng)論
最新評(píng)論 總共0條評(píng)論
主站蜘蛛池模板: 在线观看av片永久免费 | 中文字幕亚洲综合久久2 | 九九视频免费在线观看 | 亚洲精品美女久久久久 | 天天综合天天做 | 美女bbxx美女bbb | 伊人国产在线播放 | 91av最新地址 | 免费一级a毛片在线播出 | 中国性xxxxx极品奶水 | 午夜久久免影院欧洲 | 精品精品国产欧美在线观看 | 亚洲欧洲一区二区三区在线 | 天天操夜夜嗨 | 99热成人精品免费久久 | 成人免费视频网 | 美女视频免费在线观看 | 国产亚洲一级精品久久 | 特级黄色| 欧美一级成人一区二区三区 | 婷婷色国产 | 不一样的天空在线高清观看 | 亚洲小说春色综合另类网蜜桃 | 四虎影院.com | 国产精品婷婷久青青原 | 四虎院影永久在线观看 | 5060网午夜一级毛片在线看 | 欧美成人久久久免费播放 | 中文字幕久精品免费视频蜜桃视频 | 高h粗大强行撑开紧窄的嫩缝 | 久久国产免费观看 | 国产日韩片 | 日本中文字幕不卡 | 免费看成人播放毛片 | 免费看成人国产一区二区三区 | 久草免费资源视频 | 国产精品久线观看视频 | 日本视频中文字幕一区二区 | 欧美成人免费高清网站 | 国产亚洲一区呦系列 | 91婷婷|