2009年1月23日金曜日

download mass images 3

download mass images 2のつづき

前回で問題としてあがったこと。
1. タプル
2. 出力の乱れ

まず、q.put((download, urljoin(url, a[-1].get('href'))))を書き換えます。
今は引数ひとつしか受け入れられないので、
q.put(func, args, kwargs)

func, args, kwargs = q.get()
になればいいですね。


q = Queue()
def worker(func):
def inner(*args, **kwargs):
print func, args, kwargs
q.put((func, args, kwargs))
return inner

@worker
def download(url):
print 'down '+url
urllib.urlretrieve(url, url[url.rfind('/')+1:])

@worker
def parse(url):
print 'parse '+url
soup = BeautifulSoup(urllib.urlopen(url).read())
a = soup('a', href=re.compile(r'^/images/pub/\d+/\w+\.jpg$'))
download(urljoin(url, a[-1].get('href')))

@worker
def images(url):
print 'image '+url
soup = BeautifulSoup(urllib.urlopen(url).read())
a = soup('a', {'class':'image_a'}, href=re.compile(r'^/backgrounds/\d+$'))
map(lambda a:parse(urljoin(url, a.get('href'))), a[:4])

def pages(url):
print 'pages '+url
soup = BeautifulSoup(urllib.urlopen(url).read())
a = soup('a', href=re.compile(r'^/desktop/\w+\.php$'))
map(lambda a:images(urljoin(url, a.get('href'))), a[:4])

if __name__ == '__main__':
def loop():
while 1:
func, args, kwargs = q.get()
try:
func(*args, **kwargs)
except Exception, e:
print e
finally:
q.task_done()
for i in range(THREAD_MAX):
w = Thread(target=loop)
w.daemon = True
w.start()
pages(sys.argv[1])
q.join()

なんとなく思いついたので、デコレータにしてみました。
これなら、呼び出しがシングルスレッドと同じになります。
引数も可変です。
読みやすいですね。
欠点はシングルスレッドと同じと言うことでしょう。
似たようなものは似た書き方をすべきではないかもしれません。

表示の乱れに関しては、

from threading import Lock
lock = Lock()
def p(s):
lock.acquire()
try:
print s
finally:
lock.release()

ロックされた関数pを使って表示するようにします。
あとは、ご存知loggingモジュールはスレッドセーフなので、
loggingを使うのがもっとよい方法です。

ここまでを統合して、Workerクラスを書くとすれば、
このあたりでしょうか。

class Worker:
def __init__(self, num):
self.q = q = Queue()
def loop():
while 1:
func, args, kwargs = q.get()
try:
func(*args, **kwargs)
except Exception, e:
logging.exception(e)
finally:
q.task_done()
for i in range(num):
w = Thread(target=loop)
w.daemon = True
w.start()
def put(self, func, *args, **kwargs):
logging.debug(e)
self.q.put((func, args, kwargs))
def join(self):
self.q.join()

workers = Worker(THREAD_MAX)

workersになにかputして作業が始まったらjoinで終了を待ってください。

これも大きな欠点があります。
Ctrl+Cで終了させることができません。

さてと、最後にeventletバージョン

from eventlet import coros, httpc, util
util.wrap_socket_with_coroutine_socket()
pages = coros.CoroutinePool(max_size=THREAD_MAX)
def download(url):
a = httpc.get(url)
# aにデータ入ってます。
pages.execute(alljpg, n, depth - 1)
pages.wait_all()



ごめん、最後は面倒なんで手抜き。

0 件のコメント: