簡單實現(xiàn)并發(fā):python concurrent模塊
技術(shù)支持服務(wù)電話:15308000360 【7x24提供運維服務(wù),解決各類系統(tǒng)/軟硬件疑難技術(shù)問題】
可以使用python 3中的concurrent模塊
如果python環(huán)境是2.7的話,需要下載https://pypi.python.org/packages/source/f/futures/futures-2.1.6.tar.gz#md5=cfab9ac3cd55d6c7ddd0546a9f22f453
此futures包即可食用concurrent模塊。
官方文檔:http://pythonhosted.org//futures/
對于python來說,作為解釋型語言,Python的解釋器必須做到既安全又高效。我們都知道多線程編程會遇到的問題,解釋器要留意的是避免在不同的線程操作內(nèi)部共享的數(shù)據(jù),同時它還要保證在管理用戶線程時保證總是有最大化的計算資源。而python是通過使用全局解釋器鎖來保護數(shù)據(jù)的安全性:
python代碼的執(zhí)行由python虛擬機來控制,即Python先把代碼(.py文件)編譯成字節(jié)碼(字節(jié)碼在Python虛擬機程序里對應(yīng)的是PyCodeObject對象,.pyc文件是字節(jié)碼在磁盤上的表現(xiàn)形式),交給字節(jié)碼虛擬機,然后虛擬機一條一條執(zhí)行字節(jié)碼指令,從而完成程序的執(zhí)行。python在設(shè)計的時候在虛擬機中,同時只能有一個線程執(zhí)行。同樣地,雖然python解釋器中可以運行多個線程,但在任意時刻,只有一個線程在解釋器中運行。而對python虛擬機的訪問由全局解釋器鎖來控制,正是這個鎖能保證同一時刻只有一個線程在運行。在多線程的環(huán)境中,python虛擬機按一下方式執(zhí)行:
1,設(shè)置GIL(global interpreter lock).
2,切換到一個線程執(zhí)行。
3,運行:
a,指定數(shù)量的字節(jié)碼指令。
b,線程主動讓出控制(可以調(diào)用time.sleep(0))。
4,把線程設(shè)置為睡眠狀態(tài)。
5,解鎖GIL.
6,再次重復(fù)以上步驟。
GIL的特性,也就導(dǎo)致了python不能充分利用多核cpu。而對面向I/O的(會調(diào)用內(nèi)建操作系統(tǒng)C代碼的)程序來說,GIL會在這個I/O調(diào)用之前被釋放,以允許其他線程在這個線程等待I/O的時候運行。如果線程并為使用很多I/O操作,它會在自己的時間片一直占用處理器和GIL。這也就是所說的:I/O密集型python程序比計算密集型的程序更能充分利用多線程的好處。
總之,不要使用python多線程,使用python多進程進行并發(fā)編程,就不會有GIL這種問題存在,并且也能充分利用多核cpu。
一,提供的功能
提供了多線程和多進程的并發(fā)功能
二,基本方法
class concurrent.futures.Executor (注:Executor為ThreadPoolExecutor或者ProcessPoolExecutor)
提供的方法如下:
submit(fn, *args, **kwargs)
fn:為需要異步執(zhí)行的函數(shù)
args,kwargs:為給函數(shù)傳遞的參數(shù)
例:
#!/bin/env python
#coding:utf-8
import time,re
import os,datetime
from concurrent import futures
def wait_on_b():
print 5
time.sleep(2)
def wait_on_a():
print 6
time.sleep(2)
ex = futures.ThreadPoolExecutor(max_workers=2)
ex.submit(wait_on_b)
ex.submit(wait_on_a)
wait_on_a和wait_on_b函數(shù)會同時執(zhí)行,因為使用了2個worker
#####################################
map(func, *iterables, timeout=None)
此map函數(shù)和python自帶的map函數(shù)功能類似,只不過concurrent模塊的map函數(shù)從迭代器獲得參數(shù)后異步執(zhí)行。并且,每一個異步操作,能用timeout參數(shù)來設(shè)置超時時間,timeout的值可以是int或float型,如果操作timeout的話,會raisesTimeoutError。如果timeout參數(shù)不指定的話,則不設(shè)置超時間。
func:為需要異步執(zhí)行的函數(shù)
iterables:可以是一個能迭代的對象,例如列表等。每一次func執(zhí)行,會從iterables中取參數(shù)。
timeout:設(shè)置每次異步操作的超時時間
例:
#!/bin/env python
#coding:utf-8
import time,re
import os,datetime
from concurrent import futures
data = [‘1‘,‘2‘]
def wait_on(argument):
print argument
time.sleep(2)
return ‘ok‘
ex = futures.ThreadPoolExecutor(max_workers=2)
for i in ex.map(wait_on,data):
print i
map函數(shù)異步執(zhí)行完成之后,結(jié)果也是list,數(shù)據(jù)需要從list中取出
######################################
submit函數(shù)和map函數(shù),根據(jù)需要,選一個使用即可。
shutdown(wait=True)
此函數(shù)用于釋放異步執(zhí)行操作后的系統(tǒng)資源。
If wait is True then this method will not return until all the pending futures are done executing and the resources associated with the executor have been freed. If wait is False then this method will return immediately and the resources associated with the executor will be freed when all pending futures are done executing. Regardless of the value of wait, the entire Python program will not exit until all pending futures are done executing.
You can avoid having to call this method explicitly if you use the with statement, which will shutdown the Executor (waiting as if Executor.shutdown() were called with wait set to True):
with ThreadPoolExecutor(max_workers=4) as e:e.submit(shutil.copy, ‘src1.txt‘, ‘dest1.txt‘)
三,完整的concurrent例子:
#!/bin/env python
#coding:utf-8
import time,re,fcntl
import os,datetime
from concurrent import futures
count_list = list()
MinuteNum = 1
StartTime = datetime.datetime(2014, 4, 16, 19, 31, 0, 484870)
NowTime = datetime.datetime.now()
os.system(‘:>new.txt‘)
f_new = open(‘new.txt‘,‘a‘)
def test(CountTimeFormat):
f = open(‘push_slave.stdout‘,‘r‘)
for line in f.readlines():
if re.search(CountTimeFormat,line):
#獲得文件專用鎖
fcntl.flock(f_new, fcntl.LOCK_EX)
f_new.writelines(line)
f_new.flush()
#釋放文件鎖
fcntl.flock(f_new, fcntl.LOCK_UN)
break
while 1:
AfterOneMinute = datetime.timedelta(minutes=MinuteNum)
CountTime = AfterOneMinute+StartTime
CountTimeFormat = CountTime.strftime(‘%Y-%m-%d %H:%M‘)
MinuteNum = MinuteNum+1
count_list.append(CountTimeFormat)
if CountTimeFormat == "2014-04-23 16:00":
break
def exec_cmd():
with futures.ProcessPoolExecutor(max_workers=24) as executor:
dict(( executor.submit(test, times), times) for times in count_list)
if __name__ == ‘__main__‘:
exec_cmd()
f_new.close()