asyncio - I/O Asincrono, Ciclo di Eventi e Strumenti per la Gestione della Concorrenza

Scopo: Una infrastruttura per I/O asincrono e concorrenza

Il modulo asyncio fornisce strumenti per costruire applicazioni concorrenti usando coroutine. Mentre il modulo threading implementa la concorrenza attraverso thread di applicazione e multiprocessing implementa la concorrenza usando processi di sistema, asyncio usa un approccio a thread singolo e processo singolo nel quale parti di un'applicazione cooperano per commutare compiti esplicitamente con tempistiche ottimali. Molto spesso questa commutazione di contesto accade quando il programma sarebbe altrimenti bloccato in attesa di leggere o scrivere dati, ma asyncio include anche il supporto per pianificare l'esecuzione di codice a una specifico spazio temporale futuro, per abilitare una coroutine ad attendere che un'altra si completi, per gestire segnali di sistema e per riconoscere altri eventi che possano costituire una ragione per un'applicazione per modificare quello su cui sta lavorando.

Concetti di Concorrenza Asincrona

La maggior parte dei programmi che usano altri modelli di concorrenza sono scritti linearmente, e fanno affidamento sulla gestione del thread o del processo del linguaggio in fase di esecuzione o del sistema operativo per cambiare contesto quando appropriato. Una applicazione basata su asyncio richiede che il suo codice gestisca esplicitamente i cambi di contesto, e usi tecniche per fare questo correttamente dipende dalla comprensione di parecchi concetti interdipendenti.

L'infrastruttura fornita da asyncio è centrata su di un ciclo di eventi (event loop), un oggetto di prima classe responsabile per la gestione efficiente degli eventi I/O, eventi di sistema e cambiamenti di contesto di applicazioni. Sono fornite parecchie implementazioni del ciclo per trarre vantaggio con efficienza della capacità del sistema operativo. Mentre una impostazione predefinita ragionevole è di solito selezionata automaticamente, è anche possibile scegliere una implementazione particolare del ciclo di eventi all'interno dell'applicazione. Questo è utile sotto Windows, ad esempio, dove alcune classi di ciclo aggiungono supporto per processi esterni in un modo nel quale ne potrebbe beneficiare un ambiente I/O di rete.

Una applicazione interagisce con il ciclo di eventi in modo esplicito registrando il codice da eseguirsi, e lascia che il ciclo di eventi faccia le chiamate necessarie all'interno del codice dell'applicazione quando le risorse sono disponibili. Ad esempio un server di rete apre dei socket, quindi li registra per potere essere notificato quando su di essi si manifestano degli eventi di input. Il ciclo di eventi allerta il codice del server quando vi è una connessione in arrivo o quando ci sono dati da leggere. Ci si attende che il codice dell'applicazione ceda il controllo nuovamente dopo un breve periodo di tempo quando non c'è più lavoro da fare nel contesto corrente. Ad esempio se non ci sono più dati da leggere da un socket il server dovrebbe riaffidare il controllo al ciclo di eventi.

Il meccanismo per restituire il controllo al ciclo di eventi dipende dalle coroutine di Python, esse sono funzioni speciali per restituire il controllo al chiamante senza perdere il proprio stato. Le coroutine sono simili alle funzioni generatore, a in effetti si possono usare generatori per implementare le coroutine in versioni di Python inferiori alla 3.5 senza il supporto nativo degli oggetti di coroutine. asyncio fornisce anche uno strato di astrazione basato su classi per protocolli e trasporti per scrivere codice usando callback invece di scrivere direttamente coroutine. In entrambi i modelli basati su classi e modelli di coroutine modificando esplicitamente il contesto ritornando nel ciclo di eventi equivale a una implicita implementazione dei threading di Python per il cambio di contesto.

Un future è una struttura dati che rappresenta il risultato di un lavoro che non è ancora stato completato. Il ciclo di eventi può monitorare un oggetto Future per vedere quando viene impostato come ultimato, consentendo a una parte di applicazione di attendere che un'altra parte finisca un lavoro. A parte i future, asyncio fornisce altri primitivi di concorrenzialità come i bloccaggi (locks) e i semafori.

Un Task (compito) è una sottoclasse di Future che sa come impacchettare e gestire l'esecuzione per una coroutine. I task possono essere pianificati con un ciclo di eventi per essere eseguiti quando le risorse a essi necessarie sono disponibili, e per produrre un risultato che può essere consumato da altre coroutine.

Multitasking Cooperativo con Coroutine

Le coroutine sono un costrutto di linguaggio progettato per operazioni concorrenti. Una funzione coroutine crea un oggetto coroutine quando chiamata, poi il chiamante può eseguire il codice della funzione usando il metodo della coroutine send(). Una coroutine può mettere in pausa l'esecuzione usando la parola chiave await con un'altra coroutine. Mentre è il pausa, lo stato della coroutine viene mantenuto, in modo che possa essere ripreso dove era stato lasciato la prossima volta che viene chiamata in causa.

Fare Partire una Coroutine

Ci sono alcuni modi diversi per fare in modo che un ciclo di eventi asyncio faccia partire una coroutine. Quello più semplice è usare run_until_complete(), passandogli la coroutine direttamente.

# asyncio_coroutine.py

import asyncio


async def coroutine():
    print('nella coroutine')


event_loop = asyncio.get_event_loop()
try:
    print('coroutine in partenza')
    coro = coroutine()
    print('entrata nel ciclo di eventi')
    event_loop.run_until_complete(coro)
finally:
    print('chiusura del ciclo di eventi')
    event_loop.close()

Il primo passo è ottenere un riferimento al ciclo di eventi. Può essere usato il tipo di ciclo predefinito, oppure può essere istanziata una classe di ciclo specifica. In questo esempio, si usa il ciclo predefinito. Il metodo run_until_complete() fa partire il ciclo con l'oggetto coroutine e interrompe il ciclo quando la coroutine esce ritornando.

$ python3 asyncio_coroutine.py

coroutine in partenza
entrata nel ciclo di eventi
nella coroutine
chiusura del ciclo di eventi
Ritornare Valori da Coroutine

Il valore di ritorno di una coroutine viene passato al codice che la fa partire e lo attende.

# asyncio_coroutine_return.py

import asyncio


async def coroutine():
    print('nella coroutine')
    return 'risultato'


event_loop = asyncio.get_event_loop()
try:
    return_value = event_loop.run_until_complete(
        coroutine()
    )
    print('ritornato: {!r}'.format(return_value))
finally:
    event_loop.close()

In questo caso run_until_complete ritorna anche il risultato che la coroutine sta attendendo.

$ python3 asyncio_coroutine_return.py

nella coroutine
ritornato: 'risultato'
Concatenare Coroutine

Una coroutine può far partire un'altra coroutine e attenderne il risultato. Questo facilita la suddivisione di un compito in parti riutilizzabili. L'esempio seguente ha due fasi che devono essere eseguite in ordine, ma che possono essere eseguite concorrenzialmente con altre operazioni.

# asyncio_coroutine_chain.py

import asyncio


async def outer():
    print('all\'esterno')
    print('in attesa di result1')
    result1 = await phase1()
    print('in attesa di result2')
    result2 = await phase2(result1)
    return (result1, result2)


async def phase1():
    print('in phase1')
    return 'result1'


async def phase2(arg):
    print('in phase2')
    return 'result2 deriva da {}'.format(arg)


event_loop = asyncio.get_event_loop()
try:
    return_value = event_loop.run_until_complete(outer())
    print('valore ritornato: {!r}'.format(return_value))
finally:
    event_loop.close()

La parola chiave await viene usata invece di aggiungere le nuove coroutine al ciclo, poichè il flusso di controllo è già all'interno di una coroutine che è gestita dal ciclo non è necessario dire al ciclo di gestire le nuove coroutine.

$ python3 asyncio_coroutine_chain.py

all'esterno
in attesa di result1
in phase1
in attesa di result2
in phase2
valore ritornato: ('result1', 'result2 deriva da result1')
Generatori Invece di Coroutine

Le funzioni coroutine sono una componente chiave della progettazione di asyncio. Esse forniscono un costrutto di linguaggio per interrompere l'esecuzione di parti di un programma, preservando lo stato di quella chiamata, e rientrando in quello stato successivamente, tutte importanti capacità per una infrastruttura di concorrenzialità.

Python 3.5 ha introdotto nuove caratteristiche di linguaggio per definire nativamente dette coroutine usando async def e per cedere il controllo usando await. Gli esempi per asyncio traggono vantaggio della nuova caratteristica. Versioni precedenti di Python 3 possono usare funzioni generatore impacchettate con il decoratore asyncio.coroutine() e yield from per ottenere lo stesso effetto.

# asyncio_generator.py

import asyncio


@asyncio.coroutine
def outer():
    print('all\'esterno')
    print('in attesa di result1')
    result1 = yield from phase1()
    print('in attesa di result2')
    result2 = yield from phase2(result1)
    return (result1, result2)


@asyncio.coroutine
def phase1():
    print('in phase1')
    return 'result1'


@asyncio.coroutine
def phase2(arg):
    print('in phase2')
    return 'result2 derived from {}'.format(arg)


event_loop = asyncio.get_event_loop()
try:
    return_value = event_loop.run_until_complete(outer())
    print('valore ritornato: {!r}'.format(return_value))
finally:
    event_loop.close()

L'esempio precedente riproduce asyncio_coroutine_chain.py usando funzioni generatore in luogo di coroutine native.

$ python3 asyncio_generator.py

all'esterno
in attesa di result1
in phase1
in attesa di result2
in phase2
valore ritornato: ('result1', 'result2 derived from result1')

Pianificare Chiamate a Funzioni Normali

Oltre a gestire coroutine e callback I/O, il ciclo di eventi di asyncio può pianificare chiamate a funzioni normali in base al valore di temporizzazione conservato nel ciclo.

Pianificare un Callback "Presto"

Se la tempistica del callback non importa, call_soon() può essere usato per pianificare la chiamata per la successiva iterazione del ciclo. Qualunque altro argomento posizionale dopo la funzione viene passato al callback quando viene invocato. Per passare argomenti nominali al callback si usi partial() dal modulo functools.

# asyncio_call_soon.py

import asyncio
import functools


def callback(arg, *, kwarg='default'):
    print('callback chiamato con {} e {}'.format(arg, kwarg))


async def main(loop):
    print('registrazione callbacks')
    loop.call_soon(callback, 1)
    wrapped = functools.partial(callback, kwarg='non default')
    loop.call_soon(wrapped, 2)

    await asyncio.sleep(0.1)


event_loop = asyncio.get_event_loop()
try:
    print('entrata nel ciclo di eventi')
    event_loop.run_until_complete(main(event_loop))
finally:
    print('chiusura del ciclo di eventi')
    event_loop.close()

I callback sono invocati nell'ordine nel quale sono pianificati.

$ python3 asyncio_call_soon.py

entrata nel ciclo di eventi
registrazione callbacks
callback chiamato con 1 e default
callback chiamato con 2 e non default
chiusura del ciclo di eventi
Pianificare un Callback con un Differimento

Per posporre un callback in un tempo futuro, si usi call_later(). Il primo argomento è il differimento in secondi e il secondo argomento è il callback.

# asyncio_call_later.py

import asyncio


def callback(n):
    print('callback {} invocato'.format(n))


async def main(loop):
    print('registrazione callbacks')
    loop.call_later(0.2, callback, 1)
    loop.call_later(0.1, callback, 2)
    loop.call_soon(callback, 3)

    await asyncio.sleep(0.4)


event_loop = asyncio.get_event_loop()
try:
    print('entrata nel ciclo di eventi')
    event_loop.run_until_complete(main(event_loop))
finally:
    print('chiusura del ciclo di eventi')
    event_loop.close()

In questo esempio, la stessa funzione callback viene pianificata per parecchie volte con argomenti diversi. L'istanza finale, usando call_soon() risulta nel callback invocato con argomento 3 prima di qualsiasi altra istanza pianificata, mostrando che "soon" in genere risulta in un differimento minimale.

$ python3 asyncio_call_later.py

entrata nel ciclo di eventi
registrazione callbacks
callback 3 invocato
callback 2 invocato
callback 1 invocato
chiusura del ciclo di eventi
Pianificare un Callback per un Orario Specifico

E' anche possibile pianificare una chiamata per un orario specifico. Il ciclo usa un orologio monotonico, invece che un orologio "da muro", per assicurarsi che il valore di "adesso" non regredisca mai. Per scegliere un orario per un callback pianificato, è necessario partire dallo stato interno di quell'orologio usando il metodo del ciclo time().

# asyncio_call_at.py

import asyncio
import time


def callback(n, loop):
    print('callback {} chiamato a {}'.format(n, loop.time()))


async def main(loop):
    now = loop.time()
    print('orario dell\'orologio: {}'.format(time.time()))
    print('orario del ciclo: {}'.format(now))

    print('registrazione dei callback')
    loop.call_at(now + 0.2, callback, 1, loop)
    loop.call_at(now + 0.1, callback, 2, loop)
    loop.call_soon(callback, 3, loop)

    await asyncio.sleep(1)


event_loop = asyncio.get_event_loop()
try:
    print('entrata nel ciclo di eventi')
    event_loop.run_until_complete(main(event_loop))
finally:
    print('chiusura del ciclo di eventi')
    event_loop.close()

Si noti che il tempo secondo il ciclo non corrisponde al valore ritornato da time.time().

$ python3 asyncio_call_at.py

entrata nel ciclo di eventi
orario dell'orologio: 1578425160.2366767
orario del ciclo: 9354.619629288
registrazione dei callback
callback 3 chiamato a 9354.619700045
callback 2 chiamato a 9354.719899194
callback 1 chiamato a 9354.820147853
chiusura del ciclo di eventi

Produrre Risultati In Modo Asincrono

Un Future rappresenta il risultato di un lavoro che non è ancora stato completato. Il ciclo di eventi può osservare lo stato di un oggetto Future per verificare quando questo è terminato, consentendo a una parte di applicazione di attendere che un'altra finisca una qualche attività.

# asyncio_future_event_loop.py

import asyncio


def mark_done(future, result):
    print('Impostazione del risultato future a {!r}'.format(result))
    future.set_result(result)


event_loop = asyncio.get_event_loop()
try:
    all_done = asyncio.Future()

    print('pianificazione mark_done')
    event_loop.call_soon(mark_done, all_done, 'il risultato')

    print('entrata nel ciclo di eventi')
    result = event_loop.run_until_complete(all_done)
    print('risultato restituito: {!r}'.format(result))
finally:
    print('chiusura del ciclo di eventi')
    event_loop.close()

print('risultato future: {!r}'.format(all_done.result()))

Lo stato di un Future cambia a completato quando set_result() viene chiamato e l'istanza di Future trattiene il risultato dato al metodo per recuperarlo successivamente.

$ python3 asyncio_future_event_loop.py

pianificazione mark_done
entrata nel ciclo di eventi
Impostazione del risultato future a 'il risultato'
risultato restituito: 'il risultato'
chiusura del ciclo di eventi
risultato future: 'il risultato'

Un Future può anche essere usato con la parola chiave await, come in questo esempio.

# asyncio_future_await.py

import asyncio


def mark_done(future, result):
    print('impostazione del risultato del future a {!r}'.format(result))
    future.set_result(result)


async def main(loop):
    all_done = asyncio.Future()

    print('pianificazione di mark_done')
    loop.call_soon(mark_done, all_done, 'il risultato')

    result = await all_done
    print('risultato ritornato: {!r}'.format(result))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

Il risultato del Future viene ritornato da await, quindi è spesso possibile avere lo stesso codice che funziona sia con una coroutine normale che con una istanza di Future.

$ python3 asyncio_future_await.py

pianificazione di mark_done
impostazione del risultato del future a 'il risultato'
risultato ritornato: 'il risultato'
Callback di Future

Un Future, oltre che lavorare come una coroutine, può invocare callback quando è completato. I callback sono invocati nell'ordine nel quale sono registrati.

# asyncio_future_callback.py

import asyncio
import functools


def callback(future, n):
    print('{}: future completato: {}'.format(n, future.result()))


async def register_callbacks(all_done):
    print('registrazione dei callbacks sul future')
    all_done.add_done_callback(functools.partial(callback, n=1))
    all_done.add_done_callback(functools.partial(callback, n=2))


async def main(all_done):
    await register_callbacks(all_done)
    print('impostazione risultato del future')
    all_done.set_result('il risultato')


event_loop = asyncio.get_event_loop()
try:
    all_done = asyncio.Future()
    event_loop.run_until_complete(main(all_done))
finally:
    event_loop.close()

I callback dovrebbero attendersi un argomento, l'istanza di Future. Per passare argomenti addizionali, si usi functools.partial() per inglobarli.

$ python3 asyncio_future_callback.py

registrazione dei callbacks sul future
impostazione risultato del future
1: future completato: il risultato
2: future completato: il risultato

Eseguire Task in Concomitanza

I task (attività) costituiscono uno dei modi principali per interagire con il ciclo di eventi. I task inglobano coroutine e rilevano quando esse vengono completate. I Task sono sottoclassi di Future quindi le altre coroutine possono attenderli e ciascuno di esse ha un risultato che può essere recuperato dopo che l'attività viene completata.

Far Partire un task

Per far partire un task si usi create_task() per creare un istanza di Task. L'attività risultante verrà eseguita come parte delle operazioni concomitanti gestite dal ciclo di eventi fino a quando il ciclo è in esecuzione e la coroutine non ritorna.

# asyncio_create_task.py

import asyncio


async def task_func():
    print('in task_func')
    return 'il risultato'


async def main(loop):
    print('creazione task')
    task = loop.create_task(task_func())
    print('in attesa di {!r}'.format(task))
    return_value = await task
    print('attività completato {!r}'.format(task))
    print('valore ritornato: {!r}'.format(return_value))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

Questo esempio attende che l'attività ritorni un risultato prima che la funzione main() esca.

$ python3 asyncio_create_task.py

creazione task
in attesa di <Task pending coro=<task_func() running at asyncio_create_task.py:6>>
in task_func
attività completato <Task finished coro=<task_func() done, defined at asyncio_create_task.py:6> result='il risultato'>
valore ritornato: 'il risultato'
Cancellare un Task

Mantenendo l'oggetto Task ritornato da create_task() è possibile cancellare l'operazione dell'attività prima che si completi.

# asyncio_cancel_task.py

import asyncio


async def task_func():
    print('in task_func')
    return 'il risultato'


async def main(loop):
    print('creazione task')
    task = loop.create_task(task_func())

    print('cancellazione task')
    task.cancel()

    print('task cancellato {!r}'.format(task))
    try:
        await task
    except asyncio.CancelledError:
        print('catturato errore dal task cancellato')
    else:
        print('task result: {!r}'.format(task.result()))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

Questo esempio crea, quindi cancella, un task prima di far partire il ciclo di eventi. Il risultato è una eccezione CancelledError da run_until_complete().

$ python3 asyncio_cancel_task.py

creazione task
cancellazione task
task cancellato <Task cancelling coro=<task_func() running at asyncio_cancel_task.py:6>>
catturato errore dal task cancellato

Se l'attività è cancellata mentre sta attendendo un'altra operazione concomitante, viene notificata della sua cancellazione sollevando una eccezione CancelledError al punto di attesa.

# asyncio_cancel_task2.py

import asyncio


async def task_func():
    print('in task_func, in pausa')
    try:
        await asyncio.sleep(1)
    except asyncio.CancelledError:
        print('task_func è stata cancellata')
        raise
    return 'the result'


def task_canceller(t):
    print('in task_canceller')
    t.cancel()
    print('cancellazione del task')


async def main(loop):
    print('creazione task')
    task = loop.create_task(task_func())
    loop.call_soon(task_canceller, task)
    try:
        await task
    except asyncio.CancelledError:
        print('anche main() vede il task come cancellato')


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

Catturando l'eccezione si ha una opportunità di pulire il lavoro già fatto, se necessario.

$ python3 asyncio_cancel_task2.py

creazione task
in task_func, in pausa
in task_canceller
cancellazione del task
task_func è stata cancellata
anche main() vede il task come cancellato
Creare Task da Coroutine

La funzione ensure_future() ritorna un Task legato all'esecuzione di una coroutine. Quell'istanza di Task può essere quindi passata ad altro codice, il quale può attenderla senza sapere come la coroutine originale sia stata costruita o chiamata.

# asyncio_ensure_future.py

import asyncio


async def wrapped():
    print('impacchetato')
    return 'risultato'


async def inner(task):
    print('inner: in partenza')
    print('inner: in attesa di {!r}'.format(task))
    result = await task
    print('inner: task ritornato {!r}'.format(result))


async def starter():
    print('starter: creazione task')
    task = asyncio.ensure_future(wrapped())
    print('starter: in attesa di inner')
    await inner(task)
    print('starter: inner ritornato')


event_loop = asyncio.get_event_loop()
try:
    print('in entrata nel ciclo di eventi')
    result = event_loop.run_until_complete(starter())
finally:
    event_loop.close()

Si noti che la coroutine data a ensure_future() non viene fatta partire fino a quando qualcosa usa await per consentirne l'esecuzione.

$ python3 asyncio_ensure_future.py

in entrata nel ciclo di eventi
starter: creazione task
starter: in attesa di inner
inner: in partenza
inner: in attesa di <Task pending coro=<wrapped() running at asyncio_ensure_future.py:6>>
impacchetato
inner: task ritornato 'risultato'
starter: inner ritornato

Comporre coroutine con Strutture di Controllo

Il flusso di controllo lineare tra una serie di coroutine è facile da gestire con la parola chiave built-in await. E' anche possibile tramite strumenti in asyncio che strutture più complicate consentano a una coroutine di attendere che diverse altre siano completate in parallelo.

Attendere Multiple Coroutine

E' spesso utile dividere una operazione in diverse parti, quindi eseguirle separatamente. Ad esempio, per scaricare diverse risorse remote o interrogare API remote. In situazioni nelle quali l'ordine di esecuzione non importa, e dove ci potrebbe essere un arbitrario numero di operazioni, wait() può essere usato per mettere in pausa una coroutine fino a quando le altre operazioni siano completate.

# asyncio_wait.py

import asyncio


async def phase(i):
    print('in phase {}'.format(i))
    await asyncio.sleep(0.1 * i)
    print('completato per phase {}'.format(i))
    return 'phase risulta {}'.format(i)


async def main(num_phases):
    print('partenza di main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('in attesa di completamento phase')
    completed, pending = await asyncio.wait(phases)
    results = [t.result() for t in completed]
    print('risultati: {!r}'.format(results))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()

Internamente, wait() usa un set per mantenere le istanze di Task create. Ne consegue che esse sono attivate e completate in un ordine non prevedibile. Il valore di ritorno da wait() è una tupla che contiene due insiemi che racchiudono le attività finite e in corso.

$ python3 asyncio_wait.py

in partenza main
si attende 0.1 per il completamento delle funzioni phase
in phase 0
in phase 1
in phase 2
phase 0 completata
1 completate e 2 pendenti
eliminazione dei tasks
in uscita da main
phase 1 cancellata
phase 2 cancellata

Se wait() viene usato con un valore di timeout rimarranno solo le operazioni in corso.

# asyncio_wait_timeout.py

import asyncio


async def phase(i):
    print('in phase {}'.format(i))
    try:
        await asyncio.sleep(0.1 * i)
    except asyncio.CancelledError:
        print('phase {} cancellata'.format(i))
        raise
    else:
        print('phase {} completata'.format(i))
        return 'risultato {} di phase'.format(i)


async def main(num_phases):
    print('in partenza main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('si attende 0.1 per il completamento delle funzioni phase')
    completed, pending = await asyncio.wait(phases, timeout=0.1)
    print('{} completate e {} pendenti'.format(
        len(completed), len(pending),
    ))
    # Si eliminano le attività rimanenti in modoe che non  generino errori
    # quando si esce senza completarle
    if pending:
        print('eliminazione dei tasks')
        for t in pending:
            t.cancel()
    print('in uscita da main')


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()

Le operazioni rimanenti dovrebbero essere cancellate oppure si dovrebbe attenderne il completamento. Lasciarle in corso mentre il ciclo di eventi continua farà sì che esse vengano eseguite in seguito, il che potrebbe essere non desiderabile se l'operazione generale viene considerata come abortita. Se si lasciano pendenti alla fine del processo verranno generati avvertimenti.

$ python3 asyncio_wait_timeout.py

in partenza main
si attende 0.1 per il completamento delle fasi
nella fase 0
nella fase 1
nella fase 2
fase 0 completata
1 completate e 2 pendenti
eliminazione dei tasks
in uscita da main
fase 1 cancellata
fase 2 cancellata
Raccogliere i Risultati dalle Coroutine

Se le fasi sottostanti sono state ben definite, e solo i risultati di queste fasi hanno importanza, allora gather() potrebbe essere più utile per attendere operazioni multiple.

# asyncio_gather.py

import asyncio


async def phase1():
    print('in phase1')
    await asyncio.sleep(2)
    print('terminata phase1')
    return 'risultato di phase1'


async def phase2():
    print('in phase2')
    await asyncio.sleep(1)
    print('terminata phase2')
    return 'risultato di phase2'


async def main():
    print('in partenza main')
    print('in attesa del completamento delle fasi')
    results = await asyncio.gather(
        phase1(),
        phase2(),
    )
    print('results: {!r}'.format(results))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main())
finally:
    event_loop.close()

Le attività create da gather() non sono esposte, in modo che non possano essere cancellate. Il valore di ritorno è una lista di risultati nello stesso ordine degli argomenti passati a gather(), a prescindere dall'ordine delle operazioni sottostanti effettivamente completate.

$ python3 asyncio_gather.py

in partenza main
in attesa del completamento delle fasi
in phase1
in phase2
terminata phase2
terminata phase1
results: ['risultato di phase1', 'risultato di phase2']
Gestire le Operazioni Sottostanti Mentre Finiscono

as_completed() è un generatore che gestisce l'esecuzione di una lista di coroutine fornitegli e produce i loro risultati uno alla volta non appena vengono completate. Come con wait(), l'ordine non è garantito da as_completed(), ma non è necessario attendere che tutte le operazioni sottostanti siano completate prima di intraprendere altre azioni.

# asyncio_as_completed.py

import asyncio


async def phase(i):
    print('in fase {}'.format(i))
    await asyncio.sleep(0.5 - (0.1 * i))
    print('fase {} terminata'.format(i))
    return 'risultato {} fase'.format(i)


async def main(num_phases):
    print('in partenza main')
    phases = [
        phase(i)
        for i in range(num_phases)
    ]
    print('in attesa del completamento delle fasi')
    results = []
    for next_to_complete in asyncio.as_completed(phases):
        answer = await next_to_complete
        print('risposta ricevuta {!r}'.format(answer))
        results.append(answer)
    print('resultati: {!r}'.format(results))
    return results


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(3))
finally:
    event_loop.close()

Questo esempio fa partire parecchie fasi che finiscono in ordine inverso rispetto a quello di partenza. Mentre il generatore viene consumato, il ciclo attende il risultato della coroutine usando await.

$ python3 asyncio_as_completed.py

in partenza main
in attesa del completamento delle fasi
in fase 1
in fase 2
in fase 0
fase 2 terminata
risposta ricevuta 'risultato 2 fase'
fase 1 terminata
risposta ricevuta 'risultato 1 fase'
fase 0 terminata
risposta ricevuta 'risultato 0 fase'
resultati: ['risultato 2 fase', 'risultato 1 fase', 'risultato 0 fase']

Sincronizzare i Primitivi

Sebbene le applicazioni asyncio in genere vengano eseguite in un processo a singolo thread, sono comunque costruite come applicazioni concorrenti. Ogni coroutine o task potrebbero essere eseguiti in un ordine non prevedibile, in base ai differimenti e agli interrupt da I/O e altri eventi esterni. Per supportare una concorrenzialità sicura, asyncio include implementazioni di alcuni degli stessi primitivi a basso livello che si trovano nei moduli threading e multiprocessing.

Bloccaggi (Locks)

Un bloccaggio (Lock) può essere usato per sorvegliare gli accessi a una risorsa condivisa. Solo il possessore del bloccaggio può usare la risorsa. I tentativi multipli di acquisire un bloccaggio verranno fermati in modo che ci sia un solo possessore alla volta.

# asyncio_lock.py

import asyncio
import functools


def unlock(lock):
    print('callback per rilasciare il bloccaggio')
    lock.release()


async def coro1(lock):
    print('coro1 in attesa del bloccaggio')
    async with lock:
        print('coro1 ha acquisito il bloccaggio')
    print('coro1 ha rilasciato il bloccaggio')


async def coro2(lock):
    print('coro2 in attesa del bloccaggio')
    await lock.acquire()
    try:
        print('coro2 ha acquisito il bloccaggio')
    finally:
        print('coro2 ha rilasciato il bloccaggio')
        lock.release()


async def main(loop):
    # Crea e acquisisce un bloccaggio condiviso
    lock = asyncio.Lock()
    print('acquisizione del bloccaggio prima di far partire coroutine')
    await lock.acquire()
    print('bloccaggio acquisito: {}'.format(lock.locked()))

    # Pianifica un callback per sbloccare il bloccaggio
    loop.call_later(0.1, functools.partial(unlock, lock))

    # Esegue le coroutines che vogliono usare il bloccaggio
    print('in attesa di coroutines')
    await asyncio.wait([coro1(lock), coro2(lock)]),


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

Il metodo acquire per un bloccaggio può essere chiamato direttamente, usando await chiamando poi release() quando terminato (come in colo2() in questo esempio). Possono anche essere usati come gestori di contesto asincroni con le parole chiave with await, come in coro1().

$ python3 asyncio_lock.py

acquisizione del bloccaggio prima di far partire coroutine
bloccaggio acquisito: True
in attesa di coroutines
coro1 in attesa del bloccaggio
coro2 in attesa del bloccaggio
callback per rilasciare il bloccaggio
coro1 ha acquisito il bloccaggio
coro1 ha rilasciato il bloccaggio
coro2 ha acquisito il bloccaggio
coro2 ha rilasciato il bloccaggio
Eventi

Un evento asyncio.Event è basato su threading.Event, e viene usato per consentire a molteplici consumatori di attendere che succeda qualcosa senza cercare un valore specifico da associare con la notifica.

# asyncio_event.py

import asyncio
import functools


def set_event(event):
    print('impostazione evento in callback')
    event.set()


async def coro1(event):
    print('coro1 in attesa dell\'evento')
    await event.wait()
    print('coro1 attivato')


async def coro2(event):
    print('coro2 in attesa dell\'evento')
    await event.wait()
    print('coro2 attivato')


async def main(loop):
    # Crea un evento condiviso
    event = asyncio.Event()
    print('stato di partenza evento: {}'.format(event.is_set()))

    loop.call_later(
        0.1, functools.partial(set_event, event)
    )

    await asyncio.wait([coro1(event), coro2(event)])
    print('stato di fine evento: {}'.format(event.is_set()))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

Così come con Lock, sia coro1() che coro2() attendono che un evento sia impostato. La differenza è che entrambe possono partire non appeno lo stato dell'evento cambia, e non devono acquisire un possesso esclusivo sull'evento oggetto.

$ python3 asyncio_event.py

stato di partenza evento: False
coro1 in attesa dell'evento
coro2 in attesa dell'evento
impostazione evento in callback
coro1 attivato
coro2 attivato
stato di fine evento: True
Condizioni

Una condizione (Condition) funziona in modo simile a Event eccetto che invece che notificare a tutte le coroutine in attesa, il numero di processi attivati viene controllato con un argomento per notify().

# asyncio_condition.py

import asyncio


async def consumer(condition, n):
    async with condition:
        print('consumatore {} in attesa'.format(n))
        await condition.wait()
        print('consumatore {} attivato'.format(n))
    print('chiusura consumatore {}'.format(n))


async def manipulate_condition(condition):
    print('manipulate_condition in partenza')

    # pausa per lasciare che il consumatore parta
    await asyncio.sleep(0.1)

    for i in range(1, 3):
        async with condition:
            print('notifica {} consumatori'.format(i))
            condition.notify(n=i)
        await asyncio.sleep(0.1)

    async with condition:
        print('notifica i consumatori rimanenti')
        condition.notify_all()

    print('ending manipulate_condition')


async def main(loop):
    # Crea una condizione
    condition = asyncio.Condition()

    # Imposta attività che monitorano la condizione
    consumers = [
        consumer(condition, i)
        for i in range(5)
    ]

    # Pianifica attività per manipolare la variabile della condizione
    loop.create_task(manipulate_condition(condition))

    # Attende che il consumatore abbia terminato
    await asyncio.wait(consumers)


event_loop = asyncio.get_event_loop()
try:
    result = event_loop.run_until_complete(main(event_loop))
finally:
    event_loop.close()

Questo esempio fa partire cinque consumatori di una Condition. Ognuno di essi usa il metodo wait() per attendere via notifica che può procedere. manipulate_condition() notifica un consumatore, poi due, infine i restanti.

$ python3 asyncio_condition.py

manipulate_condition in partenza
consumatore 4 in attesa
consumatore 3 in attesa
consumatore 1 in attesa
consumatore 2 in attesa
consumatore 0 in attesa
notifica 1 consumatori
consumatore 4 attivato
chiusura consumatore 4
notifica 2 consumatori
consumatore 3 attivato
chiusura consumatore 3
consumatore 1 attivato
chiusura consumatore 1
notifica i consumatori rimanenti
ending manipulate_condition
consumatore 2 attivato
chiusura consumatore 2
consumatore 0 attivato
chiusura consumatore 0
Code

asyncio.Queue fornisce una struttura dati primo-che-entra, primo-che-esce per le coroutine come se fosse un oggetto queue. Queue fa quello per i thread quello che multiprocessing.Queue fa per i processi.

# asyncio_queue.py

import asyncio


async def consumer(n, q):
    print('consumatore {}: in partenza'.format(n))
    while True:
        print('consumatore {}: in attesa di un elemento'.format(n))
        item = await q.get()
        print('consumatore {}: ha elemento {}'.format(n, item))
        if item is None:
            # None è il segnale di arresto
            q.task_done()
            break
        else:
            await asyncio.sleep(0.01 * item)
            q.task_done()
    print('consumatore {}: in chiusura'.format(n))


async def producer(q, num_workers):
    print('produttore: in partenza')
    # Aggiunge alcuni numeri alla coda per simulare dei compiti
    for i in range(num_workers * 3):
        await q.put(i)
        print('produttore: aggiunto compito {} alla coda'.format(i))
    # Aggiunge elementi None alla coda
    # per segnalare al consumatore di uscire
    print('produttore: aggiunta di segnali di arresto alla coda')
    for i in range(num_workers):
        await q.put(None)
    print('produttore: in attesa che la coda si svuoti')
    await q.join()
    print('produttore: in chiusura')


async def main(loop, num_consumers):
    # Crea la coda con dimensione fissa in modo che il produttore
    # possa bloccare fino a che il consumatore abbia estratto qualche elemento
    q = asyncio.Queue(maxsize=num_consumers)

    # Pianifica il compito del consumatore
    consumers = [
        loop.create_task(consumer(i, q))
        for i in range(num_consumers)
    ]

    # Pianifica il compito del produttore.
    prod = loop.create_task(producer(q, num_consumers))

    # Attende che tutte le coroutine finiscano
    await asyncio.wait(consumers + [prod])


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop, 2))
finally:
    event_loop.close()

Le operazioni di aggiunta o rimozione di elementi rispettivamente con put() e get() sono entrambe asincrone, visto che la dimensione della coda potrebbe essere fissa (impedendo una aggiunta) o la coda potrebbe essere vuota (bloccando una chiamata per ottenere un elemento).

