前回で問題としてあがったこと。
1. タプル
2. 出力の乱れ
まず、q.put((download, urljoin(url, a[-1].get('href'))))を書き換えます。
今は引数ひとつしか受け入れられないので、
q.put(func, args, kwargs)
と
func, args, kwargs = q.get()
になればいいですね。
- q = Queue()
- def worker(func):
- def inner(*args, **kwargs):
- print func, args, kwargs
- q.put((func, args, kwargs))
- return inner
- @worker
- def download(url):
- print 'down '+url
- urllib.urlretrieve(url, url[url.rfind('/')+1:])
- @worker
- def parse(url):
- print 'parse '+url
- soup = BeautifulSoup(urllib.urlopen(url).read())
- a = soup('a', href=re.compile(r'^/images/pub/\d+/\w+\.jpg$'))
- download(urljoin(url, a[-1].get('href')))
- @worker
- def images(url):
- print 'image '+url
- soup = BeautifulSoup(urllib.urlopen(url).read())
- a = soup('a', {'class':'image_a'}, href=re.compile(r'^/backgrounds/\d+$'))
- map(lambda a:parse(urljoin(url, a.get('href'))), a[:4])
- def pages(url):
- print 'pages '+url
- soup = BeautifulSoup(urllib.urlopen(url).read())
- a = soup('a', href=re.compile(r'^/desktop/\w+\.php$'))
- map(lambda a:images(urljoin(url, a.get('href'))), a[:4])
- if __name__ == '__main__':
- def loop():
- while 1:
- func, args, kwargs = q.get()
- try:
- func(*args, **kwargs)
- except Exception, e:
- print e
- finally:
- q.task_done()
- for i in range(THREAD_MAX):
- w = Thread(target=loop)
- w.daemon = True
- w.start()
- pages(sys.argv[1])
- q.join()
なんとなく思いついたので、デコレータにしてみました。
これなら、呼び出しがシングルスレッドと同じになります。
引数も可変です。
読みやすいですね。
欠点はシングルスレッドと同じと言うことでしょう。
似たようなものは似た書き方をすべきではないかもしれません。
表示の乱れに関しては、
- from threading import Lock
- lock = Lock()
- def p(s):
- lock.acquire()
- try:
- print s
- finally:
- lock.release()
ロックされた関数pを使って表示するようにします。
あとは、ご存知loggingモジュールはスレッドセーフなので、
loggingを使うのがもっとよい方法です。
ここまでを統合して、Workerクラスを書くとすれば、
このあたりでしょうか。
- class Worker:
- def __init__(self, num):
- self.q = q = Queue()
- def loop():
- while 1:
- func, args, kwargs = q.get()
- try:
- func(*args, **kwargs)
- except Exception, e:
- logging.exception(e)
- finally:
- q.task_done()
- for i in range(num):
- w = Thread(target=loop)
- w.daemon = True
- w.start()
- def put(self, func, *args, **kwargs):
- logging.debug(e)
- self.q.put((func, args, kwargs))
- def join(self):
- self.q.join()
- workers = Worker(THREAD_MAX)
workersになにかputして作業が始まったらjoinで終了を待ってください。
これも大きな欠点があります。
Ctrl+Cで終了させることができません。
さてと、最後にeventletバージョン
- from eventlet import coros, httpc, util
- util.wrap_socket_with_coroutine_socket()
- pages = coros.CoroutinePool(max_size=THREAD_MAX)
- def download(url):
- a = httpc.get(url)
- # aにデータ入ってます。
- pages.execute(alljpg, n, depth - 1)
- pages.wait_all()
ごめん、最後は面倒なんで手抜き。
0 件のコメント:
コメントを投稿