Question

Je suis en essais avec le module asyncio, cependant j'ai besoin d'un conseil / suggesstion comment extraire les grands e-mails dans un async façon.

J'ai une liste avec des noms d'utilisateur et mots de passe pour les comptes de messagerie.

data = [
    {'usern': 'foo@bar.de', 'passw': 'x'},
    {'usern': 'foo2@bar.de', 'passw': 'y'},
    {'usern': 'foo3@bar.de', 'passw': 'z'} (...)
]

J'ai pensé à:

loop = asyncio.get_event_loop()
loop.run_until_complete(asyncio.wait([get_attachment(d) for d in data]))
loop.close()

Toutefois, le long de la partie est de télécharger les pièces jointes.

E-mail:

@asyncio.coroutine
def get_attachment(d):
    username = d['usern']
    password = d['passw']

    connection = imaplib.IMAP4_SSL('imap.bar.de')
    connection.login(username, password)
    connection.select()

    # list all available mails
    typ, data = connection.search(None, 'ALL')

    for num in data[0].split():
        # fetching each mail
        typ, data = connection.fetch(num, '(RFC822)')
        raw_string = data[0][1].decode('utf-8')
        msg = email.message_from_string(raw_string)
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue

            if part.get('Content-Disposition') is None:
                continue

            if part.get_filename():
                body = part.get_payload(decode=True)
                # do something with the body, async?

    connection.close()
    connection.logout()

Comment pourrais-je traiter tous (téléchargement des pièces jointes) mails dans un async façon?

Était-ce utile?

La solution

Si vous n'avez pas d'I/O asynchrone basée sur imap bibliothèque, vous pouvez simplement utiliser un concurrent.futures.ThreadPoolExecutor pour faire le I/O dans les threads.Python release le GIL au cours de la I/O, de sorte que vous obtenez une vraie concurrence:

def init_connection(d):    
    username = d['usern']
    password = d['passw']

    connection = imaplib.IMAP4_SSL('imap.bar.de')
    connection.login(username, password)
    connection.select()
    return connection

local = threading.local() # We use this to get a different connection per thread
def do_fetch(num, d, rfc):
    try:
        connection = local.connection
    except AttributeError:
        connnection = local.connection = init_connection(d)
    return connnection.fetch(num, rfc)

@asyncio.coroutine
def get_attachment(d, pool):
    connection = init_connection(d)    
    # list all available mails
    typ, data = connection.search(None, 'ALL')

    # Kick off asynchronous tasks for all the fetches
    loop = asyncio.get_event_loop()
    futs = [asyncio.async(loop.run_in_executor(pool, do_fetch, num, d, '(RFC822)'))
                for num in data[0].split()]

    # Process each fetch as it completes
    for fut in asyncio.as_completed(futs):
        typ, data = yield from fut
        raw_string = data[0][1].decode('utf-8')
        msg = email.message_from_string(raw_string)
        for part in msg.walk():
            if part.get_content_maintype() == 'multipart':
                continue

            if part.get('Content-Disposition') is None:
                continue

            if part.get_filename():
                body = part.get_payload(decode=True)
                # do something with the body, async?

    connection.close()
    connection.logout()    


loop = asyncio.get_event_loop()
pool = ThreadPoolExecutor(max_workers=5)  # You can probably increase max_workers, because the threads are almost exclusively doing I/O.
loop.run_until_complete(asyncio.wait([get_attachment(d, pool) for d in data]))
loop.close()

Ce n'est pas tout à fait aussi agréable qu'un véritable asynchronous I/O à base de solution, parce que vous avez encore la surcharge de créer les threads, ce qui limite l'évolutivité et ajoute plus de surcharge de la mémoire.Vous aussi vous faire obtenir certains GIL ralentissement en raison de tout le code d'emballage les e/S réelles des appels.Encore, si vous avez affaire à moins que des milliers de mails, il doit encore effectuer ok.

Nous utilisons run_in_executor l'utilisation de la ThreadPoolExecutor dans le cadre de la asyncio boucle d'événements, asyncio.async enveloppez la coroutine objet renvoyé en asyncio.Future, et as_completed pour itérer sur les contrats à terme dans l'ordre qu'ils accomplissent.

Modifier:

Il semble imaplib n'est pas thread-safe.J'ai édité ma réponse à l'utilisation de la thread-local de stockage par l'intermédiaire d' threading.local, ce qui nous permet de créer un objet de connexion par fil, qui peut être ré-utilisé pour toute la durée de vie du fil (ce qui signifie que vous créez num_workers les objets de connexion, plutôt que d'une nouvelle connexion pour chaque fetch).

Autres conseils

J'ai eu les mêmes besoins: récupérer des courriels avec Python 3 entièrement asynchronisé.Si d'autres ici sont intéressés, j'ai poussé une asyncio imap lib ici: https://github.com/bamthomas/aioimaplib

Vous pouvez l'utiliser comme ceci:

import asyncio
from aioimaplib import aioimaplib

@asyncio.coroutine
def wait_for_new_message(host, user, password):
    imap_client = aioimaplib.IMAP4(host=host)
    yield from imap_client.wait_hello_from_server()

    yield from imap_client.login(user, password)
    yield from imap_client.select()

    asyncio.async(imap_client.idle())
    id = 0
    while True:
        msg = yield from imap_client.wait_server_push()
        print('--> received from server: %s' % msg)
        if 'EXISTS' in msg:
            id = msg.split()[0]
            imap_client.idle_done()
            break

    result, data = yield from imap_client.fetch(id, '(RFC822)')
    email_message = email.message_from_bytes(data[0])

    attachments = []
    body = ''
    for part in email_message.walk():
        if part.get_content_maintype() == 'multipart':
            continue
        if part.get_content_maintype() == 'text' and 'attachment' not in part.get('Content-Disposition', ''):
            body = part.get_payload(decode=True).decode(part.get_param('charset', 'ascii')).strip()
        else:
            attachments.append(
                {'type': part.get_content_type(), 'filename': part.get_filename(), 'size': len(part.as_bytes())})

    print('attachments : %s' % attachments)
    print('body : %s' % body)
    yield from imap_client.logout()



if __name__ == '__main__':
    loop = asyncio.get_event_loop()
    loop.run_until_complete(wait_for_new_message('my.imap.server', 'user', 'pass'))

Les grands courriels avec des pièces jointes sont également téléchargés avec Asyncio.

Licencié sous: CC-BY-SA avec attribution
Non affilié à StackOverflow
scroll top