concurrent.futures - Gestisce Insiemi di Compiti Concomitanti
Scopo: Gestire facilmente compiti in esecuzione concomitanti ed in parallelo
Il modulo concurrent.futures fornisce interfacce per eseguire compiti usando insiemi (pool) di thread o elaboratori di processo. Le API sono le stesse, quindi le applicazioni possono passare tra thread e processi con minimi aggiustamenti.
Il modulo fornisce due tipi di classi per interagire con gli insiemi. Gli esecutori sono usati per gestire gruppi di elaboratori e i futures sono usati per gestire i risultati prodotti dagli esecutori. Per usare un gruppo di elaboratori una applicazione crea una istanza della classe di esecutore appropriata, quindi sottopone a essa i compiti da eseguire. Quando si fa partire ciascun compito, viene ritornata una istanza di Future
. Quando è richiesto il risultato di un compito una applicazione può usare Future
per bloccare fino a che si rende disponibile il risultato. Sono fornite varie API per rendere conveniente attendere che un compito si completi, in modo che gli oggetti Future
non debbano essere gestiti direttamente.
Usare map() con un Insieme di Thread Basico
ThreadPoolExecutor
gestisce un insieme di thread di elaboratori, passando a essi i compiti mano a mano che si rendono disponibili per eseguire nuove elaborazioni. Questo esempio usa map()
per produrre in concomitanza un insieme di risultati da un input iterabile. Il compito usa time.sleep()
per mettersi in pausa per un determinato tempo per dimostrare che, a prescindere dall'ordine di esecuzione dei compiti concomitanti, map()
ritorna sempre i valori nell'ordine basato sull'input.
# futures_thread_pool_map.py
from concurrent import futures
import threading
import time
def task(n):
print('{}: in pausa {}'.format(
threading.current_thread().name,
n)
)
time.sleep(n / 10)
print('{}: eseguito con {}'.format(
threading.current_thread().name,
n)
)
return n / 10
ex = futures.ThreadPoolExecutor(max_workers=2)
print('principale: in partenza')
results = ex.map(task, range(5, 0, -1))
print('principale: risultati non elaborati {}'.format(results))
print('principale: in attesa dei veri risultati')
real_results = list(results)
print('principale: resultati: {}'.format(real_results))
Il valore ritornato da map()
è in realtà un tipo speciale di iteratore che sa come attendere ciascuna risposta mano a mano che programma esegue le iterazioni.
$ python3 futures_thread_pool_map.py principale: in partenza ThreadPoolExecutor-0_0: in pausa 5ThreadPoolExecutor-0_1: in pausa 4principale: risultati non elaborati <generator object Executor.map.<locals>.result_iterator at 0x7f7fe09bc740> principale: in attesa dei veri risultati ThreadPoolExecutor-0_1: eseguito con 4 ThreadPoolExecutor-0_1: in pausa 3 ThreadPoolExecutor-0_0: eseguito con 5 ThreadPoolExecutor-0_0: in pausa 2 ThreadPoolExecutor-0_1: eseguito con 3 ThreadPoolExecutor-0_1: in pausa 1 ThreadPoolExecutor-0_0: eseguito con 2 ThreadPoolExecutor-0_1: eseguito con 1 principale: resultati: [0.5, 0.4, 0.3, 0.2, 0.1]
Pianificare Compiti Individuali
Oltre all'utilizzo di map()
, è possibile pianificare un compito individuale con un esecutore usando submit()
, e usare l'istanza di Future
ritornata per attendere il risultato del compito.
# futures_thread_pool_submit.py
from concurrent import futures
import threading
import time
def task(n):
print('{}: in pausa {}'.format(
threading.current_thread().name,
n)
)
time.sleep(n / 10)
print('{}: fatto con {}'.format(
threading.current_thread().name,
n)
)
return n / 10
ex = futures.ThreadPoolExecutor(max_workers=2)
print('principale: starting')
f = ex.submit(task, 5)
print('principale: future: {}'.format(f))
print('principale: in attesa di risultati')
result = f.result()
print('principale: risultato: {}'.format(result))
print('principale: future dopo il risultato: {}'.format(f))
Lo stato del future cambia dopo che i compiti sono completati ed il risultato è reso disponibile.
$ python3 futures_thread_pool_submit.py principale: starting ThreadPoolExecutor-0_0: in pausa 5principale: future: <Future at 0x7f7e15f4ea60 state=running> principale: in attesa di risultati ThreadPoolExecutor-0_0: fatto con 5 principale: risultato: 0.5 principale: future dopo il risultato: <Future at 0x7f7e15f4ea60 state=finished returned float>
Attendere Compiti in Qualsiasi Ordine
La chiamata del metodo result()
di un Future
blocca fino a quando il compito si completa (sia ritornando un valore che sollevando una eccezione) oppure se viene cancellato. I risultati di compiti multipli possono essere fruiti nell'ordine nel quale i compiti sono stati pianificati usando map()
. Qualora non importi l'ordine nel quale i risultati dovranno essere processati, si usi as_completed()
per elaborarli non appena ciascun compito finisce.
# futures_as_completed.py
from concurrent import futures
import random
import time
def task(n):
time.sleep(random.random())
return (n, n / 10)
ex = futures.ThreadPoolExecutor(max_workers=5)
print('principale: in partenza')
wait_for = [
ex.submit(task, i)
for i in range(5, 0, -1)
]
for f in futures.as_completed(wait_for):
print('principale: risultato: {}'.format(f.result()))
Visto che l'insieme ha molti elaboratori come compiti, è possibile far partire tutti i compiti. Essi finiranno in ordine casuale quindi i valori generati da as_completed()
sono diversi ogni volta che l'esempio viene eseguito.
$ python3 futures_as_completed.py principale: in partenza principale: risultato: (3, 0.3) principale: risultato: (5, 0.5) principale: risultato: (4, 0.4) principale: risultato: (2, 0.2) principale: risultato: (1, 0.1)
Callback da Future
Per intraprendere qualche azione quando un compito è completato, senza attendere esplicitamente il risultato, si utilizzi add_done_callback()
per specificare una nuova funzione da chiamare quando Future
ha finito. Il callback dovrebbe essere una funzione che ottiene un singolo argomento, l'istanza di Future
.
# futures_future_callback.py
from concurrent import futures
import time
def task(n):
print('{}: in pausa'.format(n))
time.sleep(0.5)
print('{}: fatto'.format(n))
return n / 10
def done(fn):
if fn.cancelled():
print('{}: cancellato'.format(fn.arg))
elif fn.done():
error = fn.exception()
if error:
print('{}: errore restituito: {}'.format(
fn.arg, error))
else:
result = fn.result()
print('{}: valore restituito: {}'.format(
fn.arg, result))
if __name__ == '__main__':
ex = futures.ThreadPoolExecutor(max_workers=2)
print('principale: in partenza')
f = ex.submit(task, 5)
f.arg = 5
f.add_done_callback(done)
result = f.result()
Il callback viene chiamato a prescindere dalla ragione per la quale Future
si considera "completato", quindi è necessario verificare lo stato dell'oggetto passato al callback prima di utilizzarlo in qualsivoglia modo.
$ python3 futures_future_callback.py principale: in partenza 5: in pausa 5: fatto 5: valore restituito: 0.5
Cancellare Compiti
Un Future
può essere cancellato, se è stato sottomesso ma non è ancora partito, chiamando il metodo cancel()
.
# futures_future_callback_cancel.py
from concurrent import futures
import time
def task(n):
print('{}: in pausa'.format(n))
time.sleep(0.5)
print('{}: fatto'.format(n))
return n / 10
def done(fn):
if fn.cancelled():
print('{}: cancellato'.format(fn.arg))
elif fn.done():
print('{}: non cancellato'.format(fn.arg))
if __name__ == '__main__':
ex = futures.ThreadPoolExecutor(max_workers=2)
print('principale: in partenza')
tasks = []
for i in range(10, 0, -1):
print('principale: sottomesso {}'.format(i))
f = ex.submit(task, i)
f.arg = i
f.add_done_callback(done)
tasks.append((i, f))
for i, t in reversed(tasks):
if not t.cancel():
print('principale: non si cancella {}'.format(i))
ex.shutdown()
cancel()
ritorna un booleano che indica se il compito sia stato cancellato o meno.
$ python3 futures_future_callback_cancel.py principale: in partenza principale: sottomesso 10 10: in pausa principale: sottomesso 9 9: in pausaprincipale: sottomesso 8 principale: sottomesso 7 principale: sottomesso 6 principale: sottomesso 5 principale: sottomesso 4 principale: sottomesso 3 principale: sottomesso 2 principale: sottomesso 1 1: cancellato 2: cancellato 3: cancellato 4: cancellato 5: cancellato 6: cancellato 7: cancellato 8: cancellato principale: non si cancella 9 principale: non si cancella 10 10: fatto 10: non cancellato 9: fatto 9: non cancellato
Eccezioni nei Compiti
Se un compito solleva una eccezione non gestita, viene salvata nel Future
per il compito e resa disponibile tramite i metodi result()
o exception()
.
# futures_future_exception.py
from concurrent import futures
def task(n):
print('{}: in partenza'.format(n))
raise ValueError('il valore {} non è buono'.format(n))
ex = futures.ThreadPoolExecutor(max_workers=2)
print('principale: in partenza')
f = ex.submit(task, 5)
error = f.exception()
print('principale: errore: {}'.format(error))
try:
result = f.result()
except ValueError as e:
print('principale: visto l\'errore "{}" accedendo a result'.format(e))
Se result()
viene chiamato dopo che è stata sollevata una eccezione non gestita all'interno di una funzione di un compito, la stessa eccezione viene sollevata nuovamente nel contesto corrente.
$ python3 futures_future_exception.py principale: in partenza 5: in partenza principale: errore: il valore 5 non è buono principale: visto l'errore "il valore 5 non è buono" accedendo a result
Gestore di Contesto
Gli esecutori lavorano come gestori di contesto, eseguendo compiti in concomitanza ed attendendo che tutti si completino. Quando il gestore di contesto esce, viene chiamato il metodo dell'esecutore shutdown()
.
# futures_context_manager.py
from concurrent import futures
def task(n):
print(n)
with futures.ThreadPoolExecutor(max_workers=2) as ex:
print('principale: in partenza')
ex.submit(task, 1)
ex.submit(task, 2)
ex.submit(task, 3)
ex.submit(task, 4)
print('principale: fatto')
Questa modalità di utilizzo dell'esecutore è utile quando le risorse del thread o del processo dovrebbero essere ripulite quando l'esecuzione lascia il contesto corrente.
$ python3 futures_context_manager.py principale: in partenza 1 2 3 4 principale: fatto
Insiemi di Processi
ProcessPoolExecutor
funziona allo stesso modo di ThreadPoolExecutor
, ma utilizza i processi in luogo dei thread. Questo consente a operazioni che sollecitano intensivamente la CPU di usare una CPU separata e non saranno bloccate dal meccanismo di blocco dell'interprete globale di CPython.
# futures_process_pool_map.py
from concurrent import futures
import os
def task(n):
return (n, os.getpid())
ex = futures.ProcessPoolExecutor(max_workers=2)
results = ex.map(task, range(5, 0, -1))
for n, pid in results:
print('escuzione del compito {} in elaborazione {}'.format(n, pid))
Così come nell'insieme di thread, i processi individuali degli elaboratori sono utilizzati per compiti multipli.
$ python3 futures_process_pool_map.py escuzione del compito 5 in elaborazione 8669 escuzione del compito 4 in elaborazione 8670 escuzione del compito 3 in elaborazione 8669 escuzione del compito 2 in elaborazione 8670 escuzione del compito 1 in elaborazione 8669
Se succede qualcosa ad uno dei processi elaboratori che ne causano l'uscita inaspettata, il ProcessPollExecutor
viene considerato "compromesso" e non potrà più pianificare compiti.
# futures_process_pool_broken.py
from concurrent import futures
import os
import signal
with futures.ProcessPoolExecutor(max_workers=2) as ex:
print('si ottiene il pid per un esecutore')
f1 = ex.submit(os.getpid)
pid1 = f1.result()
print('uccisione del processo {}'.format(pid1))
os.kill(pid1, signal.SIGHUP)
print('sottomissione di un altro processo')
f2 = ex.submit(os.getpid)
try:
pid2 = f2.result()
except futures.process.BrokenProcessPool as e:
print('non è possibile far partire nuovi compiti: {}'.format(e))
L'eccezione BrokenProcessPool
viene in effetti sollevata quando i risultati sono elaborati, non quando il nuovo compito viene sottomesso.
$ python3 futures_process_pool_broken.py si ottiene il pid per un esecutore uccisione del processo 8675 sottomissione di un altro processo non è possibile far partire nuovi compiti: A process in the process pool was terminated abruptly while the future was running or pending.
Vedere anche:
- concurrent.futures
- La documentazione della libreria standard per questo modulo.
- PEP 3148
- La proposta per la creazione dell'insieme delle caratteristiche di concurrent.future
- Combinare Coroutine con Thread e Processi
- threading
- multiprocessing