$ python3 asyncio_queue.py

consumatore 0: in partenza
consumatore 0: in attesa di un elemento
consumatore 1: in partenza
consumatore 1: in attesa di un elemento
produttore: in partenza
produttore: aggiunto compito 0 alla coda
produttore: aggiunto compito 1 alla coda
consumatore 0: ha elemento 0
consumatore 1: ha elemento 1
produttore: aggiunto compito 2 alla coda
produttore: aggiunto compito 3 alla coda
consumatore 0: in attesa di un elemento
consumatore 0: ha elemento 2
produttore: aggiunto compito 4 alla coda
consumatore 1: in attesa di un elemento
consumatore 1: ha elemento 3
produttore: aggiunto compito 5 alla coda
produttore: aggiunta di segnali di arresto alla coda
consumatore 0: in attesa di un elemento
consumatore 0: ha elemento 4
consumatore 1: in attesa di un elemento
consumatore 1: ha elemento 5
produttore: in attesa che la coda si svuoti
consumatore 0: in attesa di un elemento
consumatore 0: ha elemento None
consumatore 0: in chiusura
consumatore 1: in attesa di un elemento
consumatore 1: ha elemento None
consumatore 1: in chiusura
produttore: in chiusura

Input/Output Asincrono con Astrazioni di Classe Protocollo

Fino a questo punto gli esempi hanno tutti evitato di mischiare concorrenza e operazioni I/O per focalizzarsi su un concetto alla votla. Tuttavia lo scambio di contesti quando ci sono bloccaggi I/O è uno dei casi di uso primari per asyncio. Basata su concetti di concorrenza già introdotti, questa sezione esamina due programmi di esempio che implementano un semplice server e client che ripetono quanto ricevuto, simili agli esempi usati per socket e socketserver. Un client può connettersi al server, inviare dati, quindi ricevere gli stessi dati in risposta. Ogni volta che viene iniziata una operazione I/O, il codice in esecuzione passa il controllo al ciclo di eventi, consentendo l'esecuzione di altre attività fino a che l'I/O è pronto.

Server Che Invia Quanto Ricevuto

Il server inizia importando i moduli che gli servono per impostare asyncio e logging, quindi crea un oggetto ciclo di eventi.

# asyncio_echo_server_protocol.py

import asyncio
import logging
import sys

SERVER_ADDRESS = ('localhost', 10000)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

Poi definisce una sottoclasse di asyncio.Protocol per gestire la comunicazione con il client. I metodi dell'oggetto protocollo sono chiamati in base agli eventi associati con il socket del server.

class EchoServer(asyncio.Protocol):

Ogni nuova connessione client attiva una chiamata a connection_made(). L'argomento transport è una istanza di asyncio.Transport, che fornisce una astrazione per eseguire I/O asincrono usando il socket. Diversi tipi di comunicazione forniscono diverse implementazioni del trasporto, tutte con la stessa API. Ad esempio ci sono classi di trasporto separate per lavorare con i socket e per lavorare con pipe per i sottoprocessi. L'indirizzo del client in arrivo è disponibile dal trasporto tramite get_extra_info(), un metodo specifico dell'implementazione.

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.log = logging.getLogger(
            'EchoServer_{}_{}'.format(*self.address)
        )
        self.log.debug('connessione accetata')

Dopo che viene stabilita una connessione, quando vengono spediti dati dal client al server, viene invocato il metodo del protocollo data_received() per passare i dati da elaborare. I dati sono passati come stringa di byte, e spetta all'applicazione la decodifica nel modo appropriato. Qui il risultato viene registrato, quindi viene ritornata una risposta immediata al client tramite transport.write().

    def data_received(self, data):
        self.log.debug('ricevuto {!r}'.format(data))
        self.transport.write(data)
        self.log.debug('inviato {!r}'.format(data))

Alcuni trasporti supportano un indicatore speciale di fine file ("EOF"). Quando viene rilevato un EOF, viene chiamato il metodo eof_received(). In questa implementazione, EOF viene restituito al client per indicare che è stato ricevuto. Visto che non tutti i trasporti supportano un EOF esplicito, questo protocollo chiede prima al trasporto se sia sicuro inviare un EOF.

    def eof_received(self):
        self.log.debug('ricevuto EOF')
        if self.transport.can_write_eof():
            self.transport.write_eof()

Quando viene chiusa una connessione, sia normalmente che a causa di un errore, il metodo del protocollo connection_lost() viene chiamato. Se si era verificato un errore, l'argomento contiene un oggetto eccezione appropriato. Altrimenti è None.

    def connection_lost(self, error):
        if error:
            self.log.error('ERRORE: {}'.format(error))
        else:
            self.log.debug('chiusura')
        super().connection_lost(error)

Ci sono due passi da compiere per far partire il server. Prima l'applicazione dice al ciclo di eventi di creare un nuovo oggetto server usando il protocollo di classe, il nome host e il socket sul quale è in ascolto. Il metodo create_server() è una coroutine, quindi i risultati devono essere elaborati dal ciclo di eventi per far veramente partire il server. Il completamento della coroutine produce una istanza di asyncio.Server legata al ciclo di eventi.

# Crea il server e lasci che il ciclo finisca la <em>coroutine</em> prima di far
# partire il vero ciclo di eventi.
factory = event_loop.create_server(EchoServer, *SERVER_ADDRESS)
server = event_loop.run_until_complete(factory)
log.debug('in partenza on {} porta {}'.format(*SERVER_ADDRESS))

Il ciclo di eventi poi deve esser eseguito per elaborare eventi e gestire le richieste del client. Per un servizio che debba restare in esecuione per lungo tempo, il metodo run_forever() è il modo più semplice per farlo. Quando viene arrestato il ciclo di eventi, sia dal codice applicativo che da un segnale inviato al processo, il server può essere chiuso per pulire correttamente il socket, quindi il ciclo di eventi può essere chiuso per terminare la gestione di altre coroutine prima che il programma esca.

# Entra nel ciclo di eventi in modo permanente per gestire tutte le connessioni
try:
    event_loop.run_forever()
finally:
    log.debug('chiusura del server')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug('chiusura del ciclo di eventi')
    event_loop.close()
Client Che Riceve Quanto Inviato

La costruzione di un client è molto simile a quella di un server. Il codice inizia sempre con l'importazione dei moduli, quindi occorre impostare asyncio e logging, quindi viene creato un oggetto per la gestione del ciclo di eventi.

# asyncio_echo_client_protocol.py

import asyncio
import functools
import logging
import sys

MESSAGES = [
    b'Questo è il messaggio. ',
    b'Sarà inviato ',
    b'in parti.',
]
SERVER_ADDRESS = ('localhost', 10000)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

La classe derivata da asyncio.Protocol definisce gli stessi metodi del server, con implementazioni differenti. Il costruttore della classe accetta due argomenti, una lista dei messaggi da inviare e una istanza di Future da usare per segnalare che il client ha completato un ciclo di lavoro in quanto ha ricevuto una risposta dal server.

class EchoClient(asyncio.Protocol):

    def __init__(self, messages, future):
        super().__init__()
        self.messages = messages
        self.log = logging.getLogger('EchoClient')
        self.f = future

Quando il client si connette con successo al server, inizia immediatamente la comunicazione. La sequenza dei messaggi viene inviata una alla vota, sebbene il codice di rete sottostante possa combinare più messaggi in un'unica trasmissione. Quando tutti i messaggi sono consumati, viene inviato un EOF.

Anche se sembra che tutti i dati siano stati inviati immediatamente, in effetti l'oggetto trasportatore parcheggia i dati in uscita e imposta un callback per inviare effettivamente i dati quando il bufferr del socket è pronto a ricevere dati. Tutto questo è gestito in modo trasparente, quindi il codice dell'applicazione può essere scritto come se le operazioni di I/O avvenissero immediatamente.

    def connection_made(self, transport):
        self.transport = transport
        self.address = transport.get_extra_info('peername')
        self.log.debug(
            'connessione a {} porta {}'.format(*self.address)
        )
        # Potrebbe essere transport.writelines() eccetto che
        # avrebbe reso più difficile mestrare ciascuna parte del messaggio
        # che sta per essere spedito..
        for msg in self.messages:
            transport.write(msg)
            self.log.debug('in invio {!r}'.format(msg))
        if transport.can_write_eof():
            transport.write_eof()

Quando viene ricevuta la risposta dal server, viene registrata.

    def data_received(self, data):
        self.log.debug('ricevuto {!r}'.format(data))

Sia che venga ricevuto un marcatore di fine file (EOF) oppure che la connessione sia chiusa lato server, l'oggetto trasportatore locale viene chiuso e l'oggetto Future viene marcato come completato, impostandone il risultato.

    def eof_received(self):
        self.log.debug('ricevuto EOF')
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)

    def connection_lost(self, exc):
        self.log.debug('il server ha chiuso la connessione')
        self.transport.close()
        if not self.f.done():
            self.f.set_result(True)
        super().connection_lost(exc)

Normalmente la classe derivata da asyncio.Protocol è passata al ciclo di eventi per creare la connessione. In questo caso, visto che il ciclo di eventi non ha un modo per passare argomenti supplementari al costruttore del protocollo, è necessario creare un partial per impacchettare la classe client e passare l'elenco dei messaggi da inviare e l'istanza di Future. Il nuovo chiamabile viene poi usato al posto dalla classe quando si chiama create_connection() per stabilire la connessione client.

client_completed = asyncio.Future()

client_factory = functools.partial(
    EchoClient,
    messages=MESSAGES,
    future=client_completed,
)
factory_coroutine = event_loop.create_connection(
    client_factory,
    *SERVER_ADDRESS,
)

Per attivare l'esecuzione del client, il ciclo di eventi viene chiamato una volta con la coroutine per creare il client, quindi nuovamente con l'istanza di Future data al client per comunicare quando ha terminato. Usando due chiamate si evita l'avere un ciclo infinito nel programma client, che probabilmente vorrebbe terminare dopo che ha finito di comunicare con il server. Se fosse usata solo la prima chiamata per attendere la coroutine per creare il client, potrebbero non essere propriamente elaborati tutti i dati di risposta e la pulizia della connessione al server.

log.debug('in attesa del client per completare')
try:
    event_loop.run_until_complete(factory_coroutine)
    event_loop.run_until_complete(client_completed)
finally:
    log.debug('chiusura del ciclo di eventi')
    event_loop.close()
Risultato

L'esecuzione del server in una finestra e del client in un'altra produce il seguente risultato.

$ python3 asyncio_echo_client_protocol.py

asyncio: Using selector: EpollSelector
main: in attesa del client per completare
EchoClient: connessione a 127.0.0.1 porta 10000
EchoClient: in invio b"Questo e' il messaggio. "
EchoClient: in invio b"Sara' inviato "
EchoClient: in invio b'in parti.'
EchoClient: ricevuto b"Questo e' il messaggio. Sara' inviato in parti."
EchoClient: ricevuto EOF
EchoClient: il server ha chiuso la connessione
main: chiusura del ciclo di eventi

$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: EpollSelector
main: in attesa del client per completare
EchoClient: connessione a 127.0.0.1 porta 10000
EchoClient: in invio b"Questo e' il messaggio. "
EchoClient: in invio b"Sara' inviato "
EchoClient: in invio b'in parti.'
EchoClient: ricevuto b"Questo e' il messaggio. Sara' inviato in parti."
EchoClient: ricevuto EOF
EchoClient: il server ha chiuso la connessione
main: chiusura del ciclo di eventi

$ python3 asyncio_echo_client_protocol.py
asyncio: Using selector: EpollSelector
main: in attesa del client per completare
EchoClient: connessione a 127.0.0.1 porta 10000
EchoClient: in invio b"Questo e' il messaggio. "
EchoClient: in invio b"Sara' inviato "
EchoClient: in invio b'in parti.'
EchoClient: ricevuto b"Questo e' il messaggio. Sara' inviato in parti."
EchoClient: ricevuto EOF
EchoClient: il server ha chiuso la connessione
main: chiusura del ciclo di eventi

Anche se il client manda sempre i messaggi separatamente, la prima volta che il client viene eseguito, il server riceve un unico comprensivo messaggio e lo ritorna al client. Questi risultati variano in esecuzioni susseguenti, in base a quanto sia sovraccarica la rete e se i bufferr di rete siano svuotati prima che tutti i dati siano preparati.

python3.7 asyncio_echo_server_protocol.py

asyncio: Using selector: EpollSelector
main: in partenza on localhost porta 10000
EchoServer_127.0.0.1_46464: connessione accettata
EchoServer_127.0.0.1_46464: ricevuto b"Questo e' il messaggio. Sara' inviato in parti."
EchoServer_127.0.0.1_46464: inviato b"Questo e' il messaggio. Sara' inviato in parti."
EchoServer_127.0.0.1_46464: ricevuto EOF
EchoServer_127.0.0.1_46464: chiusura

EchoServer_127.0.0.1_46470: connessione accettata
EchoServer_127.0.0.1_46470: ricevuto b"Questo e' il messaggio. "
EchoServer_127.0.0.1_46470: inviato b"Questo e' il messaggio. "
EchoServer_127.0.0.1_46470: ricevuto b"Sara' inviato in parti."
EchoServer_127.0.0.1_46470: inviato b"Sara' inviato in parti."
EchoServer_127.0.0.1_46470: ricevuto EOF
EchoServer_127.0.0.1_46470: chiusura

EchoServer_127.0.0.1_46472: connessione accettata
EchoServer_127.0.0.1_46472: ricevuto b"Questo e' il messaggio. Sara' inviato "
EchoServer_127.0.0.1_46472: inviato b"Questo e' il messaggio. Sara' inviato "
EchoServer_127.0.0.1_46472: ricevuto b"in parti."
EchoServer_127.0.0.1_46472: inviato b"in parti."
EchoServer_127.0.0.1_46472: ricevuto EOF
EchoServer_127.0.0.1_46472: chiusura

I/O Asincrono Usando coroutine e Canali

Questa sezione esamina versioni alternative dei due programmi di esempio che implementano un server che ritorna i dati ricevuti dal client e un client che riottiene i dati inviati al server, usando coroutine e API per i canali di asyncio in luogo delle astrazioni di classi per il protocollo e il trasporto. Questi esempi operano a un livello di astrazione inferiore rispetto all'API Protocol discussa precedentemente, ma gli eventi elaborati sono simili.

Server Che Invia Quanto Ricevuto

Il server inizia importando i moduli che servono e deve impostare asyncio e logging, quindi crea l'oggetto per il ciclo di eventi.

# asyncio_echo_server_coroutine.py

import asyncio
import logging
import sys

SERVER_ADDRESS = ('localhost', 10000)
logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

Quindi definisce una coroutine per gestire la comunicazione. Ogni volta che si connette un client, viene invocata una nuova istanza della coroutine in modo che all'interno della funzione il codice comunichi con un solo client alla volta. Il linguaggio in esecuzione di Python gestisce lo stato per ciascuna istanza di coroutine, in modo che il codice dell'applicazione non deve gestire strutture dati supplementari per tracciare client separati.

Gli argomenti per la coroutine sono istanze di StreamrReader e StreamWriter associate con la nuova connessione. Così come per Transport, l'indirizzo del client può essere raggiunto attraverso il metodo get_extra_info() di StreamWriter.

async def echo(reader, writer):
    address = writer.get_extra_info('peername')
    log = logging.getLogger('echo_{}_{}'.format(*address))
    log.debug('connessione accettata')

Sebbene la coroutine sia chiamata quando viene stabilita la connessione, potrebbero non esserci ancora dati da leggere. Per evitare di bloccare mentre si sta leggendo, la coroutine usa await con la chiamata di read() per consentire al ciclo di eventi di proseguire elaborando altri compiti fino a quando ci sono dati da leggere.

    while True:
        data = await reader.read(128)

Se il client invia dati, sono ritornati da await e possono essere restituiti al client passandoli all'oggetto che li scrive. Chiamate multiple a write() possono essere usate per accumulare dati in uscita, per poi usare drain() per far uscire i dati. Visto che questa operazione di I/O su rete può bloccare, ancora una volta viene usato await per riportare il controllo al ciclo di eventi, che monitora il socket di scrittura e chiama l'oggetto che scrive quando possibile per inviare ulteriori dati.

        if data:
            log.debug('ricevuto {!r}'.format(data))
            writer.write(data)
            await writer.drain()
            log.debug('inviato {!r}'.format(data))

Se il client non ha inviato dati, read() ritorna una stringa di byte vuota per indicare che la connessione è chiusa. Il server deve chiudere il socket per scrivere al client, poi la coroutine può ritornare per indicare che ha terminato.

        else:
            log.debug('in chiusura')
            writer.close()
            return

Ci sono due passi per far partire il server. Prima l'applicazione dice al ciclo di eventi di creare un nuovo oggetto server usando la coroutine, il nome host e il socket sul quale ascoltare. Il metodo start_server() è esso stesso una coroutine, quindi i risultati devono essere elaborati dal ciclo di eventi per far effettivamente partire il server. Il completamento della coroutine produce una istanza di asyncio.Server legata al ciclo di eventi.

# Crea il server e lascia che ciclo termini la coroutine prima di far
# partire il ciclo di eventi effettivo.
factory = asyncio.start_server(echo, *SERVER_ADDRESS)
server = event_loop.run_until_complete(factory)
log.debug('in partenza su {} porta {}'.format(*SERVER_ADDRESS))

Successivamente il ciclo di eventi deve essere eseguito per elaborare eventi e gestire le richieste del client. Se occorre attivare un un servizio che sia attivo per lunghi periodi, il metodo run_forever() è il modo più semplice per farlo. Quando il ciclo di eventi viene fermato, sia dal codice dell'applicazione che da un segnale inviato al processo, il server può essere chiuso per pulire correttamente il socket, quindi il ciclo di eventi può essere chiuso per finire la gestione di ogni altra coroutine prima che il programma esca.

# Entra nel ciclo di eventi permanentemente per gestire tutte le connesisoni.
try:
    event_loop.run_forever()
except KeyboardInterrupt:
    pass
finally:
    log.debug('server in chiusura')
    server.close()
    event_loop.run_until_complete(server.wait_closed())
    log.debug('chiusura del ciclo di eventi')
    event_loop.close()
Client che Riceve Quanto Inviato

La costruzione di un client usando una coroutine è molto simile alla costruzione di un server. Ancora una volta il codice inizia importando i moduli necessari per impostare asyncio e logging, quindi crea un oggetto per il ciclo di eventi.

# asyncio_echo_client_coroutine.py

import asyncio
import logging
import sys

MESSAGES = [
    b"Questo e' il messaggio. ",
    b"Sara' inviato ",
    b'in parti.',
]
SERVER_ADDRESS = ('localhost', 10000)

logging.basicConfig(
    level=logging.DEBUG,
    format='%(name)s: %(message)s',
    stream=sys.stderr,
)
log = logging.getLogger('main')

event_loop = asyncio.get_event_loop()

La coroutine echo_client riceve argomenti che indicano dove sia il server e quale messaggi spedire.

async def echo_client(address, messages):

La coroutine viene chiamata quando inizia l'attività, ma non ha connessioni attive con cui lavorare. Il primo passo, quindi, è quello di impostare al client la sua propria connessione. Usa await per evitare di bloccare altre attività mentre è in esecuzione la coroutine open_connection().

    log = logging.getLogger('echo_client')

    log.debug('connessione a {} porta {}'.format(*address))
    reader, writer = await asyncio.open_connection(*address)

La coroutine open_connection() ritorna istanze di StreamReader e StreamWriter associate con il nuovo socket. Il prossimo passo è usare l'oggetto di scrittura per inviare dati al server. Così come sul server, l'oggetto di scrittura accumulerà i dati in uscita fino a che il socket è pronto oppure venga usato drain() per forzare la fuoriuscita. Visto che questa azione di I/O sulla rete può bloccare, viene usato async ancora una volta per restituire il controllo al ciclo di eventi, il quale monitora il socket di scrittura e chiama l'oggetto che scrive quando possibile per inviare ulteriori dati.

    # Potrebbe essere writer.writelines() eccetto che
    # avrebbe reso più difficile mostrare ciascuna parte del messaggio
    # che sta per essere spedito.
    for msg in messages:
        writer.write(msg)
        log.debug('in invio {!r}'.format(msg))
    if writer.can_write_eof():
        writer.write_eof()
    await writer.drain()

Successivamente il client cerca una risposta dal server tentando di leggere i dati fino che quando non rimane più nulla da leggere. Per evitare il bloccaggio su una singola chiamata di read(), await restituisce il controllo al ciclo di eventi. Se il server ha inviato dati, vengono registrati. Se il server non ha inviato dati, read() ritorna una stringa di byte vuota per indicate che la connessione è chiusa. Il client deve chiudere il socket usato per inviare dati al server, quindi ritornare per indicare che ha terminato.

    log.debug('in attesa di risposta')
    while True:
        data = await reader.read(128)
        if data:
            log.debug('ricevuto {!r}'.format(data))
        else:
            log.debug('in chiusura')
            writer.close()
            return

Per far partire il client, il ciclo di eventi viene chiamato con la coroutine per la creazione del client. L'utilizzo di run_until_complete() evita di avere un ciclo infinito nel programma client. Al contrario dell'esempio sul protocollo visto in precedenza, non è necessario un future separato per segnalare quando la coroutine è finita, poichè echo_client() contiene esso stesso tutta la logica client e non ritorna fino a che ha ricevuto una risposta e chiuso la connessione al server.

Risultato

L'esecuzione del server in una finestra di terminale e del client in un'altra, produce il seguente risultato.

$ python3 asyncio_echo_client_coroutine.py

asyncio: Using selector: EpollSelector
echo_client: connessione a localhost porta 10000
echo_client: in invio b"Questo e' il messaggio. "
echo_client: in invio b"Sara' inviato "
echo_client: in invio b'in parti.'
echo_client: in attesa di risposta
echo_client: ricevuto b"Questo e' il messaggio. Sara' inviato in parti."
echo_client: in chiusura
main: chiusura del ciclo di eventi

$ python3 asyncio_echo_client_coroutine.py

asyncio: Using selector: EpollSelector
echo_client: connessione a localhost porta 10000
echo_client: in invio b"Questo e' il messaggio. "
echo_client: in invio b"Sara' inviato "
echo_client: in invio b'in parti.'
echo_client: in attesa di risposta
echo_client: ricevuto b"Questo e' il messaggio. Sara' inviato in parti."
echo_client: in chiusura
main: chiusura del ciclo di eventi

$ python3 asyncio_echo_client_coroutine.py
asyncio: Using selector: EpollSelector
echo_client: connessione a localhost porta 10000
echo_client: in invio b"Questo e' il messaggio. "
echo_client: in invio b"Sara' inviato "
echo_client: in invio b'in parti.'
echo_client: in attesa di risposta
echo_client: ricevuto b"Questo e' il messaggio. Sara' inviato "
echo_client: ricevuto b"in parti."
echo_client: in chiusura
main: chiusura del ciclo di eventi

Sebbene il client invii sempre i messaggi separatamente, le prime due volte che il client viene eseguito il server riceve un messaggio più grande che ripete al client. Questi risultati variano nelle esecuzioni successive, in base a quanto impegnata sia la rete e se i buffer di rete vengano svuotati prima che tutti i dati siano preparati.

$ python3 asyncio_echo_server_coroutine.py

