concurrent.futures - Gestisce Insiemi di Compiti Concomitanti

Scopo: Gestire facilmente compiti in esecuzione concomitanti ed in parallelo

Il modulo concurrent.futures fornisce interfacce per eseguire compiti usando insiemi (pool) di thread o elaboratori di processo. Le API sono le stesse, quindi le applicazioni possono passare tra thread e processi con minimi aggiustamenti.

Il modulo fornisce due tipi di classi per interagire con gli insiemi. Gli esecutori sono usati per gestire gruppi di elaboratori e i futures sono usati per gestire i risultati prodotti dagli esecutori. Per usare un gruppo di elaboratori una applicazione crea una istanza della classe di esecutore appropriata, quindi sottopone a essa i compiti da eseguire. Quando si fa partire ciascun compito, viene ritornata una istanza di Future. Quando è richiesto il risultato di un compito una applicazione può usare Future per bloccare fino a che si rende disponibile il risultato. Sono fornite varie API per rendere conveniente attendere che un compito si completi, in modo che gli oggetti Future non debbano essere gestiti direttamente.

Usare map() con un Insieme di Thread Basico

ThreadPoolExecutor gestisce un insieme di thread di elaboratori, passando a essi i compiti mano a mano che si rendono disponibili per eseguire nuove elaborazioni. Questo esempio usa map() per produrre in concomitanza un insieme di risultati da un input iterabile. Il compito usa time.sleep() per mettersi in pausa per un determinato tempo per dimostrare che, a prescindere dall'ordine di esecuzione dei compiti concomitanti, map() ritorna sempre i valori nell'ordine basato sull'input.

# futures_thread_pool_map.py

from concurrent import futures
import threading
import time


