Asyncio IMAP Fetch Mails Python3
-
21-12-2019 - |
Domanda
Sto provando con il modulo Asyncio, tuttavia ho bisogno di un suggerimento / suggerimento come recuperare le e-mail di grandi dimensioni in un modo Async.
Ho una lista con nomi utente e password per gli account di posta.
data = [
{'usern': 'foo@bar.de', 'passw': 'x'},
{'usern': 'foo2@bar.de', 'passw': 'y'},
{'usern': 'foo3@bar.de', 'passw': 'z'} (...)
]
.
Ho pensato a:
loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([get_attachment(d) for d in data]))
loop.close()
.
Tuttavia, la parte lunga è scaricare gli allegati e-mail.
Email:
@asyncio.coroutine
def get_attachment(d):
username = d['usern']
password = d['passw']
connection = imaplib.IMAP4_SSL('imap.bar.de')
connection.login(username, password)
connection.select()
# list all available mails
typ, data = connection.search(None, 'ALL')
for num in data[0].split():
# fetching each mail
typ, data = connection.fetch(num, '(RFC822)')
raw_string = data[0][1].decode('utf-8')
msg = email.message_from_string(raw_string)
for part in msg.walk():
if part.get_content_maintype() == 'multipart':
continue
if part.get('Content-Disposition') is None:
continue
if part.get_filename():
body = part.get_payload(decode=True)
# do something with the body, async?
connection.close()
connection.logout()
.
Come posso elaborare tutti (allegati di download) Mail in un modo Async?
Soluzione
Se non si dispone di una libreria Imap basata su I / O asincrona, è possibile utilizzare un concurrent.futures.ThreadPoolExecutor
per eseguire l'I / O in thread. Python rilascerà il Gil durante l'I / O, quindi avrai una vera concorrenza:
def init_connection(d):
username = d['usern']
password = d['passw']
connection = imaplib.IMAP4_SSL('imap.bar.de')
connection.login(username, password)
connection.select()
return connection
local = threading.local() # We use this to get a different connection per thread
def do_fetch(num, d, rfc):
try:
connection = local.connection
except AttributeError:
connnection = local.connection = init_connection(d)
return connnection.fetch(num, rfc)
@asyncio.coroutine
def get_attachment(d, pool):
connection = init_connection(d)
# list all available mails
typ, data = connection.search(None, 'ALL')
# Kick off asynchronous tasks for all the fetches
loop = asyncio.get_event_loop()
futs = [asyncio.async(loop.run_in_executor(pool, do_fetch, num, d, '(RFC822)'))
for num in data[0].split()]
# Process each fetch as it completes
for fut in asyncio.as_completed(futs):
typ, data = yield from fut
raw_string = data[0][1].decode('utf-8')
msg = email.message_from_string(raw_string)
for part in msg.walk():
if part.get_content_maintype() == 'multipart':
continue
if part.get('Content-Disposition') is None:
continue
if part.get_filename():
body = part.get_payload(decode=True)
# do something with the body, async?
connection.close()
connection.logout()
loop = asyncio.get_event_loop()
pool = ThreadPoolExecutor(max_workers=5) # You can probably increase max_workers, because the threads are almost exclusively doing I/O.
loop.run_until_complete(asyncio.wait([get_attachment(d, pool) for d in data]))
loop.close()
.
Questo non è bello come una soluzione basata su I / O davvero asincrona, perché hai ancora il sovraccarico di creare i fili, che limita la scalabilità e aggiunge un sovraccarico di memoria extra. Ottieni anche alcuni Gil Slowdown a causa di tutto il codice che avvolge le chiamate I / O effettivi. Tuttavia, se hai a che fare con meno di migliaia di posta, dovrebbe ancora eseguire OK.
Utilizziamo run_in_executor
per utilizzare il ThreadPoolExecutor
come parte del loop evento Asyncio, asyncio.async
a Avvolgere l'oggetto Coroutine restituito in un asyncio.Future
e as_completed
per iterando attraverso i futures nell'ordine che completano.
Modifica :
Sembra che imaplib
non sia al sicuro. Ho modificato la mia risposta all'utilizzo di archiviazione filo-locale tramite threading.local
< / a>, che ci consente di creare un oggetto di connessione per thread per thread, che può essere riutilizzato per l'intera durata del thread (il che significa che si crea solo oggetti di connessione num_workers
, piuttosto che una nuova connessione per ogni fetch
). .
Altri suggerimenti
Ho avuto le stesse esigenze: recupero di e-mail con Python 3 completamente asinc.Se gli altri qui sono interessati, ho spinto un Asyncio imap lib qui: https://github.com/bamthomas/aioimaraplib 4./a>
import asyncio
from aioimaplib import aioimaplib
@asyncio.coroutine
def wait_for_new_message(host, user, password):
imap_client = aioimaplib.IMAP4(host=host)
yield from imap_client.wait_hello_from_server()
yield from imap_client.login(user, password)
yield from imap_client.select()
asyncio.async(imap_client.idle())
id = 0
while True:
msg = yield from imap_client.wait_server_push()
print('--> received from server: %s' % msg)
if 'EXISTS' in msg:
id = msg.split()[0]
imap_client.idle_done()
break
result, data = yield from imap_client.fetch(id, '(RFC822)')
email_message = email.message_from_bytes(data[0])
attachments = []
body = ''
for part in email_message.walk():
if part.get_content_maintype() == 'multipart':
continue
if part.get_content_maintype() == 'text' and 'attachment' not in part.get('Content-Disposition', ''):
body = part.get_payload(decode=True).decode(part.get_param('charset', 'ascii')).strip()
else:
attachments.append(
{'type': part.get_content_type(), 'filename': part.get_filename(), 'size': len(part.as_bytes())})
print('attachments : %s' % attachments)
print('body : %s' % body)
yield from imap_client.logout()
if __name__ == '__main__':
loop = asyncio.get_event_loop()
loop.run_until_complete(wait_for_new_message('my.imap.server', 'user', 'pass'))
.
Le e-mail di grandi dimensioni con allegati vengono scaricate con Asyncio.