多核處理器越來越普及,那有沒有一種簡單的辦法,能夠讓我們寫的軟件釋放多核的威力?答案是:Yes。隨著Golang, Erlang, Scale等為并發(fā)設計的程序語言的興起,新的并發(fā)模式逐漸清晰。正如過程式編程和面向對象一樣,一個好的編程模式需要有一個極其簡潔的內(nèi)核,還有在此之 上豐富的外延,可以解決現(xiàn)實世界中各種各樣的問題。本文以GO語言為例,解釋其中內(nèi)核、外延。
并發(fā)模式之內(nèi)核
這種并發(fā)模式的內(nèi)核只需要協(xié)程和通道就夠了。其中協(xié)程負責執(zhí)行代碼,通道負責在協(xié)程之間傳遞事件。
并發(fā)編程一直以來都是個非常困難的工作。要想編寫一個良好的并發(fā)程序,我們不得不了解線程, 鎖,semaphore,barrier甚至CPU更新高速緩存的方式,而且他們個個都有怪脾氣,處處是陷阱。筆者除非萬不得以,決不會自己操作這些底層 并發(fā)元素。一個簡潔的并發(fā)模式不需要這些復雜的底層元素,只需協(xié)程和通道就夠了。
協(xié)程是輕量級的線程。在過程式編程中,當調用一個過程的時候,需要等待其執(zhí)行完才返回。而調用一個協(xié)程的時候,不需要等待其執(zhí)行完,會立即返回。協(xié)程十分 輕量,Go語言可以在一個進程中執(zhí)行有數(shù)以十萬計的協(xié)程,依舊保持高性能。而對于普通的平臺,一個進程有數(shù)千個線程,其CPU會忙于上下文切換,性能急劇 下降。隨意創(chuàng)建線程可不是一個好主意,但是我們可以大量使用的協(xié)程。
通道是協(xié)程之間的數(shù)據(jù)傳輸通道。通道可以在眾多的協(xié)程之間傳遞數(shù)據(jù),具體可以值也可以是個引用。通道有兩種使用方式。
· 協(xié)程可以試圖向通道放入數(shù)據(jù),如果通道滿了,會掛起協(xié)程,直到通道可以為他放入數(shù)據(jù)為止。
· 協(xié)程可以試圖向通道索取數(shù)據(jù),如果通道沒有數(shù)據(jù),會掛起協(xié)程,直到通道返回數(shù)據(jù)為止。
如此,通道就可以在傳遞數(shù)據(jù)的同時,控制協(xié)程的運行。有點像事件驅動,也有點像阻塞隊列。這兩個概念非常的簡單,各個語言平臺都會有相應的實現(xiàn)。在Java和C上也各有庫可以實現(xiàn)兩者。
只要有協(xié)程和通道,就可以優(yōu)雅的解決并發(fā)的問題。不必使用其他和并發(fā)有關的概念。那如何用這兩把利刃解決各式各樣的實際問題呢?
并發(fā)模式之外延
協(xié)程相較于線程,可以大量創(chuàng)建。打開這扇門,我們拓展出新的用法,可以做生成器,可以讓函數(shù)返回“服務”,可以讓循環(huán)并發(fā)執(zhí)行,還能共享變量。但是出現(xiàn)新 的用法的同時,也帶來了新的棘手問題,協(xié)程也會泄漏,不恰當?shù)氖褂脮绊懶阅。下面會逐一介紹各種用法和問題。演示的代碼用GO語言寫成,因為其簡潔明 了,而且支持全部功能。
生成器
有的時候,我們需要有一個函數(shù)能不斷生成數(shù)據(jù)。比方說這個函數(shù)可以讀文件,讀網(wǎng)絡,生成自增長序列,生成隨機數(shù)。這些行為的特點就是,函數(shù)的已知一些變量,如文件路徑。然后不斷調用,返回新的數(shù)據(jù)。
下面生成隨機數(shù)為例,以讓我們做一個會并發(fā)執(zhí)行的隨機數(shù)生成器。
非并發(fā)的做法是這樣的:
// 函數(shù)rand_generator_1 ,返回 int
funcrand_generator_1() int {
return rand.Int()
}
上面是一個函數(shù),返回一個int。假如rand.Int()這個函數(shù)調用需要很長時間等待,那該函數(shù)的調用者也會因此而掛起。所以我們可以創(chuàng)建一個協(xié)程,專門執(zhí)行rand.Int()。
// 函數(shù)rand_generator_2,返回通道(Channel)
funcrand_generator_2() chan int {
// 創(chuàng)建通道
out := make(chan int)
// 創(chuàng)建協(xié)程
go func() {
for {
//向通道內(nèi)寫入數(shù)據(jù),如果無人讀取會等待
out <- rand.Int()
}
}()
return out
}
funcmain() {
// 生成隨機數(shù)作為一個服務
rand_service_handler :=rand_generator_2()
// 從服務中讀取隨機數(shù)并打印
fmt.Printf("%d\n",<-rand_service_handler)
}
上面的這段函數(shù)就可以并發(fā)執(zhí)行了rand.Int()。有一點值得注意到函數(shù)的返回可以理解為一個“服務”。但我們需要獲取隨機數(shù)據(jù)時候,可以隨時向這個 服務取用,他已經(jīng)為我們準備好了相應的數(shù)據(jù),無需等待,隨要隨到。如果我們調用這個服務不是很頻繁,一個協(xié)程足夠滿足我們的需求了。但如果我們需要大量訪 問,怎么辦?我們可以用下面介紹的多路復用技術,啟動若干生成器,再將其整合成一個大的服務。
調用生成器,可以返回一個“服務”?梢杂迷诔掷m(xù)獲取數(shù)據(jù)的場合。用途很廣泛,讀取數(shù)據(jù),生成ID,甚至定時器。這是一種非常簡潔的思路,將程序并發(fā)化。
多路復用
多路復用是讓一次處理多個隊列的技術。Apache使用處理每個連接都需要一個進程,所以其并發(fā)性能不是很好。而Nginx使用多路復用的技術,讓一 個進程處理多個連接,所以并發(fā)性能比較好。同樣,在協(xié)程的場合,多路復用也是需要的,但又有所不同。多路復用可以將若干個相似的小服務整合成一個大服務。
那么讓我們用多路復用技術做一個更高并發(fā)的隨機數(shù)生成器吧。
// 函數(shù)rand_generator_3 ,返回通道(Channel)
funcrand_generator_3() chan int {
// 創(chuàng)建兩個隨機數(shù)生成器服務
rand_generator_1 := rand_generator_2()
rand_generator_2 := rand_generator_2()
//創(chuàng)建通道
out := make(chan int)
//創(chuàng)建協(xié)程
go func() {
for {
//讀取生成器1中的數(shù)據(jù),整合
out <-<-rand_generator_1
}
}()
go func() {
for {
//讀取生成器2中的數(shù)據(jù),整合
out <-<-rand_generator_2
}
}()
return out
}
上面是使用了多路復用技術的高并發(fā)版的隨機數(shù)生成器。通過整合兩個隨機數(shù)生成器,這個版本的能力是剛才的兩倍。雖然協(xié)程可以大量創(chuàng)建,但是眾多協(xié)程還是會 爭搶輸出的通道。Go語言提供了Select關鍵字來解決,各家也有各家竅門。加大輸出通道的緩沖大小是個通用的解決方法。
多路復用技術可以用來整合多個通道。提升性能和操作的便捷。配合其他的模式使用有很大的威力。
Future技術
Future是一個很有用的技術,我們常常使用Future來操作線程。我們可以在使用線程的時候,可以創(chuàng)建一個線程,返回Future,之后可以通過它等待結果。 但是在協(xié)程環(huán)境下的Future可以更加徹底,輸入?yún)?shù)同樣可以是Future的。
調用一個函數(shù)的時候,往往是參數(shù)已經(jīng)準備好了。調用協(xié)程的時候也同樣如此。但是如果我們將傳入的參 數(shù)設為通道,這樣我們就可以在不準備好參數(shù)的情況下調用函數(shù)。這樣的設計可以提供很大的自由度和并發(fā)度。函數(shù)調用和函數(shù)參數(shù)準備這兩個過程可以完全解耦。 下面舉一個用該技術訪問數(shù)據(jù)庫的例子。
//一個查詢結構體
typequery struct {
//參數(shù)Channel
sql chan string
//結果Channel
result chan string
}
//執(zhí)行Query
funcexecQuery(q query) {
//啟動協(xié)程
go func() {
//獲取輸入
sql := <-q.sql
//訪問數(shù)據(jù)庫,輸出結果通道
q.result <- "get" + sql
}()
}
funcmain() {
//初始化Query
q :=
query{make(chan string, 1),make(chan string, 1)}
//執(zhí)行Query,注意執(zhí)行的時候無需準備參數(shù)
execQuery(q)
//準備參數(shù)
q.sql <- "select * fromtable"
//獲取結果
fmt.Println(<-q.result)
}
上面利用Future技術,不單讓結果在Future獲得,參數(shù)也是在Future獲取。準備好參數(shù)后,自動執(zhí)行。Future和生成器的區(qū)別在 于,F(xiàn)uture返回一個結果,而生成器可以重復調用。還有一個值得注意的地方,就是將參數(shù)Channel和結果Channel定義在一個結構體里面作為 參數(shù),而不是返回結果Channel。這樣做可以增加聚合度,好處就是可以和多路復用技術結合起來使用。
Future技術可以和各個其他技術組合起來用?梢酝ㄟ^多路復用技術,監(jiān)聽多個結果Channel,當有結果后,自動返回。也可以和生成器組合使用,生 成器不斷生產(chǎn)數(shù)據(jù),F(xiàn)uture技術逐個處理數(shù)據(jù)。Future技術自身還可以首尾相連,形成一個并發(fā)的pipe filter。這個pipe filter可以用于讀寫數(shù)據(jù)流,操作數(shù)據(jù)流。
Future是一個非常強大的技術手段?梢栽谡{用的時候不關心數(shù)據(jù)是否準備好,返回值是否計算好的問題。讓程序中的組件在準備好數(shù)據(jù)的時候自動跑起來。
并發(fā)循環(huán)
循環(huán)往往是性能上的熱點。如果性能瓶頸出現(xiàn)在CPU上的話,那么九成可能性熱點是在一個循環(huán)體內(nèi)部。所以如果能讓循環(huán)體并發(fā)執(zhí)行,那么性能就會提高很多。
要并發(fā)循環(huán)很簡單,只有在每個循環(huán)體內(nèi)部啟動協(xié)程。協(xié)程作為循環(huán)體可以并發(fā)執(zhí)行。調用啟動前設置一個計數(shù)器,每一個循環(huán)體執(zhí)行完畢就在計數(shù)器上加一個元素,調用完成后通過監(jiān)聽計數(shù)器等待循環(huán)協(xié)程全部完成。
//建立計數(shù)器
sem :=make(chan int, N);
//FOR循環(huán)體
for i,xi:= range data {
//建立協(xié)程
go func (i int, xi float) {
doSomething(i,xi);
//計數(shù)
sem <- 0;
} (i, xi);
}
// 等待循環(huán)結束
for i := 0; i < N; ++i { <-sem }
上面是一個并發(fā)循環(huán)例子。通過計數(shù)器來等待循環(huán)全部完成。如果結合上面提到的Future技術的話,則不必等待?梢缘鹊秸嬲枰慕Y果的地方,再去檢查數(shù)據(jù)是否完成。
通過并發(fā)循環(huán)可以提供性能,利用多核,解決CPU熱點。正因為協(xié)程可以大量創(chuàng)建,才能在循環(huán)體中如此使用,如果是使用線程的話,就需要引入線程池之類的東西,防止創(chuàng)建過多線程,而協(xié)程則簡單的多。
ChainFilter技術
前面提到了Future技術首尾相連,可以形成一個并發(fā)的pipe filter。這種方式可以做很多事情,如果每個Filter都由同一個函數(shù)組成,還可以有一種簡單的辦法把他們連起來。
由于每個Filter協(xié)程都可以并發(fā)運行,這樣的結構非常有利于多核環(huán)境。下面是一個例子,用這種模式來產(chǎn)生素數(shù)。
// Aconcurrent prime sieve
packagemain
// Sendthe sequence 2, 3, 4, ... to channel 'ch'.
funcGenerate(ch chan<- int) {
for i := 2; ; i++ {
ch<- i // Send 'i' to channel 'ch'.
}
}
// Copythe values from channel 'in' to channel 'out',
//removing those divisible by 'prime'.
funcFilter(in <-chan int, out chan<- int, prime int) {
for {
i := <-in // Receive valuefrom 'in'.
if i%prime != 0 {
out <- i // Send'i' to 'out'.
}
}
}
// Theprime sieve: Daisy-chain Filter processes.
funcmain() {
ch := make(chan int) // Create a newchannel.
go Generate(ch) // Launch Generate goroutine.
for i := 0; i < 10; i++ {
prime := <-ch
print(prime, "\n")
ch1 := make(chan int)
go Filter(ch, ch1, prime)
ch = ch1
}
}
上面的程序創(chuàng)建了10個Filter,每個分別過濾一個素數(shù),所以可以輸出前10個素數(shù)。
Chain-Filter通過簡單的代碼創(chuàng)建并發(fā)的過濾器鏈。這種辦法還有一個好處,就是每個通道只有兩個協(xié)程會訪問,就不會有激烈的競爭,性能會比較好。
共享變量
協(xié)程之間的通信只能夠通過通道。但是我們習慣于共享變量,而且很多時候使用共享變量能讓代碼更簡潔。比如一個Server有兩個狀態(tài)開和關。其他僅僅希望獲取或改變其狀態(tài),那又該如何做呢?梢詫⑦@個變量至于0通道中,并使用一個協(xié)程來維護。
下面的例子描述如何用這個方式,實現(xiàn)一個共享變量。
//共享變量有一個讀通道和一個寫通道組成
typesharded_var struct {
reader chan int
writer chan int
}
//共享變量維護協(xié)程
funcsharded_var_whachdog(v sharded_var) {
go func() {
//初始值
var value int = 0
for {
//監(jiān)聽讀寫通道,完成服務
select {
case value =<-v.writer:
case v.reader <-value:
}
}
}()
}
funcmain() {
//初始化,并開始維護協(xié)程
v := sharded_var{make(chan int),make(chan int)}
sharded_var_whachdog(v)
//讀取初始值
fmt.Println(<-v.reader)
//寫入一個值
v.writer <- 1
//讀取新寫入的值
fmt.Println(<-v.reader)
}
這樣,就可以在協(xié)程和通道的基礎上實現(xiàn)一個協(xié)程安全的共享變量了。定義一個寫通道,需要更新變量的時候,往里寫新的值。再定義一個讀通道,需要讀的時候,從里面讀。通過一個單獨的協(xié)程來維護這兩個通道。保證數(shù)據(jù)的一致性。
一般來說,協(xié)程之間不推薦使用共享變量來交互,但是按照這個辦法,在一些場合,使用共享變量也是可取的。很多平臺上有較為原生的共享變量支持,到底用那種 實現(xiàn)比較好,就見仁見智了。另外利用協(xié)程和通道,可以還實現(xiàn)各種常見的并發(fā)數(shù)據(jù)結構,如鎖等等,就不一一贅述。
協(xié)程泄漏
協(xié)程和內(nèi)存一樣,是系統(tǒng)的資源。對于內(nèi)存,有自動垃圾回收。但是對于協(xié)程,沒有相應的回收機制。會不會若干年后,協(xié)程普及了,協(xié)程泄漏和內(nèi)存泄漏一樣成為 程序員永遠的痛呢?一般而言,協(xié)程執(zhí)行結束后就會銷毀。協(xié)程也會占用內(nèi)存,如果發(fā)生協(xié)程泄漏,影響和內(nèi)存泄漏一樣嚴重。輕則拖慢程序,重則壓垮機器。
C和C++都是沒有自動內(nèi)存回收的程序設計語言,但只要有良好的編程習慣,就能解決規(guī)避問題。對于協(xié)程是一樣的,只要有好習慣就可以了。
只有兩種情況會導致協(xié)程無法結束。一種情況是協(xié)程想從一個通道讀數(shù)據(jù),但無人往這個通道寫入數(shù)據(jù),或許這個通道已經(jīng)被遺忘了。還有一種情況是程想往一個通道寫數(shù)據(jù),可是由于無人監(jiān)聽這個通道,該協(xié)程將永遠無法向下執(zhí)行。下面分別討論如何避免這兩種情況。
對于協(xié)程想從一個通道讀數(shù)據(jù),但無人往這個通道寫入數(shù)據(jù)這種情況。解決的辦法很簡單,加入超時機制。對于有不確定會不會返回的情況,必須加入超時,避免出 現(xiàn)永久等待。另外不一定要使用定時器才能終止協(xié)程。也可以對外暴露一個退出提醒通道。任何其他協(xié)程都可以通過該通道來提醒這個協(xié)程終止。
對于協(xié)程想往一個通道寫數(shù)據(jù),但通道阻塞無法寫入這種情況。解決的辦法也很簡單,就是給通道加緩沖。但前提是這個通道只會接收到固定數(shù)目的寫入。比方說, 已知一個通道最多只會接收N次數(shù)據(jù),那么就將這個通道的緩沖設置為N。那么該通道將永遠不會堵塞,協(xié)程自然也不會泄漏。也可以將其緩沖設置為無限,不過這 樣就要承擔內(nèi)存泄漏的風險了。等協(xié)程執(zhí)行完畢后,這部分通道內(nèi)存將會失去引用,會被自動垃圾回收掉。
funcnever_leak(ch chan int) {
//初始化timeout,緩沖為1
timeout := make(chan bool, 1)
//啟動timeout協(xié)程,由于緩存為1,不可能泄露
go func() {
time.Sleep(1 * time.Second)
timeout <- true
}()
//監(jiān)聽通道,由于設有超時,不可能泄露
select {
case <-ch:
// a read from ch hasoccurred
case <-timeout:
// the read from ch has timedout
}
}
上面是個避免泄漏例子。使用超時避免讀堵塞,使用緩沖避免寫堵塞。
和內(nèi)存里面的對象一樣,對于長期存在的協(xié)程,我們不用擔心泄漏問題。一是長期存在,二是數(shù)量較少。要警惕的只有那些被臨時創(chuàng)建的協(xié)程,這些協(xié)程數(shù)量大且生 命周期短,往往是在循環(huán)中創(chuàng)建的,要應用前面提到的辦法,避免泄漏發(fā)生。協(xié)程也是把雙刃劍,如果出問題,不但沒能提高程序性能,反而會讓程序崩潰。但就像 內(nèi)存一樣,同樣有泄漏的風險,但越用越溜了。
并發(fā)模式之實現(xiàn)
在并發(fā)編程大行其道的今天,對協(xié)程和通道的支持成為各個平臺比不可少的一部分。雖然各家有各家的叫法,但都能滿足協(xié)程的基本要求—并發(fā)執(zhí)行和可大量創(chuàng)建。筆者對他們的實現(xiàn)方式總結了一下。
下面列舉一些已經(jīng)支持協(xié)程的常見的語言和平臺。
GoLang 和Scala作為最新的語言,一出生就有完善的基于協(xié)程并發(fā)功能。Erlang最為老資格的并發(fā)編程語言,返老還童。其他二線語言則幾乎全部在新的版本中加入了協(xié)程。
令人驚奇的是C/C++和Java這三個世界上最主流的平臺沒有在對協(xié)程提供語言級別的原生支持。他們都背負著厚重的歷史,無法改變,也無需改變。但他們還有其他的辦法使用協(xié)程。
Java平臺有很多方法實現(xiàn)協(xié)程:
· 修改虛擬機:對JVM打補丁來實現(xiàn)協(xié)程,這樣的實現(xiàn)效果好,但是失去了跨平臺的好處
· 修改字節(jié)碼:在編譯完成后增強字節(jié)碼,或者使用新的JVM語言。稍稍增加了編譯的難度。
· 使用JNI:在Jar包中使用JNI,這樣易于使用,但是不能跨平臺。
· 使用線程模擬協(xié)程:使協(xié)程重量級,完全依賴JVM的線程實現(xiàn)。
其中修改字節(jié)碼的方式比較常見。因為這樣的實現(xiàn)辦法,可以平衡性能和移植性。最具代表性的JVM語言Scale就能很好的支持協(xié)程并發(fā)。流行的Java Actor模型類庫akka也是用修改字節(jié)碼的方式實現(xiàn)的協(xié)程。
對于C語言,協(xié)程和線程一樣?梢允褂酶鞣N各樣的系統(tǒng)調用來實現(xiàn)。協(xié)程作為一個比較高級的概念,實現(xiàn)方式實在太多,就不討論了。比較主流的實現(xiàn)有l(wèi)ibpcl, coro,lthread等等。
對于C++,有Boost實現(xiàn),還有一些其他開源庫。還有一門名為μC++語言,在C++基礎上提供了并發(fā)擴展。
可見這種編程模型在眾多的語言平臺中已經(jīng)得到了廣泛的支持,不再小眾。如果想使用的話,隨時可以加到自己的工具箱中。
結語
本文探討了一個極其簡潔的并發(fā)模型。在只有協(xié)程和通道這兩個基本元件的情況下?梢蕴峁┴S富的功能,解決形形色色實際問題。而且這個模型已經(jīng)被廣泛的實 現(xiàn),成為潮流。相信這種并發(fā)模型的功能遠遠不及此,一定也會有更多更簡潔的用法出現(xiàn);蛟S未來CPU核心數(shù)目將和人腦神經(jīng)元數(shù)目一樣多,到那個時候,我們 又要重新思考并發(fā)模型了。