asyncio: Using selector: EpollSelector
main: in partenza su localhost porta 10000
echo_127.0.0.1_50818: connessione accettata
echo_127.0.0.1_50818: ricevuto b"Questo e' il messaggio. Sara' inviato in parti."
echo_127.0.0.1_50818: inviato b"Questo e' il messaggio. Sara' inviato in parti."
echo_127.0.0.1_50818: in chiusura
echo_127.0.0.1_50820: connessione accettata
echo_127.0.0.1_50820: ricevuto b"Questo e' il messaggio. Sara' inviato in parti."
echo_127.0.0.1_50820: inviato b"Questo e' il messaggio. Sara' inviato in parti."
echo_127.0.0.1_50820: in chiusura
echo_127.0.0.1_50822: connessione accettata
echo_127.0.0.1_50822: ricevuto b"Questo e' il messaggio. Sara' inviato"
echo_127.0.0.1_50822: inviato b"Questo e' il messaggio. Sara' inviato"
echo_127.0.0.1_50822: ricevuto b" in parti"
echo_127.0.0.1_50822: inviato b" in parti"
echo_127.0.0.1_50822: in chiusura
main: chiusura del ciclo di eventi

Usare SSL

asyncio ha incorporato il supporto per permettere comunicazioni SSL sui socket. Il passaggio di una istanza di SSLContext alle coroutine che creano connessioni server o client abilita i supporto e fa sì che l'impostazione del protocollo SSL sia compiuta prima che il socket sia presentato come pronto all'utilizzo dall'applicazione.

I server e client basati sulle coroutine della sezione precedente possono essere aggiornati con pochi piccoli cambiamenti. Il primo passo è la creazione del certificato e dei file chiave. Un certificato auto firmato può essere creato con un comando tipo:

$ openssl req -newkey rsa:2048 -nodes -keyout pymotw.key \
-x509 -days 365 -out pymotw.crt

Il comando openssl richiederà all'utente diversi valori che verranno usati per generare il certificato, quindi produrrà i file in uscita richiesti.

L'impostazione non sicura del socket dell'esempio del server precedente usa start_server() per creare il socket in ascolto.

factory = asyncio.start_server(echo, *SERVER_ADDRESS)
server = event_loop.run_until_complete(factory)

Per aggiungere la codifica, si crei un oggetto SSLContext con il certificato e la chiave appena generati, quindi lo si passi a start_server().

# Il certificato è creato con pymotw.com come nome host,
# il che non corrisponderà quando il codice di esempio viene eseguito
# altrove, quindi si disabiliti la verifica del nome host
ssl_context = ssl.create_default_context(ssl.Purpose.CLIENT_AUTH)
ssl_context.check_hostname = False
ssl_context.load_cert_chain('pymotw.crt', 'pymotw.key')

# Crea il server e lascia che il ciclo finisca la coroutine prima di
# far partire il ciclo di eventi effettivo
factory = asyncio.start_server(echo, *SERVER_ADDRESS, ssl=ssl_context)

Simili modifiche sono necessarie nel client. La vecchia versione usa open_connection() per creare il socket connesso al server.

    reader, writer = await asyncio.open_connection(*address)

Anche qui è richiesto un SSLContext per mettere in sicurezza la parte client del socket. L'identità del client non viene forzata, quindi si deve caricare solo il certificato.

    # Il certificato è creato con pymotw.com come nome host,
    # il che non corrisponderà quando il codice di esempio viene eseguito
    # altrove, quindi si disabiliti la verifica del nome host
    ssl_context = ssl.create_default_context(
        ssl.Purpose.SERVER_AUTH,
    )
    ssl_context.check_hostname = False
    ssl_context.load_verify_locations('pymotw.crt')
    reader, writer = await asyncio.open_connection(
        *server_address, ssl=ssl_context)

Un'altra piccola modifica è richiesta nel client. Visto che la connessione SSL non supporta l'invio di un carattere di fine file (EOF), il client usa al suo posto un byte NULL.

La vecchia versione del ciclo di invio del client usa write_eof().

    # Potrebbe essere writer.writelines() eccetto che
    # avrebbe reso più difficile mestrare ciascuna parte del messaggio
    # che sta per essere spedito..
    for msg in messages:
        writer.write(msg)
        log.debug('in invio {!r}'.format(msg))
    if writer.can_write_eof():
        writer.write_eof()
    await writer.drain()

La nuova versione invia un byte zero (b'\x00').

    # Potrebbe essere writer.writelines() eccetto che
    # avrebbe reso più difficile mestrare ciascuna parte del messaggio
    # che sta per essere spedito..
    for msg in messages:
        writer.write(msg)
        log.debug('sending {!r}'.format(msg))
    # SSL non supporta EOF, quindi si invia un byte null per indicare
    # la fine del messaggio.
    writer.write(b'\x00')
    await writer.drain()

La coroutine echo() del server deve cercare il byte NULL e chiudere la connessione con il client quando esso viene ricevuto.

async def echo(reader, writer):
    address = writer.get_extra_info('peername')
    log = logging.getLogger('echo_{}_{}'.format(*address))
    log.debug('connessione accettata')

    while True:
        data = await reader.read(128)
        terminate = data.endswith(b'\x00')
        data = data.rstrip(b'\x00')
        if data:
            log.debug('ricevuto {!r}'.format(data))
            writer.write(data)
            await writer.drain()
            log.debug('inviato {!r}'.format(data))
        if not data or terminate:
            log.debug('messaggio terminato, chiusura connessione')
            writer.close()
            return

L'esecuzione del server in una finestra di terminale e del client in un'altra, produce questo risultato:

$ python3 asyncio_echo_server_ssl.py

asyncio: Using selector: EpollSelector
main: in partenza su localhost porta 10000
echo_127.0.0.1_54812: connessione accettata
echo_127.0.0.1_54812: ricevuto b"Questo e' il messaggio. Sara' inviato in parti."
echo_127.0.0.1_54812: inviato b"Questo e' il messaggio. Sara' inviato in parti."
echo_127.0.0.1_54812: messaggio terminato, chiusura connessione

$ python3.7 asyncio_echo_client_ssl.py
asyncio: Using selector: EpollSelector
echo_client: connessione a localhost porta 10000
echo_client: sending b"Questo e' il messaggio. "
echo_client: sending b"Sara' inviato "
echo_client: sending b'in parti.'
echo_client: in attesa di risposta
echo_client: ricevuto b"Questo e' il messaggio. Sara' inviato in parti."
echo_client: in chiusura
main: chiusura del ciclo di eventi

Interagire con i Servizi di Nome di Dominio (DNS)

Le applicazioni usano la rete per comunicare con i server per operazioni DNS tipo la conversione tra nomi host e indirizzi IP, asyncio ha metodi di convenienza sul ciclo di eventi per gestire queste operazioni in background senza bloccare durante le interrogazioni.

Ricerca Indirizzo per Nome

Si usi la coroutine getaddrinfo() per convertire un nome host e un numero di porta in un indirizzo IP o IPv6. Come per la versione della funzione del modulo socket il valore ritornato è una tupla che contiene cinque elementi:

  1. La famiglia dell'indirizzo
  2. Il tipo di indirizzo
  3. Il protocollo
  4. Il nome canonico del server
  5. Una tupla con l'indirizzo del socket utilizzabile per aprire una connessione al server sulla porta specificata in origine.

Le interrogazioni possono essere filtrate per protocollo, come in questo esempio, dove sono ritornate solo le risposte TCP.

# asyncio_getaddrinfo.py

import asyncio
import logging
import socket
import sys


TARGETS = [
    ('pymotw.com', 'https'),
    ('robyp.x10host.com', 'http'),
    ('python.org', 'https'),
]


async def main(loop, targets):
    for target in targets:
        info = await loop.getaddrinfo(
            *target,
            proto=socket.IPPROTO_TCP,
        )

        for host in info:
            print('{:20}: {}'.format(target[0], host[4][0]))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop, TARGETS))
finally:
    event_loop.close()

Questo esempio converte un nome host e un protocollo in un indirizzo IP e numero di porta.

$ python3 asyncio_getaddrinfo.py

pymotw.com          : 66.33.211.242
robyp.x10host.com   : 198.91.81.13
python.org          : 45.55.99.72
Ricerca del Nome per Indirizzo

La coroutine getnameinfo() converte un indirizzo IP in un nome host e un numero di porta in un nome di protocollo, dove possibile.

# asyncio_getnameinfo.py

import asyncio
import logging
import socket
import sys


TARGETS = [
    ('104.130.43.121', 443),
    ('198.91.81.13', 80),
]


async def main(loop, targets):
    for target in targets:
        info = await loop.getnameinfo(target)
        print('{:15}: {} {}'.format(target[0], *info))


event_loop = asyncio.get_event_loop()
try:
    event_loop.run_until_complete(main(event_loop, TARGETS))
finally:
    event_loop.close()

Questo esempio mostra che l'indirizzo IP per xrobyp.x10host.com fa riferimento a un server specificato mentre quello per python.org non viene risolto in un nome host.

$ python3 asyncio_getnameinfo.py

104.130.43.121 : 104.130.43.121 https
198.91.81.13   : xo12.x10hosting.com http

Lavorare con Sottoprocessi

E' di frequente necessario lavorare con altri programmi e processi, per trarre vantaggio dal codice esistente senza riscriverlo o per accedere a librerie o caratteristiche non disponibili all'interno di Python. Come per l'I/O di rete, asyncio include due astrazioni per lanciare un altro programma e interagire con esso.

Usare l'Astrazione di Protocollo con Sottoprocessi

Questo esempio usa una coroutine per lanciare un processo che esegue il comando unix df per ottenere lo spazio libero su dischi locali. Usa subprocess_exec() per lanciare il processo e legarlo a una classe protocollo che sa come leggere ed elaborare il risultato del comando df. I metodi della classe sono chiamati automaticamente in base a eventi I/O per il sottoprocesso. Visto che entrambi gli argomenti stdin e stderr sono impostati a None, questi canali di comunicazione non sono connessi al nuovo processo.

asyncio_subprocess_protocol.py
import asyncio
import functools


async def run_df(loop):
    print('in run_df')

    cmd_done = asyncio.Future(loop=loop)
    factory = functools.partial(DFProtocol, cmd_done)
    proc = loop.subprocess_exec(
        factory,
        'df', '-hl',
        stdin=None,
        stderr=None,
    )
    try:
        print('launching process')
        transport, protocol = await proc
        print('waiting for process to complete')
        await cmd_done
    finally:
        transport.close()

    return cmd_done.result()

La classe DFProtocol è derivata da SubProcessProtocol, che definisci una api per comunicare con un altro processo via conduttura. Ci si attende che l'argomento done sia un Future che il chiamante userà per conoscere quando termina il processo.

class DFProtocol(asyncio.SubprocessProtocol):

    FD_NAMES = ['stdin', 'stdout', 'stderr']

    def __init__(self, done_future):
        self.done = done_future
        self.buffer = bytearray()
        super().__init__()

Così come per una comunicazione via socket, connection_made() viene chiamata quando i canali in input per il nuovo processo sono impostati. L'argomento transport è una istanza di una sottoclasse di BaseSubprocessTransport. Può leggere dati fatti uscire dal processo e scrivere dati al canale in input per il processo, se il processo è stato configurato per ricevere input.

    def connection_made(self, transport):
        print('processo iniziato {}'.format(transport.get_pid()))
        self.transport = transport

Quando il processo ha generato un risultato, viene chiamato pipe_data_received() con il descrittore di file dove i dati sono stati emessi e i dati effettivamente letti dalla conduttura. La classe protocollo salva il risultato dal canale standard in uscita del processo in un buffer per successiva elaborazione.

    def pipe_data_received(self, fd, data):
        print('letti {} byte da {}'.format(len(data),
                                           self.FD_NAMES[fd]))
        if fd == 1:
            self.buffer.extend(data)

Quando il processo termina, viene chiamato process_exited(). Il codice di uscita del processo è disponibile dall'oggetto di trasporto tramite get_returncode(). In questo caso, se non ci sono errori segnalati il risultato disponibile viene decodificato ed elaborato prima di essere ritornato tramite l'istanza Future. Se c'è un errore, i risultati si considerano vuoti. L'impostazione del risultato del Future dice a run_df() che il processo è uscito, quindi viene effettuata una pulizia e ritornato il risultato.

    def process_exited(self):
        print('process uscito')
        return_code = self.transport.get_returncode()
        print('codice ritornato {}'.format(return_code))
        if not return_code:
            cmd_output = bytes(self.buffer).decode()
            results = self._parse_results(cmd_output)
        else:
            results = []
        self.done.set_result((return_code, results))

