假設(shè)我們的系統(tǒng)在運(yùn)行的過程中,源源不斷的有新的任務(wù)需要處理(比如訂單處理),而且這些任務(wù)的處理是相互獨(dú)立的,沒有前后順序依賴性(順序依賴性是指,必須在任務(wù)
A
處理結(jié)束后才可開始
B
任務(wù)),那么我們就可以使用多個(gè)線程來同時(shí)處理多個(gè)任務(wù)。每個(gè)處理任務(wù)的線程稱為“工作者(線程)”。
我設(shè)計(jì)了
ESBasic.Threading.Engines.IWorkerEngine
工作者引擎,其目的就是使用多個(gè)線程來并行處理任務(wù),提高系統(tǒng)的吞吐能力。

2. 適用場(chǎng)合:
設(shè)計(jì)工作者引擎 ESBasic.Threading.Engines.IWorkerEngine 的主要目的是為了解決類似下面的問題:
(1) 充分利用多 CPU 、多核計(jì)算資源。
(2) 減少因高速設(shè)備與低速設(shè)備之間速度差而產(chǎn)生計(jì)算資源浪費(fèi)。
(3) 對(duì)于突發(fā)的大批量的任務(wù)(比如訂單系統(tǒng)經(jīng)常在其它時(shí)段接受的訂單很少,但在某高峰期會(huì)有突發(fā)性的大量的訂單進(jìn)來)進(jìn)行緩沖處理,并最大限度地利用現(xiàn)有資源進(jìn)行處理。
3 .設(shè)計(jì)思想與實(shí)現(xiàn)
IWorkerEngine 的設(shè)計(jì)思路是這樣的:我們使用一個(gè)隊(duì)列來存放需要處理的任務(wù),新來的任務(wù)都會(huì)排隊(duì)到這個(gè)隊(duì)列中,然后有 N 個(gè)工作者線程不斷地從隊(duì)列中取出任務(wù)去處理,每個(gè)線程處理完當(dāng)前任務(wù)后,又從隊(duì)列中取出下一個(gè)任務(wù) …… ,如此循環(huán)。
IWorkerEngine 接口的源碼對(duì)應(yīng)如下:
{
/// <summary>
/// IdleSpanInMSecs當(dāng)沒有工作要處理時(shí),工作者線程休息的時(shí)間間隔。默認(rèn)為10ms
/// </summary>
int IdleSpanInMSecs{ get ; set ;}
/// <summary>
/// WorkerThreadCount工作者線程的數(shù)量。默認(rèn)值為1。
/// </summary>
int WorkerThreadCount{ get ; set ;}
/// <summary>
/// WorkProcesser用于處理任務(wù)的處理器。
/// </summary>
IWorkProcesser < T > WorkProcesser{ set ;}
/// <summary>
/// WorkCount當(dāng)前任務(wù)隊(duì)列中的任務(wù)數(shù)。
/// </summary>
int WorkCount{ get ;}
/// <summary>
/// MaxWaitWorkCount歷史中最大的處于等待狀態(tài)的任務(wù)數(shù)量。
/// </summary>
int MaxWaitWorkCount{ get ;}
void Initialize();
void Start();
void Stop();
/// <summary>
/// AddWork添加任務(wù)。
/// </summary>
void AddWork(Twork);
}
由于任務(wù)的類型不是固定的,所以我們使用的泛型參數(shù) T 來表示要處理任務(wù)的類型。
所有的任務(wù)的具體執(zhí)行都是由 IWorkProcesser 完成的:
{
void Process(Twork);
}
實(shí)現(xiàn)這個(gè)
IWorkerEngine
接口的時(shí)候要注意以下幾點(diǎn):
(1) AddWork 方法會(huì)在多線程的環(huán)境中被調(diào)用,所以必須保證其是線程安全的。
(2) 每個(gè)工作者線程實(shí)際上就是一個(gè)我們前面介紹的循環(huán)引擎 ICycleEngine ,只不過將其 DetectSpanInSecs 設(shè)為 0 即可,表示不間斷地執(zhí)行任務(wù)。 WorkerEngine 便是使用了 N 個(gè) AgileCycleEngine 實(shí)例來作為工作者的。這些 AgileCycleEngine 實(shí)例在 Initialize 方法中被實(shí)例化。
(3) 所有的工作者最終都是執(zhí)行私有的 DoWork 方法,這個(gè)方法就是從任務(wù)隊(duì)列中取出任務(wù)并且調(diào)用 IWorkProcesser 來處理任務(wù),如果任務(wù)隊(duì)列為空,則等待 IdleSpanInMSecs 秒鐘后再重試。
(4) MaxWaitWorkCount 屬性用于記錄自從引擎運(yùn)行以來最大的等待任務(wù)的數(shù)量,通過這個(gè)屬性我們可以推測(cè)任務(wù)量與任務(wù)處理速度之間的差距。
(5)
通過
Start
、
Stop
方法我們可以隨時(shí)停止、啟動(dòng)工作者引擎,并可重復(fù)調(diào)用。
4. 使用時(shí)的注意事項(xiàng)
(1) 當(dāng)引擎已經(jīng)啟動(dòng)并正在運(yùn)行時(shí),如果要修改 WorkerThreadCount 的值并使其生效,則必須先調(diào)用 Stop 方法停止引擎,然后重新調(diào)用 Initialize 方法初始化引擎,再調(diào)用 Start 方法啟動(dòng)引擎。
(2)
關(guān)于工作者線程的個(gè)數(shù)
N
的設(shè)置的問題。這個(gè)數(shù)字不是越大越好,因?yàn)槭褂玫木€程越多,而
CPU
跟不上的話,那么消耗在線程切換上的浪費(fèi)就越嚴(yán)重。所以,為了達(dá)到最好的性能,需要為工作者線程個(gè)數(shù)設(shè)置一個(gè)合適的值。
通常,這個(gè)值跟
CPU
的個(gè)數(shù)、
CPU
核的個(gè)數(shù)、任務(wù)的復(fù)雜度、慢速設(shè)備與快速設(shè)備之間的速度差以及它們的吞吐量有關(guān)。我們可以通過足夠的測(cè)試來發(fā)現(xiàn)適合我們系統(tǒng)的
N
值。
一般情況下的推薦值為:
CPU
個(gè)數(shù)
*
單個(gè)
CPU
的核數(shù)
*2 + 1
。
5. 擴(kuò)展
( 1 )“一次性”的工作者引擎: BriefWorkerEngine
假設(shè)我們的系統(tǒng)可能會(huì)偶爾有一批任務(wù)要處理(也許永遠(yuǎn)也不會(huì)有這樣的任務(wù)出現(xiàn)),我們希望只有當(dāng)任務(wù)到來時(shí),才使用一個(gè)工作者引擎實(shí)例來多線程處理它,處理完后,該引擎就可以釋放掉。
ESBasic.Threading.Engines.BriefWorkerEngine ,精簡(jiǎn)的工作者引擎,便是為這一目的而設(shè)計(jì)的。它使用多線程處理一批任務(wù),當(dāng)這批任務(wù)處理結(jié)束后,工作者線程會(huì)被自動(dòng)釋放,而該引擎實(shí)例也就可以被結(jié)束了。
為了方便使用,我將 BriefWorkerEngine 設(shè)計(jì)為從構(gòu)造函數(shù)注入引擎運(yùn)行所需要的參數(shù),包括任務(wù)處理器、工作者線程個(gè)數(shù)、以及要處理的任務(wù)集合。在引擎實(shí)例被構(gòu)造成功的同時(shí),內(nèi)部的循環(huán)引擎已經(jīng)準(zhǔn)備好了。注意, BriefWorkerEngine 實(shí)現(xiàn)了 IDisposable 接口,這表明當(dāng)引擎被釋放時(shí),內(nèi)部所有的循環(huán)引擎都會(huì)停止運(yùn)行,從而不再占有后臺(tái)線程池中的線程。
我們可以這樣來使用 BriefWorkerEngine :
IList < MyTask > taskList = ... ;
BriefWorkerEngine < MyTask > engine = new BriefWorkerEngine < MyTask > (processer, 5 ,taskList);
engine.Start();
while ( ! engine.IsFinished())
{
System.Threading.Thread.Sleep( 100 );
}
engine.Dispose();
// 執(zhí)行到這里,表示所有任務(wù)已經(jīng)處理完畢,引擎實(shí)例即將被釋放。
我們可以通過它的
IsFinished
方法來檢測(cè)執(zhí)行是否已經(jīng)完成。當(dāng)
IsFinished
方法返回
true
時(shí),引擎實(shí)例就可以被銷毀了。
(
2
)永不停止的工作者引擎
我們同樣可以考慮一個(gè)類似于循環(huán)引擎的擴(kuò)展的情況,假設(shè)我們的系統(tǒng)要求在啟動(dòng)時(shí)就將工作者引擎運(yùn)行起來,而且在整個(gè)運(yùn)行的生命周期中,都不需要停止引擎,那么我們就不想將 Start 方法、 Stop 方法暴露出來以免意外的調(diào)用 Stop 方法而導(dǎo)致引擎停止運(yùn)行,那這個(gè)時(shí)候我們可以使用相同的技巧來做到:
{
private IWorkerEngine < MyTask > workerEngine;
public void Initialize()
{
this .workerEngine = new WorkerEngine < MyTask > ();
this .workerEngine.WorkerThreadCount = 5 ;
// this.workerEngine.WorkProcesser=

this .workerEngine.Initialize();
this .workerEngine.Start();
}
}
public class MyTask {}
其道理與循環(huán)引擎的擴(kuò)展是一樣的。
注:ESBasic源碼可到
http://esbasic.codeplex.com/
下載。
ESBasic討論:37677395
ESBasic開源前言
更多文章、技術(shù)交流、商務(wù)合作、聯(lián)系博主
微信掃碼或搜索:z360901061

微信掃一掃加我為好友
QQ號(hào)聯(lián)系: 360901061
您的支持是博主寫作最大的動(dòng)力,如果您喜歡我的文章,感覺我的文章對(duì)您有幫助,請(qǐng)用微信掃描下面二維碼支持博主2元、5元、10元、20元等您想捐的金額吧,狠狠點(diǎn)擊下面給點(diǎn)支持吧,站長(zhǎng)非常感激您!手機(jī)微信長(zhǎng)按不能支付解決辦法:請(qǐng)將微信支付二維碼保存到相冊(cè),切換到微信,然后點(diǎn)擊微信右上角掃一掃功能,選擇支付二維碼完成支付。
【本文對(duì)您有幫助就好】元
