Domanda

Sto provando con il modulo Asyncio, tuttavia ho bisogno di un suggerimento / suggerimento come recuperare le e-mail di grandi dimensioni in un modo Async.

Ho una lista con nomi utente e password per gli account di posta.

data = [
    {'usern': 'foo@bar.de', 'passw': 'x'},
    {'usern': 'foo2@bar.de', 'passw': 'y'},
    {'usern': 'foo3@bar.de', 'passw': 'z'} (...)
]
.

Ho pensato a:

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([get_attachment(d) for d in data]))
loop.close()
.

Tuttavia, la parte lunga è scaricare gli allegati e-mail.

Email:

@asyncio.coroutine
def get_attachment(d):
    username = d['usern']
    password = d['passw']

    connection = imaplib.IMAP4_SSL('imap.bar.de')
    connection.login(username, password)
    connection.select()

    # list all available mails
    typ, data = connection.search(None, 'ALL')

    for num in data[0].split():
        # fetching each mail
        typ, data = connection.fetch(num, '(RFC822)')
        raw_string = data[0][1].decode('utf-8')
        msg = email.message_from_string(raw_string)
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue

            if part.get('Content-Disposition') is None:
                continue

            if part.get_filename():
                body = part.get_payload(decode=True)
                # do something with the body, async?

    connection.close()
    connection.logout()
.

Come posso elaborare tutti (allegati di download) Mail in un modo Async?

È stato utile?

Soluzione

Se non si dispone di una libreria Imap basata su I / O asincrona, è possibile utilizzare un concurrent.futures.ThreadPoolExecutor per eseguire l'I / O in thread. Python rilascerà il Gil durante l'I / O, quindi avrai una vera concorrenza:

def init_connection(d):    
    username = d['usern']
    password = d['passw']

    connection = imaplib.IMAP4_SSL('imap.bar.de')
    connection.login(username, password)
    connection.select()
    return connection

local = threading.local() # We use this to get a different connection per thread
def do_fetch(num, d, rfc):
    try:
        connection = local.connection
    except AttributeError:
        connnection = local.connection = init_connection(d)
    return connnection.fetch(num, rfc)

@asyncio.coroutine
def get_attachment(d, pool):
    connection = init_connection(d)    
    # list all available mails
    typ, data = connection.search(None, 'ALL')

    # Kick off asynchronous tasks for all the fetches
    loop = asyncio.get_event_loop()
    futs = [asyncio.async(loop.run_in_executor(pool, do_fetch, num, d, '(RFC822)'))
                for num in data[0].split()]

    # Process each fetch as it completes
    for fut in asyncio.as_completed(futs):
        typ, data = yield from fut
        raw_string = data[0][1].decode('utf-8')
        msg = email.message_from_string(raw_string)
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue

            if part.get('Content-Disposition') is None:
                continue

            if part.get_filename():
                body = part.get_payload(decode=True)
                # do something with the body, async?

    connection.close()
    connection.logout()    


loop = asyncio.get_event_loop()
pool = ThreadPoolExecutor(max_workers=5)  # You can probably increase max_workers, because the threads are almost exclusively doing I/O.
loop.run_until_complete(asyncio.wait([get_attachment(d, pool) for d in data]))
loop.close()
.

Questo non è bello come una soluzione basata su I / O davvero asincrona, perché hai ancora il sovraccarico di creare i fili, che limita la scalabilità e aggiunge un sovraccarico di memoria extra. Ottieni anche alcuni Gil Slowdown a causa di tutto il codice che avvolge le chiamate I / O effettivi. Tuttavia, se hai a che fare con meno di migliaia di posta, dovrebbe ancora eseguire OK.

Utilizziamo run_in_executor per utilizzare il ThreadPoolExecutor come parte del loop evento Asyncio, asyncio.async a Avvolgere l'oggetto Coroutine restituito in un asyncio.Future e as_completed per iterando attraverso i futures nell'ordine che completano.

Modifica :

Sembra che imaplib non sia al sicuro. Ho modificato la mia risposta all'utilizzo di archiviazione filo-locale tramite threading.local < / a>, che ci consente di creare un oggetto di connessione per thread per thread, che può essere riutilizzato per l'intera durata del thread (il che significa che si crea solo oggetti di connessione num_workers, piuttosto che una nuova connessione per ogni fetch). .

Altri suggerimenti

Ho avuto le stesse esigenze: recupero di e-mail con Python 3 completamente asinc.Se gli altri qui sono interessati, ho spinto un Asyncio imap lib qui: https://github.com/bamthomas/aioimaraplib 4./a>

Puoi usarlo come questo:

import asyncio
from aioimaplib import aioimaplib

@asyncio.coroutine
def wait_for_new_message(host, user, password):
    imap_client = aioimaplib.IMAP4(host=host)
    yield from imap_client.wait_hello_from_server()

    yield from imap_client.login(user, password)
    yield from imap_client.select()

    asyncio.async(imap_client.idle())
    id = 0
    while True:
        msg = yield from imap_client.wait_server_push()
        print('--> received from server: %s' % msg)
        if 'EXISTS' in msg:
            id = msg.split()[0]
            imap_client.idle_done()
            break

    result, data = yield from imap_client.fetch(id, '(RFC822)')
    email_message = email.message_from_bytes(data[0])

    attachments = []
    body = ''
    for part in email_message.walk():
        if part.get_content_maintype() == 'multipart':
            continue
        if part.get_content_maintype() == 'text' and 'attachment' not in part.get('Content-Disposition', ''):
            body = part.get_payload(decode=True).decode(part.get_param('charset', 'ascii')).strip()
        else:
            attachments.append(
                {'type': part.get_content_type(), 'filename': part.get_filename(), 'size': len(part.as_bytes())})

    print('attachments : %s' % attachments)
    print('body : %s' % body)
    yield from imap_client.logout()



if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(wait_for_new_message('my.imap.server', 'user', 'pass'))
.

Le e-mail di grandi dimensioni con allegati vengono scaricate con Asyncio.

Autorizzato sotto: CC-BY-SA insieme a attribuzione
Non affiliato a StackOverflow
scroll top