threading - Gestire Operazioni Concomitanti all'Interno di un Processo
Scopo: Gestisce parecchi thread di esecuzioni
L'utilizzo di thread consente a un programma di eseguire operazioni multiple in concomitanza nello stesso spazio di processo.
Oggetti Thread
Il modo più semplice di usare un thread è di istanziare Thread
con una funzione obiettivo, chiamare start()
e lasciare che inizi a lavorare.
# threading_simple.py
import threading
def worker():
"""funzione in esecuzione nel thread"""
print('Esecutore')
threads = []
for i in range(5):
t = threading.Thread(target=worker)
threads.append(t)
t.start()
Il risultato è costituito da cinque righe, ciascuna contenente la parola "Esecutore".
$ python3 threading_simple.py Esecutore Esecutore Esecutore Esecutore Esecutore
E' utile essere in grado di distribuire un thread e passargli argomenti per dirgli quale lavoro svolgere. Un qualsiasi tipo di oggetto può essere passato come argomento al thread. Questo esempio passa un numero, che il thread stampa.
# threading_simpleargs.py
import threading
def worker(num):
"""Funzione in esecuzione nel thread"""
print('Esecutore: %s' % num)
threads = []
for i in range(5):
t = threading.Thread(target=worker, args=(i,))
threads.append(t)
t.start()
L'argomento intero ora viene incluso nel messaggio stampato da ciascun thread.
$ python3 threading_simpleargs.py Esecutore: 0 Esecutore: 1 Esecutore: 2 Esecutore: 3 Esecutore: 4
Determinare il Thread Corrente
Utilizzare argomenti per identificare o nominare il thread è poco gestibile e non necessario. Ogni istanza di Thread
ha un nome con un valore predefinito che può essere modificato non appena il thread è creato. Attribuire nomi ai thread è utile in processi server con
# threading_names.py
import threading
import time
def worker():
print(threading.current_thread().getName(), 'Partenza')
time.sleep(0.2)
print(threading.current_thread().getName(), 'Uscita')
def my_service():
print(threading.current_thread().getName(), 'Partenza')
time.sleep(0.3)
print(threading.current_thread().getName(), 'Uscita')
t = threading.Thread(name='il_mio_servizio', target=my_service)
w = threading.Thread(name='esecutore', target=worker)
w2 = threading.Thread(target=worker) # usa nome predefinito
w.start()
w2.start()
t.start()
Il risultato del debug include il nome del thread corrente su ciascuna riga. Le righe con "Thread-1
" nella colonna del nome del thread corrispondono al thread senza nome w2
.
$ python3 threading_names.py esecutore Partenza Thread-1 Partenza il_mio_servizio Partenza esecutore Uscita Thread-1 Uscita il_mio_servizio Uscita
Molti programmi non usano print
per debug. Il modulo logging supporta l'incorporazione del nome thread in ogni messaggio registrato usando il codice di formattazione %(threadname)s
. Includere i nomi dei thread nel messaggi registrati rende possibile la tracciatura di detti messaggi verso la loro sorgente.
# threading_names_log.py
import logging
import threading
import time
def worker():
logging.debug('Partenza')
time.sleep(0.2)
logging.debug('Uscita')
def my_service():
logging.debug('Partenza')
time.sleep(0.3)
logging.debug('Uscita')
logging.basicConfig(
level=logging.DEBUG,
format='[%(levelname)s] (%(threadName)-10s) %(message)s',
)
t = threading.Thread(name='il_mio_servizio', target=my_service)
w = threading.Thread(name='esecutore', target=worker)
w2 = threading.Thread(target=worker) # usa nome predefinito
w.start()
w2.start()
t.start()
logging è anche thread-safe, quindi i messaggi da diversi thread sono mantenuti distinti nel risultato.
$ python3 threading_names_log.py [DEBUG] (esecutore ) Partenza [DEBUG] (Thread-1 ) Partenza [DEBUG] (il_mio_servizio) Partenza [DEBUG] (esecutore ) Uscita [DEBUG] (Thread-1 ) Uscita [DEBUG] (il_mio_servizio) Uscita
Thread Demoni contro Thread Non Demoni
Fino a questo punto, i programmi di esempio sono scritti in modo da attendere implicitamente di uscire fino a quando tutti i thread hanno completato il proprio lavoro. Talvolta i programmi producono un thread come demone che viene eseguito senza impedire l'uscita del programma principale. L'utilizzo dei thread come demoni è utile per servizi dove potrebbe non esserci un modo semplice per interrompere il thread, o deve lasciar morire il thread nel mezzo dell'esecuzione del proprio lavoro non comporta una perdita o corruzione di dati (ad esempio un thread che genera "battiti" per uno strumento di monitoraggio di un servizio). Per contrassegnare un thread come demone si passi daemon=True
quando si lo si costruisce oppure si chiami set_daemon()
con True
. La modalità predefinita per i thread è non demone.
# threading_daemon.py
import threading
import time
import logging
def daemon():
logging.debug('Partenza')
time.sleep(0.2)
logging.debug('Uscita')
def non_daemon():
logging.debug('Partenza')
logging.debug('Uscita')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='demone', target=daemon, daemon=True)
t = threading.Thread(name='non-demone', target=non_daemon)
d.start()
t.start()
Il risultato non comprende il messaggio "Uscita
" dal thread demone, visto che tutti i thread non demoni (compreso quello principale) sono usciti prima che il thread demone si "risvegli" dalla chiamata di sleep()
.
$ python3 threading_daemon.py (demone ) Partenza (non-demone) Partenza (non-demone) Uscita
Per attendere che un thread demone abbia completato il proprio lavoro, si usi il metodo join()
.
# threading_daemon_join.py
import threading
import time
import logging
def daemon():
logging.debug('Partenza')
time.sleep(0.2)
logging.debug('Uscita')
def non_daemon():
logging.debug('Partenza')
logging.debug('Uscita')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='demone', target=daemon, daemon=True)
t = threading.Thread(name='non-demone', target=non_daemon)
d.start()
t.start()
d.join()
t.join()
L'attendere l'uscita del thread demone usando join()
fa sì che ci sia la possibilità di produrre il messaggio "Uscita
".
$ python3 threading_daemon_join.py (demone ) Partenza (non-demone) Partenza (non-demone) Uscita (demone ) Uscita
Nella modalità predefinita, join()
blocca a tempo indeterminato. E' anche possibile passare un valore a virgola mobile che rappresenti il numero di secondi da attendere prima che il thread diventi inattivo. Se il thread non si completa in quel lasso di tempo, join()
ritorna comunque.
# threading_daemon_join_timeout.py
import threading
import time
import logging
def daemon():
logging.debug('Partenza')
time.sleep(0.2)
logging.debug('Uscita')
def non_daemon():
logging.debug('Partenza')
logging.debug('Uscita')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
d = threading.Thread(name='demone', target=daemon, daemon=True)
t = threading.Thread(name='non-demone', target=non_daemon)
d.start()
t.start()
d.join(0.1)
print('d.isAlive()', d.isAlive())
t.join()
Visto che il periodo di attesa passato è minore di quello nel quale il thread viene messo in pausa, il thread è ancora "vivo" dopo che join()
ritorna.
$ python3 threading_daemon_join_timeout.py (demone ) Partenza (non-demone) Partenza (non-demone) Uscita threading_daemon_join_timeout.py:32: DeprecationWarning: isAlive() is deprecated, use is_alive() instead print('d.isAlive()', d.isAlive()) d.isAlive() True
Enumerare tutti i Thread
Non è necessario mantenere un handle esplicito per tutti i thread demoni per assicurarsi che siano completati prima di uscire dal processo principale. enumerate()
ritorna una lista delle istanze attive di Thread
. La lista comprende il thread corrente, e visto che effettuare il join()
sul thread corrente introdurrebbe una situazione di stallo, deve essere ignorato.
# threading_enumerate.py
import random
import threading
import time
import logging
def worker():
"""thread worker function"""
pause = random.randint(1, 5) / 10
logging.debug('in pausa %0.2f', pause)
time.sleep(pause)
logging.debug('terminato')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
for i in range(3):
t = threading.Thread(target=worker, daemon=True)
t.start()
main_thread = threading.main_thread()
for t in threading.enumerate():
if t is main_thread:
continue
logging.debug('operazione di join %s', t.getName())
t.join()
Visto che l'elaboratore è in pausa per un periodo casuale di tempo, il risultato da questo programma potrebbe variare.
$ python3 threading_enumerate.py (Thread-1 ) in pausa 0.20 (Thread-2 ) in pausa 0.50 (Thread-3 ) in pausa 0.20 (MainThread) operazione di join Thread-1 (Thread-1 ) terminato (MainThread) operazione di join Thread-2 (Thread-3 ) terminato (Thread-2 ) terminato (MainThread) operazione di join Thread-3
Subclassare i Thread
Alla partenza, Thread
esegue alcune basiche inizializzazioni, quindi chiama il proprio metodo run()
, che a sua volta chiama la funzione passata al costruttore. Per creare una sottoclasse di Thread
si sovrascriva run()
per eseguire qualsiasi cosa si ritenga necessario.
# threading_subclass.py
import threading
import logging
class MyThread(threading.Thread):
def run(self):
logging.debug('in esecuzione')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
for i in range(5):
t = MyThread()
t.start()
Il valore di ritorno da run()
viene ignorato.
$ python3 threading_subclass.py (Thread-1 ) in esecuzione (Thread-2 ) in esecuzione (Thread-3 ) in esecuzione (Thread-4 ) in esecuzione (Thread-5 ) in esecuzione
Visto che i valori di args
e kwargs
passati al costruttore di Thread
sono salvati in variabili private usando nomi prefissati da "__
", non sono facilmente accessibili da una sottoclasse. Per passare argomenti ad un tipo di thread personalizzato, si ridefinisca il costruttore per salvare i valori in un attributo di istanza che possa essere visto nella sottoclasse.
# threading_subclass_args.py
import threading
import logging
class MyThreadWithArgs(threading.Thread):
def __init__(self, group=None, target=None, name=None,
args=(), kwargs=None, *, daemon=None):
super().__init__(group=group, target=target, name=name,
daemon=daemon)
self.args = args
self.kwargs = kwargs
def run(self):
logging.debug('in esecuzione con %s e %s',
self.args, self.kwargs)
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
for i in range(5):
t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'})
t.start()
MyThreadWithArgs
usa la stessa API di Thread
, ma un'altra classe potrebbe facilmente cambiare il metodo costruttore per ricevere più o differenti argomenti più direttamente legati allo scopo del thread, così come una qualsiasi altra classe.
$ python3 threading_subclass_args.py (Thread-1 ) in esecuzione con (0,) e {'a': 'A', 'b': 'B'} (Thread-2 ) in esecuzione con (1,) e {'a': 'A', 'b': 'B'} (Thread-3 ) in esecuzione con (2,) e {'a': 'A', 'b': 'B'} (Thread-4 ) in esecuzione con (3,) e {'a': 'A', 'b': 'B'} (Thread-5 ) in esecuzione con (4,) e {'a': 'A', 'b': 'B'}
Thread in Timer
Un esempio di una ragione per derivare Thread
è fornito da Timer
, anch'esso incluso in threading. Un Timer
inizia il suo lavoro dopo un differimento, e può essere cancellato in qualsiasi momento all'interno di quel periodo di differimento.
# threading_timer.py
import threading
import time
import logging
def delayed():
logging.debug('elaboratore in esecuzione')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
t1 = threading.Timer(0.3, delayed)
t1.setName('t1')
t2 = threading.Timer(0.3, delayed)
t2.setName('t2')
logging.debug('timer partiti')
t1.start()
t2.start()
logging.debug('in attesa prima di cancellare %s', t2.getName())
time.sleep(0.2)
logging.debug('cancellazione %s', t2.getName())
t2.cancel()
logging.debug('fatto')
In questo esempio il secondo timer non viene mai eseguito, ed il primo sembra venga eseguito dopo che il resto del programma principale ha finito. Visto che non è un thread demone, viene implicitamente unito quando il thread principale termina.
$ python3 threading_timer.py (MainThread) timer partiti (MainThread) in attesa prima di cancellare t2 (MainThread) cancellazione t2 (MainThread) fatto (t1 ) elaboratore in esecuzione
Segnalazioni fra Thread
Anche se lo scopo dell'usare thread multipli è di eseguire in concorrenza operazioni separate, ci sono volte nelle quali è importante essere in grado di sincronizzare le operazioni in due o più thread. Un semplice modo per comunicare tra thread in sicurezza è rappresentato da oggetti evento. Un oggetto Event
gestisce un flag interno che i chiamanti possono controllare tramite i metodi set()
e clear()
. Altri thread possono usare wait()
per mettersi in pausa fino a che il flag è impostato, di fatto bloccandosi fino a quando gli si consente di continuare.
# threading_event.py
import logging
import threading
import time
def wait_for_event(e):
"""Attende che l'evento sia impostato prima di fare qualsiasi cosa"""
logging.debug('wait_for_event in partenza')
event_is_set = e.wait()
logging.debug('evento impostato: %s', event_is_set)
def wait_for_event_timeout(e, t):
"""Wait t seconds and then timeout"""
while not e.is_set():
logging.debug('wait_for_event_timeout in partenza')
event_is_set = e.wait(t)
logging.debug('evento impostato: %s', event_is_set)
if event_is_set:
logging.debug('evento in elaborazione')
else:
logging.debug('altro lavoro in esecuzione')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
e = threading.Event()
t1 = threading.Thread(
name='block',
target=wait_for_event,
args=(e,),
)
t1.start()
t2 = threading.Thread(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
t2.start()
logging.debug('In attesa prima di chiamare Event.set()')
time.sleep(0.3)
e.set()
logging.debug('Event è impostato')
Il metodo wait()
riceve un argomento che rappresenta il numero di secondi da attendere prima che l'evento vada in time out. Ritorna un booleano che indica se l'evento sia impostato o meno, in modo che il chiamante sappia perchè wait()
è ritornato. Il metodo is_set()
può essere usato separatamente sull'evento senza temere di bloccarlo.
In questo esempio, wait_for_event_timeout()
verifica lo stato dell'evento senza bloccare a tempo indeterminato. wait_for_event()
blocca sulla chiamata a wait()
, che non ritorna fino a quando lo stato dell'evento cambia.
$ python3 threading_event.py (block ) wait_for_event in partenza (nonblock ) wait_for_event_timeout in partenza (MainThread) In attesa prima di chiamare Event.set() (MainThread) Event è impostato (nonblock ) evento impostato: True (nonblock ) evento in elaborazione (block ) evento impostato: True
Controllare l'Accesso alle Risorse
Oltre alla sincronizzazione delle operazioni dei thread, è anche importante essere in grado di controllare l'accesso a risorse condivise per prevenire un danneggiamento od una perdita di dati. Le strutture dati built-in di Python sono thread-safe come effetto collaterale dell'avere byte-code atomici per manipolarle (il bloccaggio dell'interprete globale usato per proteggere le strutture dati interne di Python non viene rilasciato nel mezzo di un aggiornamento). Altre strutture dati implementate in Python, o tipi più semplici come interi e cifre a virgola mobile, non hanno questa protezione. Per ripararsi da accessi simultanei ad un oggetto, si utilizzi un oggetto Lock
.
# threading_lock.py
import logging
import random
import threading
import time
class Counter:
def __init__(self, start=0):
self.lock = threading.Lock()
self.value = start
def increment(self):
logging.debug('In attesa del bloccaggio')
self.lock.acquire()
try:
logging.debug('Bloccaggio acquisito')
self.value = self.value + 1
finally:
self.lock.release()
def worker(c):
for i in range(2):
pause = random.random()
logging.debug('In pausa %0.02f', pause)
time.sleep(pause)
c.increment()
logging.debug('Fatto')
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
counter = Counter()
for i in range(2):
t = threading.Thread(target=worker, args=(counter,))
t.start()
logging.debug('In attesa dei thread elaboratori')
main_thread = threading.main_thread()
for t in threading.enumerate():
if t is not main_thread:
t.join()
logging.debug('Contatorer: %d', counter.value)
In questo esempio la funzione worker()
incrementa una istanza di Counter
che gestisce un Lock
per prevenire che due thread cambino il loro stato interno allo stesso tempo. Se non fosse stato usato Lock
, ci sarebbe stata la possibilità di non rilevare un cambiamento nel valore dell'attributo.
$ python3 threading_lock.py (Thread-1 ) In pausa 0.44 (Thread-2 ) In pausa 0.79 (MainThread) In attesa dei thread elaboratori (Thread-1 ) In attesa del bloccaggio (Thread-1 ) Bloccaggio acquisito (Thread-1 ) In pausa 0.78 (Thread-2 ) In attesa del bloccaggio (Thread-2 ) Bloccaggio acquisito (Thread-2 ) In pausa 0.87 (Thread-1 ) In attesa del bloccaggio (Thread-1 ) Bloccaggio acquisito (Thread-1 ) Fatto (Thread-2 ) In attesa del bloccaggio (Thread-2 ) Bloccaggio acquisito (Thread-2 ) Fatto (MainThread) Contatorer: 4
Per scoprire se un altro thread ha acquisito il bloccaggio senza sostenere il thread corrente, si passi False
per l'argomento blocking
per acquire()
. Nel prossimo esempio, worker()
tenta di acquisire il bloccaggio per tre volte distinte e conta quanti tentativi ha fatto per questo scopo. Nel frattempo, lock_holder()
passa tra il mantenere e rilasciare il bloccaggio, con piccole pause in ogni stato usate per simulare il caricamento.
# threading_lock_noblock.py
import logging
import threading
import time
def lock_holder(lock):
logging.debug('In partenza')
while True:
lock.acquire()
try:
logging.debug('Trattenuto')
time.sleep(0.5)
finally:
logging.debug('Non trattenuto')
lock.release()
time.sleep(0.5)
def worker(lock):
logging.debug('In partenza')
num_tries = 0
num_acquires = 0
while num_acquires < 3:
time.sleep(0.5)
logging.debug('Tentativo di acquisizione')
have_it = lock.acquire(0)
try:
num_tries += 1
if have_it:
logging.debug('Iterazione %d: acquisita',
num_tries)
num_acquires += 1
else:
logging.debug('Iterazione %d: Non acquisita',
num_tries)
finally:
if have_it:
lock.release()
logging.debug('Terminato dopo %d iterazioni', num_tries)
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
lock = threading.Lock()
holder = threading.Thread(
target=lock_holder,
args=(lock,),
name='LockHolder',
daemon=True,
)
holder.start()
worker = threading.Thread(
target=worker,
args=(lock,),
name='Worker',
)
worker.start()
Occorrono a worker()
più di tre iterazioni per acquisire il bloccaggio per tre volte distinte.
$ python3 threading_lock_noblock.py (LockHolder) In partenza (LockHolder) Trattenuto (Worker ) In partenza (LockHolder) Non trattenuto (Worker ) Tentativo di acquisizione (Worker ) Iterazione 1: acquisita (LockHolder) Trattenuto (Worker ) Tentativo di acquisizione (Worker ) Iterazione 2: Non acquisita (LockHolder) Non trattenuto (Worker ) Tentativo di acquisizione (Worker ) Iterazione 3: acquisita (LockHolder) Trattenuto (Worker ) Tentativo di acquisizione (Worker ) Iterazione 4: Non acquisita (LockHolder) Non trattenuto (Worker ) Tentativo di acquisizione (Worker ) Iterazione 5: acquisita (Worker ) Terminato dopo 5 iterazioni
Bloccaggi Rientranti
Gli oggetti Lock
normali non possono essere acquisiti più di una volta, anche se nello stesso thread. Questo può introdurre effetti collaterali indesiderati se il bloccaggio viene indirizzato da più di una funzione nella stessa catena di chiamate.
# threading_lock_reacquire.py
import threading
lock = threading.Lock()
print('Primo tentativo :', lock.acquire())
print('Secondo tentativo:', lock.acquire(0))
In questo caso, alla seconda chiamata ad acquire()
viene dato un timeout di zero per prevenirne il bloccaggio visto che lo stesso è stato ottenuto dalla prima chiamata.
$ python3 threading_lock_reacquire.py Primo tentativo : True Secondo tentativo: False
In una situazione dove codice separato dallo stesso thread deve "riacquisire" il bloccaggio si utilizzi RLock
.
# threading_rlock.py
import threading
lock = threading.RLock()
print('Primo tentativo :', lock.acquire())
print('Secondo tentativo:', lock.acquire(0))
Il solo cambiamento nel codice rispetto all'esempio precedente è la sostituzione di Lock
con RLock
.
$ python3 threading_rlock.py Primo tentativo : True Secondo tentativo: True
Bloccaggi come Gestori di Contesto
I bloccaggi implementano le API del gestore di contesto se sono compatibili con l'istruzione with
. L'utilizzo di with
consente di rimuovere la necessità di acquisire e rilasciare esplicitamente il bloccaggio.
# threading_lock_with.py
import threading
import logging
def worker_with(lock):
with lock:
logging.debug('Bloccaggio acquisito via with')
def worker_no_with(lock):
lock.acquire()
try:
logging.debug('Lock acquisito direttamente')
finally:
lock.release()
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
lock = threading.Lock()
w = threading.Thread(target=worker_with, args=(lock,))
nw = threading.Thread(target=worker_no_with, args=(lock,))
w.start()
nw.start()
Le due funzioni worker_with()
e worker_no_with()
gestiscono il bloccaggio in modo equivalente.
$ python3 threading_lock_with.py (Thread-1 ) Bloccaggio acquisito via with (Thread-2 ) Lock acquisito direttamente
Sincronizzare i Thread
Oltre all'utilizzo di Events
, un altro modo di sincronizzare i thread è tramite l'oggetto Condition
. Visto che Condition
usa Lock
, potrebbe essere legato ad una risorsa condivisa, consentendo a thread multipli di attendere che la risorsa sia aggiornata. In questo esempio, i thread in consumer()
attendono che la condizione venga impostata in Condition
prima di continuare. Il thread producer()
è responsabile dell'impostazione della condizione e della notifica di via libera agli altri thread.
# threading_condition.py
import logging
import threading
import time
def consumer(cond):
"""Attende la condizione ed usa la risorsa"""
logging.debug('Partenza del thread consumer ')
with cond:
cond.wait()
logging.debug('La risorsa è disponibile per consumer')
def producer(cond):
"""Imposta la risorsa che viene usata da consumer"""
logging.debug('Partenza del thread producer')
with cond:
logging.debug('Si rende disponibile la risorsa')
cond.notifyAll()
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s (%(threadName)-2s) %(message)s',
)
condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer,
args=(condition,))
c2 = threading.Thread(name='c2', target=consumer,
args=(condition,))
p = threading.Thread(name='p', target=producer,
args=(condition,))
c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()
I thread usano with
per acquisire il bloccaggio associato a Condition
. La cosa funziona anche utilizzando i metodi acquire()
e release()
esplicitamente.
$ python3 threading_condition.py 2021-06-03 09:16:31,771 (c1) Partenza del thread consumer 2021-06-03 09:16:31,972 (c2) Partenza del thread consumer 2021-06-03 09:16:32,172 (p ) Partenza del thread producer 2021-06-03 09:16:32,172 (p ) Si rende disponibile la risorsa 2021-06-03 09:16:32,173 (c1) La risorsa è disponibile per consumer 2021-06-03 09:16:32,173 (c2) La risorsa è disponibile per consumer
La barriera è un altro meccanismo di sincronizzazione dei thread. Barrier
imposta un punto di controllo e tutti i thread partecipanti si bloccano fino a che tutte le parti in causa hanno raggiunto quel punto. Consente ai thread di partire separatamente, quindi interrompe fino a che tutti sono pronti a procedere.
# threading_barrier.py
import threading
import time
def worker(barrier):
print(threading.current_thread().name,
'In attesa della barriera con altri {}'.format(
barrier.n_waiting))
worker_id = barrier.wait()
print(threading.current_thread().name, 'dopo la barriera',
worker_id)
NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS)
threads = [
threading.Thread(
name='worker-%s' % i,
target=worker,
args=(barrier,),
)
for i in range(NUM_THREADS)
]
for t in threads:
print(t.name, 'partenza')
t.start()
time.sleep(0.1)
for t in threads:
t.join()
In questo esempio, la barriera Barrier
viene configurata per bloccare fino a quando ci sono 3 thread in attesa. Quando la condizione viene soddisfatta, i thread sono rilasciati oltre il punto di controllo simultaneamente. Il valore di ritorno di wait()
indica il numero di parti che sono state rilasciate, e può essere usato per limitare alcuni thread dall'intraprendere una azione tipo la pulizia di una risorsa condivisa.
$ python3 threading_barrier.py worker-0 partenza worker-0 In attesa della barriera con altri 0 worker-1 partenza worker-1 In attesa della barriera con altri 1 worker-2 partenza worker-2 In attesa della barriera con altri 2 worker-2 dopo la barriera 2 worker-1 dopo la barriera 1 worker-0 dopo la barriera 0
Il metodo abort()
di Barrier
fa sì che tutti i thread in attesa ricevano un segnale BrokenBarrierError
, che consente ai thread di eseguire una pulizia se il processo viene interrotto mentre sono bloccati in wait()
.
# threading_barrier_abort.py
import threading
import time
def worker(barrier):
print(threading.current_thread().name,
'in attesa della barriera con altri {}'.format(
barrier.n_waiting))
try:
worker_id = barrier.wait()
except threading.BrokenBarrierError:
print(threading.current_thread().name, 'interruzione')
else:
print(threading.current_thread().name, 'dopo la barriera',
worker_id)
NUM_THREADS = 3
barrier = threading.Barrier(NUM_THREADS + 1)
threads = [
threading.Thread(
name='worker-%s' % i,
target=worker,
args=(barrier,),
)
for i in range(NUM_THREADS)
]
for t in threads:
print(t.name, 'partenza')
t.start()
time.sleep(0.1)
barrier.abort()
for t in threads:
t.join()
Questo esempio configura Barrier
per attendersi un ulteriore thread partecipante rispetto a quanti realmente partiti, quindi l'elaborazione in tutti i thread è bloccata. La chiamata di abort()
solleva una eccezione in ogni thread bloccato.
$ python3 threading_barrier_abort.py worker-0 partenza worker-0 in attesa della barriera con altri 0 worker-1 partenza worker-1 in attesa della barriera con altri 1 worker-2 partenza worker-2 in attesa della barriera con altri 2 worker-0 interruzione worker-2 interruzioneworker-1 interruzione
Limitare l'Accesso Concorrenziale alle Risorse
Talvolta è utile consentire a più di un elaboratore l'accesso contemporaneo ad una risorsa, mantenendo sempre la possibilità di limitarne il numero complessivo. Ad esempio un pool di connessioni potrebbe supportare un numero definito di connessioni simultanee, oppure una applicazione di rete potrebbe supportare un numero definito di scaricamenti concorrenti. Un modo per gestire queste connessioni può essere Semaphore
.
# threading_semaphore.py
import logging
import random
import threading
import time
class ActivePool:
def __init__(self):
super(ActivePool, self).__init__()
self.active = []
self.lock = threading.Lock()
def makeActive(self, name):
with self.lock:
self.active.append(name)
logging.debug('In esecuzione: %s', self.active)
def makeInactive(self, name):
with self.lock:
self.active.remove(name)
logging.debug('In esecuzione: %s', self.active)
def worker(s, pool):
logging.debug('In attesa di raggiungere il pool')
with s:
name = threading.current_thread().getName()
pool.makeActive(name)
time.sleep(0.1)
pool.makeInactive(name)
logging.basicConfig(
level=logging.DEBUG,
format='%(asctime)s (%(threadName)-2s) %(message)s',
)
pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
t = threading.Thread(
target=worker,
name=str(i),
args=(s, pool),
)
t.start()
In questo esempio la classe ActivePool
serve semplicemente per tracciare con comodità quali thread possono essere eseguiti ad un dato momento. Un pool di risorse reale dovrebbe allocare una connessione o qualche altro valore al thread appena attivato, e riprendersi quel valore quando il thread ha concluso. Qui viene semplicemente usato per mantenere i nomi dei thread attivi per mostrare che sono in esecuzione al massimo due thread contemporaneamente.
$ python3 threading_semaphore.py 2021-06-03 09:16:32,925 (0 ) In attesa di raggiungere il pool 2021-06-03 09:16:32,925 (0 ) In esecuzione: ['0'] 2021-06-03 09:16:32,925 (1 ) In attesa di raggiungere il pool 2021-06-03 09:16:32,925 (1 ) In esecuzione: ['0', '1'] 2021-06-03 09:16:32,926 (2 ) In attesa di raggiungere il pool 2021-06-03 09:16:32,926 (3 ) In attesa di raggiungere il pool 2021-06-03 09:16:33,025 (0 ) In esecuzione: ['1'] 2021-06-03 09:16:33,026 (2 ) In esecuzione: ['1', '2'] 2021-06-03 09:16:33,026 (1 ) In esecuzione: ['2'] 2021-06-03 09:16:33,026 (3 ) In esecuzione: ['2', '3'] 2021-06-03 09:16:33,126 (2 ) In esecuzione: ['3'] 2021-06-03 09:16:33,127 (3 ) In esecuzione: []
Dati Specifici al Thread
Mentre alcune risorse devono essere bloccate in modo che possano essere usate da thread multipli, le altre devono essere protette in modo che siano nascoste ai thread che le detengono. La classe local()
crea un oggetto in grado di nascondere i valori dalla vista in thread separati.
# threading_local.py
import random
import threading
import logging
def show_value(data):
try:
val = data.value
except AttributeError:
logging.debug('Ancora nessun valore')
else:
logging.debug('value=%s', val)
def worker(data):
show_value(data)
data.value = random.randint(1, 100)
show_value(data)
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)
for i in range(2):
t = threading.Thread(target=worker, args=(local_data,))
t.start()
L'attributo local_data.value
non è presente per tutti gli altri thread fino a quando rimane impostato in quel thread.
$ python3 threading_local.py (MainThread) Ancora nessun valore (MainThread) value=1000 (Thread-1 ) Ancora nessun valore (Thread-1 ) value=11 (Thread-2 ) Ancora nessun valore (Thread-2 ) value=30
Per inizializzare le impostazioni in modo che tutti i thread partano con lo stesso valore, si utilizzi una sottoclasse e si impostino gli attributi in __init__()
.
# threading_local_defaults.py
import random
import threading
import logging
def show_value(data):
try:
val = data.value
except AttributeError:
logging.debug('Ancora nessun valore')
else:
logging.debug('value=%s', val)
def worker(data):
show_value(data)
data.value = random.randint(1, 100)
show_value(data)
class MyLocal(threading.local):
def __init__(self, value):
super().__init__()
logging.debug('Inizializzazione %r', self)
self.value = value
logging.basicConfig(
level=logging.DEBUG,
format='(%(threadName)-10s) %(message)s',
)
local_data = MyLocal(1000)
show_value(local_data)
for i in range(2):
t = threading.Thread(target=worker, args=(local_data,))
t.start()
__init__()
viene invocato sullo stesso oggetto (notare il valore di id()
), una volta in ciascun thread per impostare i valori predefiniti.
$ python3 threading_local_defaults.py (MainThread) Inizializzazione <__main__.MyLocal object at 0x7ffadcf04be0> (MainThread) value=1000 (Thread-1 ) Inizializzazione <__main__.MyLocal object at 0x7ffadcf04be0> (Thread-1 ) value=1000 (Thread-1 ) value=52 (Thread-2 ) Inizializzazione <__main__.MyLocal object at 0x7ffadcf04be0> (Thread-2 ) value=1000 (Thread-2 ) value=19
Vedere anche:
- threading
- La documentazione della libreria standard per questo modulo.
- Note di portabilità per threading
- multiprocessing
- Una API per lavorare con processi che rispecchiano l'API di threading
- Queue
- Coda thread-safe, utile per passare messaggi tra thread
- thread
- Api thread di basso livello