Il risultato del comando viene disposto in una sequenza di dizionari che mappano i nomi dell'intestazione ai loro valori per ogni riga del risultato, quindi viene restituita la lista risultante.

    def _parse_results(self, output):
        print('elaborazione risultati')
        # Il risultato ha una riga di intestazioni, tutte parole singole.
        # Le righe rimanenti sono una per il filesystem, con colonne
        # che corrispondono alle intestazioni (assumendo che nessun punto di
        # montaggio abbia spazi nel nome).
        if not output:
            return []
        lines = output.splitlines()
        headers = lines[0].split()
        devices = lines[1:]
        results = [
            dict(zip(headers, line.split()))
            for line in devices
        ]
        return results

La coroutine run_df() viene eseguita usando run_until_complete(), poi i risultati sono esaminati e viene stampato lo spazio libero su ogni dispositivo.

event_loop = asyncio.get_event_loop()
try:
    return_code, results = event_loop.run_until_complete(
        run_df(event_loop)
    )
finally:
    event_loop.close()

if return_code:
    print('errore in uscita {}'.format(return_code))
else:
    print('\nSpazio libero:')
    for r in results:
        print('{Mounted:25}: {Avail}'.format(**r))

Il risultato qui sotto mostra la sequenza di passi compiuti, e lo spazio libero sui drive del sistema dove è stato eseguito il programma.

$ python3 asyncio_subprocess_protocol.py

in run_df
lancio del processo
processo iniziato 14200
in attesa del completamento del processo
letti 1319 byte da stdout
processo uscito
codice ritornato 0
elaborazione risultati

Spazio libero:
/dati2                   : 41G
/dati3                   : 60G
/dati                    : 47G
/dati1                   : 30G
Chiamare Sottoprocessi con Coroutine e Canali

Per usare coroutine per eseguire direttamente un processo, invece che accedervi tramite una sottoclasse di Protocol, si chiami create_subprocess_exec() e si specifichi quale tra stdout, stderr, e stdin connettere a una conduttura. Il risultato della generazione del processo da parte della coroutine è una istanza di Process che può essere usata per manipolare il sottoprocesso o comunicare con esso.

# asyncio_subprocess_coroutine.py

import asyncio
import asyncio.subprocess


async def run_df():
    print('in run_df')

    buffer = bytearray()

    create = asyncio.create_subprocess_exec(
        'df', '-hl',
        stdout=asyncio.subprocess.PIPE,
    )
    print('processo lanciato')
    proc = await create
    print('processo partito {}'.format(proc.pid))

In questo esempio, df non necessita di alcun input eccetto quello dei suoi argomenti da riga di comando, quindi il prossimo passo è leggere tutto il risultato. Con Protocol non vi è controllo su quanti dati vengono letti alla volta. Questo esempio usa readline() ma potrebbe anche chiamare read() direttamente per leggere dati che non siano riga per riga. Il risultato del commando viene conservato, come nell'esempio del protocollo, in modo che possa essere successivamente elaborato.

    while True:
        line = await proc.stdout.readline()
        print('read {!r}'.format(line))
        if not line:
            print('non ci sono più risultati dal comando')
            break
        buffer.extend(line)

Il metodo readline() ritorna un stringa di byte vuota quando non si sono più risultati in quanto il programma è terminato. Per assicurarsi che il processo sia pulito accuratamente, il passo successivo è attendere che il processo esca completamente.

    print('in attesa di completamento del processo')
    await proc.wait()

A questo punto lo stato di uscita può essere esaminato per determinare se elaborare il risultato o trattarlo come errore se non è stato prodotto alcun risultato. La logica di elaborazione è la stessa dell'esempio precedente, ma in una funzione a se stante (qui non mostrata) visto che non vi è una classe protocollo nella quale nasconderla. Dopo che i dati sono stati elaborati, i risultati e il codice di uscita sono ritornati al chiamante.

    return_code = proc.returncode
    print('codice di ritorno {}'.format(return_code))
    if not return_code:
        cmd_output = bytes(buffer).decode()
        results = _parse_results(cmd_output)
    else:
        results = []

    return (return_code, results)

Il programma principale è simile a quello basato sul protocollo, visto che le modifiche di implementazione sono isolate in run_df().

event_loop = asyncio.get_event_loop()
try:
    return_code, results = event_loop.run_until_complete(
        run_df()
    )
finally:
    event_loop.close()

if return_code:
    print('errore in uscita {}'.format(return_code))
else:
    print('\nSpazio libero:')
    for r in results:
        print('{Mounted:25}: {Avail}'.format(**r))

Visto che il risultato da df può essere letto una riga per volta, viene replicato per mostrare l'avanzamento del programma. Altrimenti il risultato dell'esecuzione sarebbe simile a quello dell'esempio precedente.

$ python3 asyncio_subprocess_coroutine.py

in run_df
processo lanciato
processo partito 20018
letto b'Filesystem              Size  Used Avail Use% Mounted on\n'
letto b'/dev/sda8                92G   47G   41G  54% /dati2\n'
letto b'/dev/sda9               138G   71G   60G  55% /dati3\n'
letto b'/dev/sda6                92G   41G   47G  47% /dati\n'
letto b'/dev/sda7               184G  145G   30G  84% /dati1\n'
letto b''
non ci sono più risultati dal comando
in attesa di completamento del processo
codice di ritorno 0
elaborazione risultati

Spazio libero:
/dati2                   : 41G
/dati3                   : 60G
/dati                    : 47G
/dati1                   : 30G
Inviare Dati a un Sottoprocesso

Entrambi gli esempi precedenti usavano solo un singolo canale di comunicazione per leggere dati da un secondo processo. Spesso è necessario inviare dati a un comando affinchè avvenga l'elaborazione. Questo esempio definisce un coroutine per eseguire il comando Unix tr per tradurre caratteri nel suo canale in input. In questo caso, tr viene usato per convertire lettere minuscole in maiuscole.

La coroutine to_upper() riceve come argomento un ciclo di eventi e una stringa. Genera un secondo processo che esegue "tr [:lower] [:upper]".

# asyncio_subprocess_coroutine_write.py

import asyncio
import asyncio.subprocess


async def to_upper(input):
    print('in to_upper')

    create = asyncio.create_subprocess_exec(
        'tr', '[:lower:]', '[:upper:]',
        stdout=asyncio.subprocess.PIPE,
        stdin=asyncio.subprocess.PIPE,
    )
    print('processo lanciato')
    proc = await create
    print('pid {}'.format(proc.pid))

Successivamente to_upper() usa il metodo communicate() di Process per inviare la stringa in input al comando e leggere tutto il risultato, in modo asincrono. Come per la versione che usa subprocess.Popen dello stesso metodo, communicate() ritorna il risultato completo come stringa di byte. Se è probabile che un comando possa produrre un risultato che non possa essere conservato agevolmente in memoria, l'input non può essere prodotto tutto in una volta, oppure il risultato deve essere elaborato in modo incrementale; è possibile usare i gestori di stdin, stdout e stderr di Process direttamente invece che chiamare communicate().

    print('communicazione con il processo')
    stdout, stderr = await proc.communicate(input.encode())

Dopo il completamento delle operazioni di I/O, l'attendere che il processo esca completamente assicura che sia propriamente pulito.

    print('in attesa del completamento del processo')
    await proc.wait()

Il codice di ritorno può essere poi esaminato e la stringa di byte decodificata, per preparare il valore di ritorno dalla coroutine.

    return_code = proc.returncode
    print('return code {}'.format(return_code))
    if not return_code:
        results = bytes(stdout).decode()
    else:
        results = ''

    return (return_code, results)

La parte principale del programma imposta una stringa di messaggio da trasformare, quindi imposta il ciclo di eventi per eseguire to_upper() e stampare il risultato.

MESSAGE = """
Questo messaggio sara' convertito a
tutte maiuscole.
"""

event_loop = asyncio.get_event_loop()
try:
    return_code, results = event_loop.run_until_complete(
        to_upper(MESSAGE)
    )
finally:
    event_loop.close()

if return_code:
    print('errore in uscita {}'.format(return_code))
else:
    print('Originale : {!r}'.format(MESSAGE))
    print('Modificato: {!r}'.format(results))

Il risultato mostra la sequenza di operazioni, quindi come il messaggio di testo viene trasformato.

$ python3 asyncio_subprocess_coroutine_write.py

in to_upper
processo lanciato
pid 25350
communicazione con il processo
in attesa del completamento del processo
codice di ritorno 0
Originale : "\nQuesto messaggio sara' convertito a\ntutte maiuscole.\n"
Modificato: "\nQUESTO MESSAGGIO SARA' CONVERTITO A\nTUTTE MAIUSCOLE.\n"

Ricevere Segnali Unix

Le notifiche di eventi del sistema Unix in genere interrompono una applicazione, attivandone i propri gestori. Quando usati con asyncio i callback sono interconnessi con le altre coroutine e callback gestiti dal ciclo di eventi. Il che si traduce in meno funzioni interrotte, e la risultante esigenza è di fornire salvaguardie per pulire operazioni incomplete.

I gestori di segnale devono essere normali chiamabili, non coroutine.

# asyncio_signal.py

import asyncio
import functools
import os
import signal


def signal_handler(name):
    print('signal_handler({!r})'.format(name))

I gestori di segnale sono registrati usando add_signal_handler(). Il primo argomento è il segnale e il secondo è il callback. I callback sono passati senza argomenti quindi se ci sono argomenti necessari occorre impacchettare la funzione con functools.partial().

event_loop = asyncio.get_event_loop()

event_loop.add_signal_handler(
    signal.SIGHUP,
    functools.partial(signal_handler, name='SIGHUP'),
)
event_loop.add_signal_handler(
    signal.SIGUSR1,
    functools.partial(signal_handler, name='SIGUSR1'),
)
event_loop.add_signal_handler(
    signal.SIGINT,
    functools.partial(signal_handler, name='SIGINT'),

Questo programma di esempio usa una coroutine per inviare segnali a se stesso tramite os.kill(). Dopo l'invio di ciascun segnale, la coroutine cede il controllo per consentire al gestore di essere eseguito. In una normale applicazione, ci dovrebbero essere più punti dove il codice dell'applicazione cede il controllo al ciclo di eventi e nessuna "cessione" artificiale come questa sarebbe necessaria.

async def send_signals():
    pid = os.getpid()
    print('Partenza di send_signals per {}'.format(pid))

    for name in ['SIGHUP', 'SIGHUP', 'SIGUSR1', 'SIGINT']:
        print('sending {}'.format(name))
        os.kill(pid, getattr(signal, name))
        # La cessione del controllo consente al gestore del segnale di essere
        # eseguito visto che il segnale non interrompe il flusso del programma
        # in altro modo
        print('cessione del controllo')
        await asyncio.sleep(0.01)
    return

Il programma principale esegue send_signals() fino a che ha inviato tutti i segnali.

try:
    event_loop.run_until_complete(send_signals())
finally:
    event_loop.close()

Il risultato mostra come i gestori sono chiamati quando send_signals() cede il controllo dopo aver inviato un segnale.

$ python3 asyncio_signal.py

Partenza di send_signals per 11691
sending SIGHUP
cessione del controllo
signal_handler('SIGHUP')
sending SIGHUP
cessione del controllo
signal_handler('SIGHUP')
sending SIGUSR1
cessione del controllo
signal_handler('SIGUSR1')
sending SIGINT
cessione del controllo
signal_handler('SIGINT')

Combinare Coroutine con Thread e Processi

Molte delle librerie esistenti non sono pronte per usare asyncio nativamente. Potrebbero bloccare, o dipendere su caratteristiche di concorrenzialità non disponibili attraverso il modulo. E' ancora possibile usare queste librerie in una applicazione basata su asyncio usando un esecutore da concurrent.futures per eseguire il codice o in un thread separato o in un processo separato.

Thread

Il metodo run_in_executor() del ciclo di eventi riceve una istanza di esecutore, un normale chiamabile da invocare, e qualunque argomento sia da passare a quest'ultimo. Ritorna un Future che può essere usato per attendere che una funzione finisca il proprio lavoro e ritorni qualcosa. Se non viene passato alcun esecutore, viene creato un ThreadPoolExecutor. Questo esempio crea esplicitamente un esecutore per limitare il numero di thread di elaboratori che saranno disponibili.

Un ThreadPoolExecutor lancia i suoi thread esecutori, quindi chiama ognuna delle funzioni fornite una volta in un thread. Questo esempio mostra come combinare run_in_executor() e wait() per fare in modo che una coroutine ceda il controllo al ciclo di eventi mentre le funzioni bloccanti sono eseguite in thread separati, per poi riattivarsi quando le funzioni sono terminate.

# asyncio_executor_thread.py

import asyncio
import concurrent.futures
import logging
import sys
import time


def blocks(n):
    log = logging.getLogger('blocchi({})'.format(n))
    log.info('in esecuzione')
    time.sleep(0.1)
    log.info('done')
    return n ** 2


async def run_blocking_tasks(executor):
    log = logging.getLogger('run_blocking_tasks')
    log.info('partenza')

    log.info('creazione dell\'attività esecutore')
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i)
        for i in range(6)
    ]
    log.info('in attesa dell\'attività esecutore')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('risultati: {!r}'.format(results))

    log.info('in uscita')


