2009年1月23日金曜日

download mass images 3

download mass images 2のつづき

前回で問題としてあがったこと。
1. タプル
2. 出力の乱れ

まず、q.put((download, urljoin(url, a[-1].get('href'))))を書き換えます。
今は引数ひとつしか受け入れられないので、
q.put(func, args, kwargs)

func, args, kwargs = q.get()
になればいいですね。

  1. q = Queue()  
  2. def worker(func):  
  3.     def inner(*args, **kwargs):  
  4.         print func, args, kwargs  
  5.         q.put((func, args, kwargs))  
  6.     return inner  
  7.  
  8. @worker  
  9. def download(url):  
  10.     print 'down  '+url  
  11.     urllib.urlretrieve(url, url[url.rfind('/')+1:])  
  12.  
  13. @worker  
  14. def parse(url):  
  15.     print 'parse '+url  
  16.     soup = BeautifulSoup(urllib.urlopen(url).read())  
  17.     a = soup('a', href=re.compile(r'^/images/pub/\d+/\w+\.jpg$'))  
  18.     download(urljoin(url, a[-1].get('href')))  
  19.  
  20. @worker  
  21. def images(url):  
  22.     print 'image '+url  
  23.     soup = BeautifulSoup(urllib.urlopen(url).read())  
  24.     a = soup('a', {'class':'image_a'}, href=re.compile(r'^/backgrounds/\d+$'))  
  25.     map(lambda a:parse(urljoin(url, a.get('href'))), a[:4])  
  26.   
  27. def pages(url):  
  28.     print 'pages '+url  
  29.     soup = BeautifulSoup(urllib.urlopen(url).read())  
  30.     a = soup('a', href=re.compile(r'^/desktop/\w+\.php$'))  
  31.     map(lambda a:images(urljoin(url, a.get('href'))), a[:4])  
  32.   
  33. if __name__ == '__main__':  
  34.     def loop():  
  35.         while 1:  
  36.             func, args, kwargs = q.get()  
  37.             try:  
  38.                 func(*args, **kwargs)  
  39.             except Exception, e:  
  40.                 print e  
  41.             finally:  
  42.                 q.task_done()  
  43.     for i in range(THREAD_MAX):  
  44.         w = Thread(target=loop)  
  45.         w.daemon = True  
  46.         w.start()  
  47.     pages(sys.argv[1])  
  48.     q.join()  

なんとなく思いついたので、デコレータにしてみました。
これなら、呼び出しがシングルスレッドと同じになります。
引数も可変です。
読みやすいですね。
欠点はシングルスレッドと同じと言うことでしょう。
似たようなものは似た書き方をすべきではないかもしれません。

表示の乱れに関しては、
  1. from threading import Lock  
  2. lock = Lock()  
  3. def p(s):  
  4.     lock.acquire()  
  5.     try:  
  6.         print s  
  7.     finally:  
  8.         lock.release()  

ロックされた関数pを使って表示するようにします。
あとは、ご存知loggingモジュールはスレッドセーフなので、
loggingを使うのがもっとよい方法です。

ここまでを統合して、Workerクラスを書くとすれば、
このあたりでしょうか。
  1. class Worker:  
  2.     def __init__(self, num):  
  3.         self.q = q = Queue()  
  4.         def loop():  
  5.             while 1:  
  6.                 func, args, kwargs = q.get()  
  7.                 try:  
  8.                     func(*args, **kwargs)  
  9.                 except Exception, e:  
  10.                     logging.exception(e)  
  11.                 finally:  
  12.                     q.task_done()  
  13.         for i in range(num):  
  14.             w = Thread(target=loop)  
  15.             w.daemon = True  
  16.             w.start()  
  17.     def put(self, func, *args, **kwargs):  
  18.         logging.debug(e)  
  19.         self.q.put((func, args, kwargs))  
  20.     def join(self):  
  21.         self.q.join()  
  22.   
  23. workers = Worker(THREAD_MAX)  

workersになにかputして作業が始まったらjoinで終了を待ってください。

これも大きな欠点があります。
Ctrl+Cで終了させることができません。

さてと、最後にeventletバージョン
  1. from eventlet import coros, httpc, util  
  2. util.wrap_socket_with_coroutine_socket()  
  3. pages = coros.CoroutinePool(max_size=THREAD_MAX)  
  4. def download(url):  
  5.     a = httpc.get(url)  
  6.     # aにデータ入ってます。  
  7. pages.execute(alljpg, n, depth - 1)  
  8. pages.wait_all()  


ごめん、最後は面倒なんで手抜き。

0 件のコメント: