前不久參加了個Opnstack的Meetup,其中有一個來自EasyStack的大大就Nova中的協同并發做了一番講解,有所感觸,本想當天就總結一下,但是由于前段時間工作上比較忙,加上為了履行諾言每天幾更的來寫設計模式系列性文章,故而拖到今天才寫此次的總結。好吧,其實歸根結底還是自己太懶了,趁著閑時在補新番小籠包之類的。廢話就此打住,開始正文。
Python中協程的介紹
在此之前,先介紹下Python中的并發,在Python中,并發有三種,分別是:
進程:Python中一般使用multiprocessing/subprocess來實現
線程:threading/thread是Python中用來實現多線程的模塊
協程(Coroutines):Python中用于處理協程的模塊倒是比較多,有eventlet、Twisted、Tulip、asyncio
有關進程、協程、線程中的關系圖如下所示(圖來自EasyStack的大大):
想必大家對進程以及線程那是相當的熟悉了,所以就重點介紹下協程:協程源自 Simula 和 Modula-2 語言,但也有其他語言支持。協程更適合于用來實現彼此熟悉的程序組件,如合作式多任務,迭代器,無限列表和管道。 協程最初在1963年被提出。那么協程又有什么特點呢?
每個協程都有自己的私有stack以及局部變量。
線程我們都知道可以多個同時運行,也就是所謂的多線程,但是同一時間只有一個協程在運行,所以就無須對某些共享變量加鎖。
由于協程比較輕量級,所以一個線程中可以有多個協程。
協程之間的執行順序,完全由程序來控制。
其實協程也就僅僅是一種概念罷了,非操作系統可見,在多種語言中都有實現,一會詳細介紹的eventlet就是在Python中實現的一種。
Eventlet的介紹
eventlet其實就是對greenlet的一個封裝,對其進行簡單的封裝之后,就成了所謂的greenthread,greenlet是一個稱為協程(coroutine)的東西。下面上一個greenlet的例子來介紹一下greenlet:
1from greenletimportgreenlet23def test1:4print 125gr2.switch6print 3478def test2:9print 5610gr1.switch11print 781213gr1 =greenlet(test1)14gr2 =greenlet(test2)15gr1.switch執行結果是:
也就是說在這里先定義了兩個函數test1,test2以及兩個協程gr1,gr2,最后一行g1.switch跳轉到 test1 ,它打印12,然后執行gr2.switch,跳轉到 test2 ,打印56,然后又執行了gr1.switch跳轉回 test1 ,打印34,然后 test1 就結束,gr1死掉,回到父greenlet,不會再切換到test2,所以不會打印78。在上面的例子中main greenlet就是它們的父 greenlet。
eventlet是一個用來處理和網絡相關的python庫函數,而且可以通過協程來實現并發,在eventlet里,把“協程”叫做greenthread(綠色線程)。所謂并發,就是開啟了多個greenthread,并且對這些greenthread進行管理,以實現非阻塞式的I/O。比如說用eventlet可以很方便的寫一個性能很好的web服務器,或者是一個效率很高的網頁爬蟲,這都歸功于eventlet的“綠色線程”,以及對“綠色線程”的管理機制。更讓人不可思議的是,eventlet為了實現“綠色線程”,竟然對python的和網絡相關的幾個標準庫函數進行了改寫,并且可以以補丁(patch)的方式導入到程序中,因為python的庫函數只支持普通的線程,而不支持協程,eventlet稱之為“綠化”。Eventlet庫在OpenStack服務中上鏡率很高,尤其是在服務的多線程和WSGI Server并發處理請求的情況下。
主要API如下:
Greenthread 產 :
spawn(func, *args, **kwargs): 創建一個綠色線程去運行func這個函數,后面的參數是傳遞給這個函數的參數。返回值是一個eventlet.GreenThread對象,這個對象可以用來接受func函數運行的返回值。在綠色線程池還沒有滿的情況下,這個綠色線程一被創建就立刻被執行。其實,用這種方法去創建線程也是可以理解的,線程被創建出來,肯定是有一定的任務要去執行,這里直接把函數當作參數傳遞進去,去執行一定的任務,就好像標準庫中的線程用run方法去執行任務一樣。
spawn_n(func, *args, **kwargs): 這個函數和spawn類似,不同的就是它沒有返回值,因而更加高效,這種特性,使它也有存在的價值。
spawn_after(seconds, func, *args, **kwargs): 這個函數和spawn基本上一樣,都有一樣的返回值,不同的是它可以限定在什么時候執行這個綠色線程,即在seconds秒之后,啟動這個綠色線程。
Greenthread 控制:
sleep(seconds=0):中止當前綠色線程,以允許其它綠色線程執行。
eventlet.GreenPool: 這是一個類,在這個類中用set集合來容納所創建的綠色線程,并且可以指定容納線程的最大數量(默認是1000個),它的內部是用Semaphore和Event這兩個類來對池進行控制的,這樣就構成了線程池,下面有一些重要的方法:
running:返回當前池中的綠色線程數free:返回當前池中可容納的綠色線程數spawn:創建新的綠色線程spawn_n:同上[page]接著談談Openstack中Nova對其的應用。
eventlet
在nova/cmd/__init__.py中,就直接調用了eventlet的方法,代碼如下:
from nova import debugger if debugger.enabled(): eventlet.monkey_patch(os=False, thread=False) else: eventlet.monkey_patch(os=False)這里在調試器被啟動后,關閉線程,然后啟用遠程調試器。這個就是eventlet.monkey_patch()的方法。這里僅僅是因為dnspython無法支持IPV6,所以使用eventlet的monkeypatch檢測一下環境變量的設置是否符合。
greenthread
在虛機遷移過程中如果看過我寫的源碼分析,相信對于下面的代碼不會陌生:
greenthread.spawn(self._live_migration, context, instance, dest, post_method, recover_method, block_migration, migrate_data)這個是熱遷移中所使用的所調用的由eventlet所封裝而成的綠色線程,調用了spawn(func,*args, kwargs)的函數,創建了一個綠色線程去運行live_migration也就是熱遷移的函數,返回值是一個eventlet.greenthread的對象,這個對象可以用來接受live_migration運行的返回值。在綠色線程池未滿的情況下,就可以直接執行熱遷移的函數。
greenthread.sleep
然后Nova中用到的最多的綠色線程的栗子可能就是time.sleep了吧,下面隨便找了幾個用到的例子:
for cnt in range(max_retry): try: self.plug_vifs(instance, network_info) break except processutils.ProcessExecutionError: if cnt == max_retry - 1: raise else: LOG.warn(_('plug_vifs() failed %(cnt)d. Retry up to ' '%(max_retry)d.'), {'cnt': cnt, 'max_retry': max_retry}, instance=instance) greenthread.sleep(1)這個是調用plug_vifs的函數中的greenthread.sleep()的函數調用,這個函數多次的發送請求。
except exception.InstanceNotFound: pass greenthread.sleep(0) return disk_over_committed_size像這樣的栗子還有好多,一般情況下,greenthread.sleep()綠色線程的函數是為了中止當前的線程,用來給其它的線程一個執行的機會。其實說的通俗點就是傳說中的孔融讓梨了,不過此處的梨就是CPU、內存等等一些資源了,綠色池中的空間了之類的,突然發現程序也是那么的有人情味啊~
loopingcall
接下來談談用loopingcall實現固定時間間隔運行的函數:
def _wait_for_reboot(): state = self.get_info(instance)['state'] if state == power_state.RUNNING: LOG.info(_("Instance rebooted successfully."), instance=instance) raise loopingcall.LoopingCallDone() timer = loopingcall.FixedIntervalLoopingCall(_wait_for_reboot) timer.start(interval=0.5).wait()這個函數是等待虛機重啟的函數,每隔0.5s調用一次函數,檢查虛機狀態,直到虛機重新啟動。此函數通過拋出LoopingCallDone來異常退出。
好了,對于虛機中的協同并發就到此結束了。
博文地址:http://www.cnblogs.com/voidy/