def task(n):
    print('{}: in pausa {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: eseguito con {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10


ex = futures.ThreadPoolExecutor(max_workers=2)
print('principale: in partenza')
results = ex.map(task, range(5, 0, -1))
print('principale: risultati non elaborati {}'.format(results))
print('principale: in attesa dei veri risultati')
real_results = list(results)
print('principale: resultati: {}'.format(real_results))

Il valore ritornato da map() è in realtà un tipo speciale di iteratore che sa come attendere ciascuna risposta mano a mano che programma esegue le iterazioni.

$ python3 futures_thread_pool_map.py

principale: in partenza
ThreadPoolExecutor-0_0: in pausa 5ThreadPoolExecutor-0_1: in pausa 4principale: risultati non elaborati <generator object Executor.map.<locals>.result_iterator at 0x7f7fe09bc740>
principale: in attesa dei veri risultati


ThreadPoolExecutor-0_1: eseguito con 4
ThreadPoolExecutor-0_1: in pausa 3
ThreadPoolExecutor-0_0: eseguito con 5
ThreadPoolExecutor-0_0: in pausa 2
ThreadPoolExecutor-0_1: eseguito con 3
ThreadPoolExecutor-0_1: in pausa 1
ThreadPoolExecutor-0_0: eseguito con 2
ThreadPoolExecutor-0_1: eseguito con 1
principale: resultati: [0.5, 0.4, 0.3, 0.2, 0.1]

Pianificare Compiti Individuali

Oltre all'utilizzo di map(), è possibile pianificare un compito individuale con un esecutore usando submit(), e usare l'istanza di Future ritornata per attendere il risultato del compito.

# futures_thread_pool_submit.py

from concurrent import futures
import threading
import time


def task(n):
    print('{}: in pausa {}'.format(
        threading.current_thread().name,
        n)
    )
    time.sleep(n / 10)
    print('{}: fatto con {}'.format(
        threading.current_thread().name,
        n)
    )
    return n / 10


ex = futures.ThreadPoolExecutor(max_workers=2)
print('principale: starting')
f = ex.submit(task, 5)
print('principale: future: {}'.format(f))
print('principale: in attesa di risultati')
result = f.result()
print('principale: risultato: {}'.format(result))
print('principale: future dopo il risultato: {}'.format(f))

Lo stato del future cambia dopo che i compiti sono completati ed il risultato è reso disponibile.

$ python3 futures_thread_pool_submit.py

principale: starting
ThreadPoolExecutor-0_0: in pausa 5principale: future: <Future at 0x7f7e15f4ea60 state=running>
principale: in attesa di risultati

ThreadPoolExecutor-0_0: fatto con 5
principale: risultato: 0.5
principale: future dopo il risultato: <Future at 0x7f7e15f4ea60 state=finished returned float>

Attendere Compiti in Qualsiasi Ordine

La chiamata del metodo result() di un Future blocca fino a quando il compito si completa (sia ritornando un valore che sollevando una eccezione) oppure se viene cancellato. I risultati di compiti multipli possono essere fruiti nell'ordine nel quale i compiti sono stati pianificati usando map(). Qualora non importi l'ordine nel quale i risultati dovranno essere processati, si usi as_completed() per elaborarli non appena ciascun compito finisce.

# futures_as_completed.py

from concurrent import futures
import random
import time


def task(n):
    time.sleep(random.random())
    return (n, n / 10)


ex = futures.ThreadPoolExecutor(max_workers=5)
print('principale: in partenza')

wait_for = [
    ex.submit(task, i)
    for i in range(5, 0, -1)
]

for f in futures.as_completed(wait_for):
    print('principale: risultato: {}'.format(f.result()))

Visto che l'insieme ha molti elaboratori come compiti, è possibile far partire tutti i compiti. Essi finiranno in ordine casuale quindi i valori generati da as_completed() sono diversi ogni volta che l'esempio viene eseguito.

$ python3 futures_as_completed.py

principale: in partenza
principale: risultato: (3, 0.3)
principale: risultato: (5, 0.5)
principale: risultato: (4, 0.4)
principale: risultato: (2, 0.2)
principale: risultato: (1, 0.1)

Callback da Future

Per intraprendere qualche azione quando un compito è completato, senza attendere esplicitamente il risultato, si utilizzi add_done_callback() per specificare una nuova funzione da chiamare quando Future ha finito. Il callback dovrebbe essere una funzione che ottiene un singolo argomento, l'istanza di Future.

# futures_future_callback.py

from concurrent import futures
import time


def task(n):
    print('{}: in pausa'.format(n))
    time.sleep(0.5)
    print('{}: fatto'.format(n))
    return n / 10


def done(fn):
    if fn.cancelled():
        print('{}: cancellato'.format(fn.arg))
    elif fn.done():
        error = fn.exception()
        if error:
            print('{}: errore restituito: {}'.format(
                fn.arg, error))
        else:
            result = fn.result()
            print('{}: valore restituito: {}'.format(
                fn.arg, result))


if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('principale: in partenza')
    f = ex.submit(task, 5)
    f.arg = 5
    f.add_done_callback(done)
    result = f.result()

Il callback viene chiamato a prescindere dalla ragione per la quale Future si considera "completato", quindi è necessario verificare lo stato dell'oggetto passato al callback prima di utilizzarlo in qualsivoglia modo.

$ python3 futures_future_callback.py

principale: in partenza
5: in pausa
5: fatto
5: valore restituito: 0.5

Cancellare Compiti

Un Future può essere cancellato, se è stato sottomesso ma non è ancora partito, chiamando il metodo cancel().

# futures_future_callback_cancel.py

from concurrent import futures
import time


def task(n):
    print('{}: in pausa'.format(n))
    time.sleep(0.5)
    print('{}: fatto'.format(n))
    return n / 10


def done(fn):
    if fn.cancelled():
        print('{}: cancellato'.format(fn.arg))
    elif fn.done():
        print('{}: non cancellato'.format(fn.arg))


if __name__ == '__main__':
    ex = futures.ThreadPoolExecutor(max_workers=2)
    print('principale: in partenza')
    tasks = []

    for i in range(10, 0, -1):
        print('principale: sottomesso {}'.format(i))
        f = ex.submit(task, i)
        f.arg = i
        f.add_done_callback(done)
        tasks.append((i, f))

    for i, t in reversed(tasks):
        if not t.cancel():
            print('principale: non si cancella {}'.format(i))

    ex.shutdown()

cancel() ritorna un booleano che indica se il compito sia stato cancellato o meno.

$ python3 futures_future_callback_cancel.py

principale: in partenza
principale: sottomesso 10
10: in pausa
principale: sottomesso 9
9: in pausaprincipale: sottomesso 8
principale: sottomesso 7

principale: sottomesso 6
principale: sottomesso 5
principale: sottomesso 4
principale: sottomesso 3
principale: sottomesso 2
principale: sottomesso 1
1: cancellato
2: cancellato
3: cancellato
4: cancellato
5: cancellato
6: cancellato
7: cancellato
8: cancellato
principale: non si cancella 9
principale: non si cancella 10
10: fatto
10: non cancellato
9: fatto
9: non cancellato

Eccezioni nei Compiti

Se un compito solleva una eccezione non gestita, viene salvata nel Future per il compito e resa disponibile tramite i metodi result() o exception().

# futures_future_exception.py

from concurrent import futures


def task(n):
    print('{}: in partenza'.format(n))
    raise ValueError('il valore {} non è buono'.format(n))


ex = futures.ThreadPoolExecutor(max_workers=2)
print('principale: in partenza')
f = ex.submit(task, 5)

error = f.exception()
print('principale: errore: {}'.format(error))

try:
    result = f.result()
except ValueError as e:
    print('principale: visto l\'errore "{}" accedendo a result'.format(e))

Se result() viene chiamato dopo che è stata sollevata una eccezione non gestita all'interno di una funzione di un compito, la stessa eccezione viene sollevata nuovamente nel contesto corrente.

$ python3 futures_future_exception.py

principale: in partenza
5: in partenza
principale: errore: il valore 5 non è buono
principale: visto l'errore "il valore 5 non è buono" accedendo a result

Gestore di Contesto

Gli esecutori lavorano come gestori di contesto, eseguendo compiti in concomitanza ed attendendo che tutti si completino. Quando il gestore di contesto esce, viene chiamato il metodo dell'esecutore shutdown().

# futures_context_manager.py

from concurrent import futures


def task(n):
    print(n)


with futures.ThreadPoolExecutor(max_workers=2) as ex:
    print('principale: in partenza')
    ex.submit(task, 1)
    ex.submit(task, 2)
    ex.submit(task, 3)
    ex.submit(task, 4)

print('principale: fatto')

Questa modalità di utilizzo dell'esecutore è utile quando le risorse del thread o del processo dovrebbero essere ripulite quando l'esecuzione lascia il contesto corrente.

$ python3 futures_context_manager.py

principale: in partenza
1
2
3
4
principale: fatto

Insiemi di Processi

ProcessPoolExecutor funziona allo stesso modo di ThreadPoolExecutor, ma utilizza i processi in luogo dei thread. Questo consente a operazioni che sollecitano intensivamente la CPU di usare una CPU separata e non saranno bloccate dal meccanismo di blocco dell'interprete globale di CPython.

# futures_process_pool_map.py

from concurrent import futures
import os


def task(n):
    return (n, os.getpid())


ex = futures.ProcessPoolExecutor(max_workers=2)
results = ex.map(task, range(5, 0, -1))
for n, pid in results:
    print('escuzione del compito {} in elaborazione {}'.format(n, pid))

Così come nell'insieme di thread, i processi individuali degli elaboratori sono utilizzati per compiti multipli.

$ python3 futures_process_pool_map.py

escuzione del compito 5 in elaborazione 8669
escuzione del compito 4 in elaborazione 8670
escuzione del compito 3 in elaborazione 8669
escuzione del compito 2 in elaborazione 8670
escuzione del compito 1 in elaborazione 8669

Se succede qualcosa ad uno dei processi elaboratori che ne causano l'uscita inaspettata, il ProcessPollExecutor viene considerato "compromesso" e non potrà più pianificare compiti.

# futures_process_pool_broken.py

from concurrent import futures
import os
import signal


with futures.ProcessPoolExecutor(max_workers=2) as ex:
    print('si ottiene il pid per un esecutore')
    f1 = ex.submit(os.getpid)
    pid1 = f1.result()

    print('uccisione del processo {}'.format(pid1))
    os.kill(pid1, signal.SIGHUP)

    print('sottomissione di un altro processo')
    f2 = ex.submit(os.getpid)
    try:
        pid2 = f2.result()
    except futures.process.BrokenProcessPool as e:
        print('non è possibile far partire nuovi compiti: {}'.format(e))

L'eccezione BrokenProcessPool viene in effetti sollevata quando i risultati sono elaborati, non quando il nuovo compito viene sottomesso.

$ python3 futures_process_pool_broken.py

si ottiene il pid per un esecutore
uccisione del processo 8675
sottomissione di un altro processo
non è possibile far partire nuovi compiti: A process in the process pool was terminated abruptly while the future was running or pending.

Vedere anche:

concurrent.futures
La documentazione della libreria standard per questo modulo.
PEP 3148
La proposta per la creazione dell'insieme delle caratteristiche di concurrent.future
Combinare Coroutine con Thread e Processi
threading
multiprocessing