if __name__ == '__main__':
    # Configura logging per mostrare il nome del thread
    # dove il messaggio registrato si origina
    logging.basicConfig(
        level=logging.INFO,
        format='%(threadName)10s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Crea un pool di thread limitato
    executor = concurrent.futures.ThreadPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()

asyncio_executor_thread.py usa logging per indicare opportunamente quale thread e funzione stanno producendo ogni messaggio registrato. Visto che un logger separato è usato in ciascuna chiamata a blocks(), il risultato mostra chiaramente che gli stessi thread sono riutilizzati per chiamare multiple copie della funzione con argomenti differenti.

$ python3 asyncio_executor_thread.py

MainThread run_blocking_tasks: partenza
MainThread run_blocking_tasks: creazione dell'attività esecutore
ThreadPoolExecutor-0_0         blocchi(0): in esecuzione
ThreadPoolExecutor-0_1         blocchi(1): in esecuzione
ThreadPoolExecutor-0_2         blocchi(2): in esecuzione
MainThread run_blocking_tasks: in attesa dell'attività esecutore
ThreadPoolExecutor-0_0         blocchi(0): done
ThreadPoolExecutor-0_0         blocchi(3): in esecuzione
ThreadPoolExecutor-0_1         blocchi(1): done
ThreadPoolExecutor-0_1         blocchi(4): in esecuzione
ThreadPoolExecutor-0_2         blocchi(2): done
ThreadPoolExecutor-0_2         blocchi(5): in esecuzione
ThreadPoolExecutor-0_0         blocchi(3): done
ThreadPoolExecutor-0_1         blocchi(4): done
ThreadPoolExecutor-0_2         blocchi(5): done
MainThread run_blocking_tasks: risultati: [16, 0, 25, 4, 1, 9]
MainThread run_blocking_tasks: in uscita
Processi

Un ProcessPoolExecutor lavora pressochè allo stesso modo, creando un insieme di processi esecutori al posto dei thread. L'utilizzo di processi separati richiede più risorse di sistema, ma per operazioni molto pesanti dal punto di vista del calcolo computazionale può avere senso eseguire attività separate in ciascun core di CPU.

# asyncio_executor_process.py

import asyncio
import concurrent.futures
import logging
import sys
import time


def blocks(n):
    log = logging.getLogger('blocchi({})'.format(n))
    log.info('in esecuzione')
    time.sleep(0.1)
    log.info('done')
    return n ** 2


async def run_blocking_tasks(executor):
    log = logging.getLogger('run_blocking_tasks')
    log.info('partenza')

    log.info('creazione dell\'attività esecutore')
    loop = asyncio.get_event_loop()
    blocking_tasks = [
        loop.run_in_executor(executor, blocks, i)
        for i in range(6)
    ]
    log.info('in attesa dell\'attività esecutore')
    completed, pending = await asyncio.wait(blocking_tasks)
    results = [t.result() for t in completed]
    log.info('risultati: {!r}'.format(results))

    log.info('in uscita')


# mdofiche rispetto a asyncio_executor_thread.py

if __name__ == '__main__':
    # Configura logging per mostrare l'identificativo del processo
    # dove il messaggio registrato si origina
    logging.basicConfig(
        level=logging.INFO,
        format='PID %(process)5s %(name)18s: %(message)s',
        stream=sys.stderr,
    )

    # Crea un pool di processi limitato
    executor = concurrent.futures.ProcessPoolExecutor(
        max_workers=3,
    )

    event_loop = asyncio.get_event_loop()
    try:
        event_loop.run_until_complete(
            run_blocking_tasks(executor)
        )
    finally:
        event_loop.close()

Le sole modifiche necessarie per passare dai thread ai processi è la creazione di un tipo diverso di esecutore. Questo esempio modifica anche il formato della stringa di registrazione per includere l'identificativo del processo in luogo del nome del thread, per dimostrare che le attività sono in effetti in esecuzione su processi separati.

$ python3 asyncio_executor_process.py

PID 16924 run_blocking_tasks: partenza
PID 16924 run_blocking_tasks: creazione dell'attività esecutore
PID 16924 run_blocking_tasks: in attesa dell'attività esecutore
PID 16925         blocchi(0): in esecuzione
PID 16926         blocchi(1): in esecuzione
PID 16927         blocchi(2): in esecuzione
PID 16925         blocchi(0): done
PID 16926         blocchi(1): done
PID 16927         blocchi(2): done
PID 16925         blocchi(3): in esecuzione
PID 16927         blocchi(5): in esecuzione
PID 16926         blocchi(4): in esecuzione
PID 16927         blocchi(5): done
PID 16925         blocchi(3): done
PID 16926         blocchi(4): done
PID 16924 run_blocking_tasks: risultati: [9, 16, 25, 0, 1, 4]
PID 16924 run_blocking_tasks: in uscita

Debug con asyncio

Ci sono parecchie utili caratteristiche di debugging all'interno di asyncio.

Per prima cosa il ciclo di eventi usa logging per emettere messaggi di stato mentre è in esecuzione. Alcuni di essi sono disponibili se viene abilitato il logging in una applicazione. Altri possono essere attivati dicendo al ciclo di emettere ulteriori messaggi di debug. Si chiami set_debug() passando un valore booleano che indichi se il debugging debba essere o meno abilitato.

Visto che le applicazione costruite su asyncio sono molto sensibili rispetto a coroutine "avide" che falliscano nel cedere il controllo, è supportata la caratteristica di individuare callback lenti costruiti all'interno del ciclo di eventi. Attivati dall'abilitazione del debugging, gestendo la definizione di "lento" tramite l'impostazione della proprietà del ciclo slow_callback_duration nel numero di secondi dopo i quali si dovrebbe emettere un avvertimento.

In ultimo, se un'applicazione che usa asyncio esce senza pulire qualcuna delle sue coroutine o altre risorse, potrebbe significare che c'è un errore logico che previene parte dell'esecuzione del codice applicativo. Si abiliti ResourceWarning per fare in modo che queste casistiche vengano segnalate quando il programma esce.

# asyncio_debug.py

import argparse
import asyncio
import logging
import sys
import time
import warnings

parser = argparse.ArgumentParser('debugging asyncio')
parser.add_argument(
    '-v',
    dest='verbose',
    default=False,
    action='store_true',
)
args = parser.parse_args()

logging.basicConfig(
    level=logging.DEBUG,
    format='%(levelname)7s: %(message)s',
    stream=sys.stderr,
)
LOG = logging.getLogger('')


async def inner():
    LOG.info('inner in partenza')
    # Usa un passo bloccante via sleep per simulare
    # esecuzione di lavoro all'interno della funzione
    time.sleep(0.1)
    LOG.info('inner completata')


async def outer(loop):
    LOG.info('outer in partenza')
    await asyncio.ensure_future(loop.create_task(inner()))
    LOG.info('outer completata')


event_loop = asyncio.get_event_loop()
if args.verbose:
    LOG.info('debugging abilitato')

    # Enable debugging
    event_loop.set_debug(True)

    # Imposta la soglia per attività "lente" molto bassa a scopi
    # illustrativi. Il valore predefinito è 0.1, o 100 millisecondi.
    event_loop.slow_callback_duration = 0.001

    # Segnala tutti gli errori gestiti dalle risorse asincrone
    warnings.simplefilter('always', ResourceWarning)

LOG.info('entrata nel ciclo di eventi')
event_loop.run_until_complete(outer(event_loop))

Quando eseguito senza il debugging abilitato, tutto sembra a posto con questa applicazione.

$ python3 asyncio_debug.py

DEBUG: Using selector: EpollSelector
INFO: entrata nel ciclo di eventi
INFO: outer in partenza
INFO: inner in partenza
INFO: inner completata
INFO: outer completata

L'abilitazione del debugging espone alcuni dei problemi che ha, incluso il fatto che sebbene inner() finisca, impiega più tempo di quanto impostato in slow_callback_duration e che il ciclo di eventi non è propriamente chiuso quando il programma esce.

python3 asyncio_debug.py -v

  DEBUG: Using selector: EpollSelector
   INFO: debugging abilitato
   INFO: entrata nel ciclo di eventi
   INFO: outer in partenza
WARNING: Executing <Task pending coro=<outer() running at asyncio_debug.py:37> wait_for=<Task pending coro=<inner() running at asyncio_debug.py:27> cb=[<TaskWakeupMethWrapper object at 0x7f584708d0d0>()] created at asyncio_debug.py:37> cb=[_run_until_complete_cb() at /dati/anaconda3/lib/python3.7/asyncio/base_events.py:153] created at /dati/anaconda3/lib/python3.7/asyncio/base_events.py:558> took 0.006 seconds
   INFO: inner in partenza
   INFO: inner completata
WARNING: Executing <Task finished coro=<inner() done, defined at asyncio_debug.py:27> result=None created at asyncio_debug.py:37> took 0.101 seconds
   INFO: outer completata

Vedere anche:

asyncio
La documentazione della libreria standard per questo modulo.
PEP 3156
Asynchronous IO Support Rebooted: the “asyncio” Module
PEP 380
Syntax for Delegating to a Subgenerator
PEP 492
Coroutines with async and await syntax
concurrent.futures.html
Gestisce insiemi di compiti concomitanti
socket
Comunicazione di rete a basso livello
socketserver
Infrastruttura per creare server di rete
signal
Eventi di sistema asincroni
asyncio: What's new in Python 3.6
Sommario delle modifiche a asyncio mentre l'API si stabilizza in Python 3.6
trollius
Un port di Tulip, la versione originale di asyncio su Python 2
The New asyncio Module in Python 3.4: Event Loops
Articolo di Gastón Hillar su Dr. Dobb’s
A Web Crawler With asyncio Coroutines
Un articolo su The Architecture of Open Source Applications di A. Jesse Jiryu Davis e Guido van Rossum
Playing with asyncio
Un post sul blog di Nathan Hoad
Async I/O and Python
Un post sul blog di Mark McLoughlin
A Curious Course on Coroutines and Concurrency – PyCon 2009
Tutorial di David Beazley
How the heck does async/await work in Python 3.5
Un post sul blog di Brett Cannon
Unix Network Programming, Volume 1: The Sockets Networking API, 3/E
W. Richard Stevens, Bill Fenner, and Andrew M. Rudoff. Published pubblicato da Addison-Wesley Professional, 2004. ISBN-10: 0131411551
Foundations of Python Network Programminng, 3/E
Brandon Rhodes e John Goerzen. Pubblicato da Apress, 2014. ISBN-10: 1430258543