threading - Gestire Operazioni Concomitanti all'Interno di un Processo

Scopo: Gestisce parecchi thread di esecuzioni

L'utilizzo di thread consente a un programma di eseguire operazioni multiple in concomitanza nello stesso spazio di processo.

Oggetti Thread

Il modo più semplice di usare un thread è di istanziare Thread con una funzione obiettivo, chiamare start() e lasciare che inizi a lavorare.

# threading_simple.py

import threading


def worker():
    """funzione in esecuzione nel thread"""
    print('Esecutore')


threads = []
for i in range(5):
    t = threading.Thread(target=worker)
    threads.append(t)
    t.start()

Il risultato è costituito da cinque righe, ciascuna contenente la parola "Esecutore".

$ python3 threading_simple.py

Esecutore
Esecutore
Esecutore
Esecutore
Esecutore

E' utile essere in grado di distribuire un thread e passargli argomenti per dirgli quale lavoro svolgere. Un qualsiasi tipo di oggetto può essere passato come argomento al thread. Questo esempio passa un numero, che il thread stampa.

# threading_simpleargs.py

import threading


def worker(num):
    """Funzione in esecuzione nel thread"""
    print('Esecutore: %s' % num)


threads = []
for i in range(5):
    t = threading.Thread(target=worker, args=(i,))
    threads.append(t)
    t.start()

L'argomento intero ora viene incluso nel messaggio stampato da ciascun thread.

$ python3 threading_simpleargs.py

Esecutore: 0
Esecutore: 1
Esecutore: 2
Esecutore: 3
Esecutore: 4

Determinare il Thread Corrente

Utilizzare argomenti per identificare o nominare il thread è poco gestibile e non necessario. Ogni istanza di Thread ha un nome con un valore predefinito che può essere modificato non appena il thread è creato. Attribuire nomi ai thread è utile in processi server con thread di servizi multipli che gestiscono differenti operazioni.

# threading_names.py

import threading
import time


def worker():
    print(threading.current_thread().getName(), 'Partenza')
    time.sleep(0.2)
    print(threading.current_thread().getName(), 'Uscita')


def my_service():
    print(threading.current_thread().getName(), 'Partenza')
    time.sleep(0.3)
    print(threading.current_thread().getName(), 'Uscita')


t = threading.Thread(name='il_mio_servizio', target=my_service)
w = threading.Thread(name='esecutore', target=worker)
w2 = threading.Thread(target=worker)  # usa nome predefinito

w.start()
w2.start()
t.start()

Il risultato del debug include il nome del thread corrente su ciascuna riga. Le righe con "Thread-1" nella colonna del nome del thread corrispondono al thread senza nome w2.

$ python3 threading_names.py

esecutore Partenza
Thread-1 Partenza
il_mio_servizio Partenza
esecutore Uscita
Thread-1 Uscita
il_mio_servizio Uscita

Molti programmi non usano print per debug. Il modulo logging supporta l'incorporazione del nome thread in ogni messaggio registrato usando il codice di formattazione %(threadname)s. Includere i nomi dei thread nel messaggi registrati rende possibile la tracciatura di detti messaggi verso la loro sorgente.

# threading_names_log.py

import logging
import threading
import time


def worker():
    logging.debug('Partenza')
    time.sleep(0.2)
    logging.debug('Uscita')


def my_service():
    logging.debug('Partenza')
    time.sleep(0.3)
    logging.debug('Uscita')


logging.basicConfig(
    level=logging.DEBUG,
    format='[%(levelname)s] (%(threadName)-10s) %(message)s',
)

t = threading.Thread(name='il_mio_servizio', target=my_service)
w = threading.Thread(name='esecutore', target=worker)
w2 = threading.Thread(target=worker)  # usa nome predefinito

w.start()
w2.start()
t.start()

logging è anche thread-safe, quindi i messaggi da diversi thread sono mantenuti distinti nel risultato.

$ python3 threading_names_log.py

[DEBUG] (esecutore ) Partenza
[DEBUG] (Thread-1  ) Partenza
[DEBUG] (il_mio_servizio) Partenza
[DEBUG] (esecutore ) Uscita
[DEBUG] (Thread-1  ) Uscita
[DEBUG] (il_mio_servizio) Uscita

Thread Demoni contro Thread Non Demoni

Fino a questo punto, i programmi di esempio sono scritti in modo da attendere implicitamente di uscire fino a quando tutti i thread hanno completato il proprio lavoro. Talvolta i programmi producono un thread come demone che viene eseguito senza impedire l'uscita del programma principale. L'utilizzo dei thread come demoni è utile per servizi dove potrebbe non esserci un modo semplice per interrompere il thread, o deve lasciar morire il thread nel mezzo dell'esecuzione del proprio lavoro non comporta una perdita o corruzione di dati (ad esempio un thread che genera "battiti" per uno strumento di monitoraggio di un servizio). Per contrassegnare un thread come demone si passi daemon=True quando si lo si costruisce oppure si chiami set_daemon() con True. La modalità predefinita per i thread è non demone.

# threading_daemon.py

import threading
import time
import logging


def daemon():
    logging.debug('Partenza')
    time.sleep(0.2)
    logging.debug('Uscita')


def non_daemon():
    logging.debug('Partenza')
    logging.debug('Uscita')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='demone', target=daemon, daemon=True)

t = threading.Thread(name='non-demone', target=non_daemon)

d.start()
t.start()

Il risultato non comprende il messaggio "Uscita" dal thread demone, visto che tutti i thread non demoni (compreso quello principale) sono usciti prima che il thread demone si "risvegli" dalla chiamata di sleep().

$ python3 threading_daemon.py

(demone    ) Partenza
(non-demone) Partenza
(non-demone) Uscita

Per attendere che un thread demone abbia completato il proprio lavoro, si usi il metodo join().

# threading_daemon_join.py

import threading
import time
import logging


def daemon():
    logging.debug('Partenza')
    time.sleep(0.2)
    logging.debug('Uscita')


def non_daemon():
    logging.debug('Partenza')
    logging.debug('Uscita')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='demone', target=daemon, daemon=True)

t = threading.Thread(name='non-demone', target=non_daemon)

d.start()
t.start()

d.join()
t.join()

L'attendere l'uscita del thread demone usando join() fa sì che ci sia la possibilità di produrre il messaggio "Uscita".

$ python3 threading_daemon_join.py

(demone    ) Partenza
(non-demone) Partenza
(non-demone) Uscita
(demone    ) Uscita

Nella modalità predefinita, join() blocca a tempo indeterminato. E' anche possibile passare un valore a virgola mobile che rappresenti il numero di secondi da attendere prima che il thread diventi inattivo. Se il thread non si completa in quel lasso di tempo, join() ritorna comunque.

# threading_daemon_join_timeout.py

import threading
import time
import logging


def daemon():
    logging.debug('Partenza')
    time.sleep(0.2)
    logging.debug('Uscita')


def non_daemon():
    logging.debug('Partenza')
    logging.debug('Uscita')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

d = threading.Thread(name='demone', target=daemon, daemon=True)

t = threading.Thread(name='non-demone', target=non_daemon)

d.start()
t.start()

d.join(0.1)
print('d.isAlive()', d.isAlive())
t.join()

Visto che il periodo di attesa passato è minore di quello nel quale il thread viene messo in pausa, il thread è ancora "vivo" dopo che join() ritorna.

$ python3 threading_daemon_join_timeout.py

(demone    ) Partenza
(non-demone) Partenza
(non-demone) Uscita
threading_daemon_join_timeout.py:32: DeprecationWarning: isAlive() is deprecated, use is_alive() instead
  print('d.isAlive()', d.isAlive())
d.isAlive() True

Enumerare tutti i Thread

Non è necessario mantenere un handle esplicito per tutti i thread demoni per assicurarsi che siano completati prima di uscire dal processo principale. enumerate() ritorna una lista delle istanze attive di Thread. La lista comprende il thread corrente, e visto che effettuare il join() sul thread corrente introdurrebbe una situazione di stallo, deve essere ignorato.

# threading_enumerate.py

import random
import threading
import time
import logging


def worker():
    """thread worker function"""
    pause = random.randint(1, 5) / 10
    logging.debug('in pausa %0.2f', pause)
    time.sleep(pause)
    logging.debug('terminato')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(3):
    t = threading.Thread(target=worker, daemon=True)
    t.start()

main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is main_thread:
        continue
    logging.debug('operazione di join %s', t.getName())
    t.join()

Visto che l'elaboratore è in pausa per un periodo casuale di tempo, il risultato da questo programma potrebbe variare.

$ python3 threading_enumerate.py

(Thread-1  ) in pausa 0.20
(Thread-2  ) in pausa 0.50
(Thread-3  ) in pausa 0.20
(MainThread) operazione di join Thread-1
(Thread-1  ) terminato
(MainThread) operazione di join Thread-2
(Thread-3  ) terminato
(Thread-2  ) terminato
(MainThread) operazione di join Thread-3

Subclassare i Thread

Alla partenza, Thread esegue alcune basiche inizializzazioni, quindi chiama il proprio metodo run(), che a sua volta chiama la funzione passata al costruttore. Per creare una sottoclasse di Thread si sovrascriva run() per eseguire qualsiasi cosa si ritenga necessario.

# threading_subclass.py

import threading
import logging


class MyThread(threading.Thread):

    def run(self):
        logging.debug('in esecuzione')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t = MyThread()
    t.start()

Il valore di ritorno da run() viene ignorato.

$ python3 threading_subclass.py

(Thread-1  ) in esecuzione
(Thread-2  ) in esecuzione
(Thread-3  ) in esecuzione
(Thread-4  ) in esecuzione
(Thread-5  ) in esecuzione

Visto che i valori di args e kwargs passati al costruttore di Thread sono salvati in variabili private usando nomi prefissati da "__", non sono facilmente accessibili da una sottoclasse. Per passare argomenti ad un tipo di thread personalizzato, si ridefinisca il costruttore per salvare i valori in un attributo di istanza che possa essere visto nella sottoclasse.

# threading_subclass_args.py

import threading
import logging


class MyThreadWithArgs(threading.Thread):

    def __init__(self, group=None, target=None, name=None,
                 args=(), kwargs=None, *, daemon=None):
        super().__init__(group=group, target=target, name=name,
                         daemon=daemon)
        self.args = args
        self.kwargs = kwargs

    def run(self):
        logging.debug('in esecuzione con %s e %s',
                      self.args, self.kwargs)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

for i in range(5):
    t = MyThreadWithArgs(args=(i,), kwargs={'a': 'A', 'b': 'B'})
    t.start()

MyThreadWithArgs usa la stessa API di Thread, ma un'altra classe potrebbe facilmente cambiare il metodo costruttore per ricevere più o differenti argomenti più direttamente legati allo scopo del thread, così come una qualsiasi altra classe.

$ python3 threading_subclass_args.py

(Thread-1  ) in esecuzione con (0,) e {'a': 'A', 'b': 'B'}
(Thread-2  ) in esecuzione con (1,) e {'a': 'A', 'b': 'B'}
(Thread-3  ) in esecuzione con (2,) e {'a': 'A', 'b': 'B'}
(Thread-4  ) in esecuzione con (3,) e {'a': 'A', 'b': 'B'}
(Thread-5  ) in esecuzione con (4,) e {'a': 'A', 'b': 'B'}

Thread in Timer

Un esempio di una ragione per derivare Thread è fornito da Timer, anch'esso incluso in threading. Un Timer inizia il suo lavoro dopo un differimento, e può essere cancellato in qualsiasi momento all'interno di quel periodo di differimento.

# threading_timer.py

import threading
import time
import logging


def delayed():
    logging.debug('elaboratore in esecuzione')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

t1 = threading.Timer(0.3, delayed)
t1.setName('t1')
t2 = threading.Timer(0.3, delayed)
t2.setName('t2')

logging.debug('timer partiti')
t1.start()
t2.start()

logging.debug('in attesa prima di cancellare %s', t2.getName())
time.sleep(0.2)
logging.debug('cancellazione %s', t2.getName())
t2.cancel()
logging.debug('fatto')

In questo esempio il secondo timer non viene mai eseguito, ed il primo sembra venga eseguito dopo che il resto del programma principale ha finito. Visto che non è un thread demone, viene implicitamente unito quando il thread principale termina.

$ python3 threading_timer.py

(MainThread) timer partiti
(MainThread) in attesa prima di cancellare t2
(MainThread) cancellazione t2
(MainThread) fatto
(t1        ) elaboratore in esecuzione

Segnalazioni fra Thread

Anche se lo scopo dell'usare thread multipli è di eseguire in concorrenza operazioni separate, ci sono volte nelle quali è importante essere in grado di sincronizzare le operazioni in due o più thread. Un semplice modo per comunicare tra thread in sicurezza è rappresentato da oggetti evento. Un oggetto Event gestisce un flag interno che i chiamanti possono controllare tramite i metodi set() e clear(). Altri thread possono usare wait() per mettersi in pausa fino a che il flag è impostato, di fatto bloccandosi fino a quando gli si consente di continuare.

# threading_event.py

import logging
import threading
import time


def wait_for_event(e):
    """Attende che l'evento sia impostato prima di fare qualsiasi cosa"""
    logging.debug('wait_for_event in partenza')
    event_is_set = e.wait()
    logging.debug('evento impostato: %s', event_is_set)


def wait_for_event_timeout(e, t):
    """Wait t seconds and then timeout"""
    while not e.is_set():
        logging.debug('wait_for_event_timeout in partenza')
        event_is_set = e.wait(t)
        logging.debug('evento impostato: %s', event_is_set)
        if event_is_set:
            logging.debug('evento in elaborazione')
        else:
            logging.debug('altro lavoro in esecuzione')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

e = threading.Event()
t1 = threading.Thread(
    name='block',
    target=wait_for_event,
    args=(e,),
)
t1.start()

t2 = threading.Thread(
    name='nonblock',
    target=wait_for_event_timeout,
    args=(e, 2),
)
t2.start()

logging.debug('In attesa prima di chiamare Event.set()')
time.sleep(0.3)
e.set()
logging.debug('Event è impostato')

Il metodo wait() riceve un argomento che rappresenta il numero di secondi da attendere prima che l'evento vada in time out. Ritorna un booleano che indica se l'evento sia impostato o meno, in modo che il chiamante sappia perchè wait() è ritornato. Il metodo is_set() può essere usato separatamente sull'evento senza temere di bloccarlo.

In questo esempio, wait_for_event_timeout() verifica lo stato dell'evento senza bloccare a tempo indeterminato. wait_for_event() blocca sulla chiamata a wait(), che non ritorna fino a quando lo stato dell'evento cambia.

$ python3 threading_event.py

(block     ) wait_for_event in partenza
(nonblock  ) wait_for_event_timeout in partenza
(MainThread) In attesa prima di chiamare Event.set()
(MainThread) Event è impostato
(nonblock  ) evento impostato: True
(nonblock  ) evento in elaborazione
(block     ) evento impostato: True

Controllare l'Accesso alle Risorse

Oltre alla sincronizzazione delle operazioni dei thread, è anche importante essere in grado di controllare l'accesso a risorse condivise per prevenire un danneggiamento od una perdita di dati. Le strutture dati built-in di Python sono thread-safe come effetto collaterale dell'avere byte-code atomici per manipolarle (il bloccaggio dell'interprete globale usato per proteggere le strutture dati interne di Python non viene rilasciato nel mezzo di un aggiornamento). Altre strutture dati implementate in Python, o tipi più semplici come interi e cifre a virgola mobile, non hanno questa protezione. Per ripararsi da accessi simultanei ad un oggetto, si utilizzi un oggetto Lock.

# threading_lock.py

import logging
import random
import threading
import time


class Counter:

    def __init__(self, start=0):
        self.lock = threading.Lock()
        self.value = start

    def increment(self):
        logging.debug('In attesa del bloccaggio')
        self.lock.acquire()
        try:
            logging.debug('Bloccaggio acquisito')
            self.value = self.value + 1
        finally:
            self.lock.release()


def worker(c):
    for i in range(2):
        pause = random.random()
        logging.debug('In pausa %0.02f', pause)
        time.sleep(pause)
        c.increment()
    logging.debug('Fatto')


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

counter = Counter()
for i in range(2):
    t = threading.Thread(target=worker, args=(counter,))
    t.start()

logging.debug('In attesa dei thread elaboratori')
main_thread = threading.main_thread()
for t in threading.enumerate():
    if t is not main_thread:
        t.join()
logging.debug('Contatorer: %d', counter.value)

In questo esempio la funzione worker() incrementa una istanza di Counter che gestisce un Lock per prevenire che due thread cambino il loro stato interno allo stesso tempo. Se non fosse stato usato Lock, ci sarebbe stata la possibilità di non rilevare un cambiamento nel valore dell'attributo.

$ python3 threading_lock.py

(Thread-1  ) In pausa 0.44
(Thread-2  ) In pausa 0.79
(MainThread) In attesa dei thread elaboratori
(Thread-1  ) In attesa del bloccaggio
(Thread-1  ) Bloccaggio acquisito
(Thread-1  ) In pausa 0.78
(Thread-2  ) In attesa del bloccaggio
(Thread-2  ) Bloccaggio acquisito
(Thread-2  ) In pausa 0.87
(Thread-1  ) In attesa del bloccaggio
(Thread-1  ) Bloccaggio acquisito
(Thread-1  ) Fatto
(Thread-2  ) In attesa del bloccaggio
(Thread-2  ) Bloccaggio acquisito
(Thread-2  ) Fatto
(MainThread) Contatorer: 4

Per scoprire se un altro thread ha acquisito il bloccaggio senza sostenere il thread corrente, si passi False per l'argomento blocking per acquire(). Nel prossimo esempio, worker() tenta di acquisire il bloccaggio per tre volte distinte e conta quanti tentativi ha fatto per questo scopo. Nel frattempo, lock_holder() passa tra il mantenere e rilasciare il bloccaggio, con piccole pause in ogni stato usate per simulare il caricamento.

# threading_lock_noblock.py

import logging
import threading
import time


def lock_holder(lock):
    logging.debug('In partenza')
    while True:
        lock.acquire()
        try:
            logging.debug('Trattenuto')
            time.sleep(0.5)
        finally:
            logging.debug('Non trattenuto')
            lock.release()
        time.sleep(0.5)


def worker(lock):
    logging.debug('In partenza')
    num_tries = 0
    num_acquires = 0
    while num_acquires < 3:
        time.sleep(0.5)
        logging.debug('Tentativo di acquisizione')
        have_it = lock.acquire(0)
        try:
            num_tries += 1
            if have_it:
                logging.debug('Iterazione %d: acquisita',
                              num_tries)
                num_acquires += 1
            else:
                logging.debug('Iterazione %d: Non acquisita',
                              num_tries)
        finally:
            if have_it:
                lock.release()
    logging.debug('Terminato dopo %d iterazioni', num_tries)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()

holder = threading.Thread(
    target=lock_holder,
    args=(lock,),
    name='LockHolder',
    daemon=True,
)
holder.start()

worker = threading.Thread(
    target=worker,
    args=(lock,),
    name='Worker',
)
worker.start()

Occorrono a worker() più di tre iterazioni per acquisire il bloccaggio per tre volte distinte.

$ python3 threading_lock_noblock.py

(LockHolder) In partenza
(LockHolder) Trattenuto
(Worker    ) In partenza
(LockHolder) Non trattenuto
(Worker    ) Tentativo di acquisizione
(Worker    ) Iterazione 1: acquisita
(LockHolder) Trattenuto
(Worker    ) Tentativo di acquisizione
(Worker    ) Iterazione 2: Non acquisita
(LockHolder) Non trattenuto
(Worker    ) Tentativo di acquisizione
(Worker    ) Iterazione 3: acquisita
(LockHolder) Trattenuto
(Worker    ) Tentativo di acquisizione
(Worker    ) Iterazione 4: Non acquisita
(LockHolder) Non trattenuto
(Worker    ) Tentativo di acquisizione
(Worker    ) Iterazione 5: acquisita
(Worker    ) Terminato dopo 5 iterazioni
Bloccaggi Rientranti

Gli oggetti Lock normali non possono essere acquisiti più di una volta, anche se nello stesso thread. Questo può introdurre effetti collaterali indesiderati se il bloccaggio viene indirizzato da più di una funzione nella stessa catena di chiamate.

# threading_lock_reacquire.py

import threading

lock = threading.Lock()

print('Primo tentativo :', lock.acquire())
print('Secondo tentativo:', lock.acquire(0))

In questo caso, alla seconda chiamata ad acquire() viene dato un timeout di zero per prevenirne il bloccaggio visto che lo stesso è stato ottenuto dalla prima chiamata.

$ python3 threading_lock_reacquire.py

Primo tentativo : True
Secondo tentativo: False

In una situazione dove codice separato dallo stesso thread deve "riacquisire" il bloccaggio si utilizzi RLock.

# threading_rlock.py

import threading

lock = threading.RLock()

print('Primo tentativo :', lock.acquire())
print('Secondo tentativo:', lock.acquire(0))

Il solo cambiamento nel codice rispetto all'esempio precedente è la sostituzione di Lock con RLock.

$ python3 threading_rlock.py

Primo tentativo : True
Secondo tentativo: True
Bloccaggi come Gestori di Contesto

I bloccaggi implementano le API del gestore di contesto se sono compatibili con l'istruzione with. L'utilizzo di with consente di rimuovere la necessità di acquisire e rilasciare esplicitamente il bloccaggio.

# threading_lock_with.py

import threading
import logging


def worker_with(lock):
    with lock:
        logging.debug('Bloccaggio acquisito via with')


def worker_no_with(lock):
    lock.acquire()
    try:
        logging.debug('Lock acquisito direttamente')
    finally:
        lock.release()


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

lock = threading.Lock()
w = threading.Thread(target=worker_with, args=(lock,))
nw = threading.Thread(target=worker_no_with, args=(lock,))

w.start()
nw.start()

Le due funzioni worker_with() e worker_no_with() gestiscono il bloccaggio in modo equivalente.

$ python3 threading_lock_with.py

(Thread-1  ) Bloccaggio acquisito via with
(Thread-2  ) Lock acquisito direttamente

Sincronizzare i Thread

Oltre all'utilizzo di Events, un altro modo di sincronizzare i thread è tramite l'oggetto Condition. Visto che Condition usa Lock, potrebbe essere legato ad una risorsa condivisa, consentendo a thread multipli di attendere che la risorsa sia aggiornata. In questo esempio, i thread in consumer() attendono che la condizione venga impostata in Condition prima di continuare. Il thread producer() è responsabile dell'impostazione della condizione e della notifica di via libera agli altri thread.

# threading_condition.py

import logging
import threading
import time


def consumer(cond):
    """Attende la condizione ed usa la risorsa"""
    logging.debug('Partenza del thread consumer ')
    with cond:
        cond.wait()
        logging.debug('La risorsa è disponibile per consumer')


def producer(cond):
    """Imposta la risorsa che viene usata da consumer"""
    logging.debug('Partenza del thread producer')
    with cond:
        logging.debug('Si rende disponibile la risorsa')
        cond.notifyAll()


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

condition = threading.Condition()
c1 = threading.Thread(name='c1', target=consumer,
                      args=(condition,))
c2 = threading.Thread(name='c2', target=consumer,
                      args=(condition,))
p = threading.Thread(name='p', target=producer,
                     args=(condition,))

c1.start()
time.sleep(0.2)
c2.start()
time.sleep(0.2)
p.start()

I thread usano with per acquisire il bloccaggio associato a Condition. La cosa funziona anche utilizzando i metodi acquire() e release() esplicitamente.

$ python3 threading_condition.py

2021-06-03 09:16:31,771 (c1) Partenza del thread consumer 
2021-06-03 09:16:31,972 (c2) Partenza del thread consumer 
2021-06-03 09:16:32,172 (p ) Partenza del thread producer
2021-06-03 09:16:32,172 (p ) Si rende disponibile la risorsa
2021-06-03 09:16:32,173 (c1) La risorsa è disponibile per consumer
2021-06-03 09:16:32,173 (c2) La risorsa è disponibile per consumer

La barriera è un altro meccanismo di sincronizzazione dei thread. Barrier imposta un punto di controllo e tutti i thread partecipanti si bloccano fino a che tutte le parti in causa hanno raggiunto quel punto. Consente ai thread di partire separatamente, quindi interrompe fino a che tutti sono pronti a procedere.

# threading_barrier.py

import threading
import time


def worker(barrier):
    print(threading.current_thread().name,
          'In attesa della barriera con altri {}'.format(
              barrier.n_waiting))
    worker_id = barrier.wait()
    print(threading.current_thread().name, 'dopo la barriera',
          worker_id)


NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS)

threads = [
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,),
    )
    for i in range(NUM_THREADS)
]

for t in threads:
    print(t.name, 'partenza')
    t.start()
    time.sleep(0.1)

for t in threads:
    t.join()

In questo esempio, la barriera Barrier viene configurata per bloccare fino a quando ci sono 3 thread in attesa. Quando la condizione viene soddisfatta, i thread sono rilasciati oltre il punto di controllo simultaneamente. Il valore di ritorno di wait() indica il numero di parti che sono state rilasciate, e può essere usato per limitare alcuni thread dall'intraprendere una azione tipo la pulizia di una risorsa condivisa.

$ python3 threading_barrier.py

worker-0 partenza
worker-0 In attesa della barriera con altri 0
worker-1 partenza
worker-1 In attesa della barriera con altri 1
worker-2 partenza
worker-2 In attesa della barriera con altri 2
worker-2 dopo la barriera 2
worker-1 dopo la barriera 1
worker-0 dopo la barriera 0

Il metodo abort() di Barrier fa sì che tutti i thread in attesa ricevano un segnale BrokenBarrierError, che consente ai thread di eseguire una pulizia se il processo viene interrotto mentre sono bloccati in wait().

# threading_barrier_abort.py

import threading
import time


def worker(barrier):
    print(threading.current_thread().name,
          'in attesa della barriera con altri {}'.format(
              barrier.n_waiting))
    try:
        worker_id = barrier.wait()
    except threading.BrokenBarrierError:
        print(threading.current_thread().name, 'interruzione')
    else:
        print(threading.current_thread().name, 'dopo la barriera',
              worker_id)


NUM_THREADS = 3

barrier = threading.Barrier(NUM_THREADS + 1)

threads = [
    threading.Thread(
        name='worker-%s' % i,
        target=worker,
        args=(barrier,),
    )
    for i in range(NUM_THREADS)
]

for t in threads:
    print(t.name, 'partenza')
    t.start()
    time.sleep(0.1)

barrier.abort()

for t in threads:
    t.join()

Questo esempio configura Barrier per attendersi un ulteriore thread partecipante rispetto a quanti realmente partiti, quindi l'elaborazione in tutti i thread è bloccata. La chiamata di abort() solleva una eccezione in ogni thread bloccato.

$ python3 threading_barrier_abort.py

worker-0 partenza
worker-0 in attesa della barriera con altri 0
worker-1 partenza
worker-1 in attesa della barriera con altri 1
worker-2 partenza
worker-2 in attesa della barriera con altri 2
worker-0 interruzione
worker-2 interruzioneworker-1 interruzione

Limitare l'Accesso Concorrenziale alle Risorse

Talvolta è utile consentire a più di un elaboratore l'accesso contemporaneo ad una risorsa, mantenendo sempre la possibilità di limitarne il numero complessivo. Ad esempio un pool di connessioni potrebbe supportare un numero definito di connessioni simultanee, oppure una applicazione di rete potrebbe supportare un numero definito di scaricamenti concorrenti. Un modo per gestire queste connessioni può essere Semaphore.

# threading_semaphore.py

import logging
import random
import threading
import time


class ActivePool:

    def __init__(self):
        super(ActivePool, self).__init__()
        self.active = []
        self.lock = threading.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)
            logging.debug('In esecuzione: %s', self.active)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)
            logging.debug('In esecuzione: %s', self.active)


def worker(s, pool):
    logging.debug('In attesa di raggiungere il pool')
    with s:
        name = threading.current_thread().getName()
        pool.makeActive(name)
        time.sleep(0.1)
        pool.makeInactive(name)


logging.basicConfig(
    level=logging.DEBUG,
    format='%(asctime)s (%(threadName)-2s) %(message)s',
)

pool = ActivePool()
s = threading.Semaphore(2)
for i in range(4):
    t = threading.Thread(
        target=worker,
        name=str(i),
        args=(s, pool),
    )
    t.start()

In questo esempio la classe ActivePool serve semplicemente per tracciare con comodità quali thread possono essere eseguiti ad un dato momento. Un pool di risorse reale dovrebbe allocare una connessione o qualche altro valore al thread appena attivato, e riprendersi quel valore quando il thread ha concluso. Qui viene semplicemente usato per mantenere i nomi dei thread attivi per mostrare che sono in esecuzione al massimo due thread contemporaneamente.

$ python3 threading_semaphore.py

2021-06-03 09:16:32,925 (0 ) In attesa di raggiungere il pool
2021-06-03 09:16:32,925 (0 ) In esecuzione: ['0']
2021-06-03 09:16:32,925 (1 ) In attesa di raggiungere il pool
2021-06-03 09:16:32,925 (1 ) In esecuzione: ['0', '1']
2021-06-03 09:16:32,926 (2 ) In attesa di raggiungere il pool
2021-06-03 09:16:32,926 (3 ) In attesa di raggiungere il pool
2021-06-03 09:16:33,025 (0 ) In esecuzione: ['1']
2021-06-03 09:16:33,026 (2 ) In esecuzione: ['1', '2']
2021-06-03 09:16:33,026 (1 ) In esecuzione: ['2']
2021-06-03 09:16:33,026 (3 ) In esecuzione: ['2', '3']
2021-06-03 09:16:33,126 (2 ) In esecuzione: ['3']
2021-06-03 09:16:33,127 (3 ) In esecuzione: []

Dati Specifici al Thread

Mentre alcune risorse devono essere bloccate in modo che possano essere usate da thread multipli, le altre devono essere protette in modo che siano nascoste ai thread che le detengono. La classe local() crea un oggetto in grado di nascondere i valori dalla vista in thread separati.

# threading_local.py

import random
import threading
import logging


def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('Ancora nessun valore')
    else:
        logging.debug('value=%s', val)


def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

local_data = threading.local()
show_value(local_data)
local_data.value = 1000
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()

L'attributo local_data.value non è presente per tutti gli altri thread fino a quando rimane impostato in quel thread.

$ python3 threading_local.py

(MainThread) Ancora nessun valore
(MainThread) value=1000
(Thread-1  ) Ancora nessun valore
(Thread-1  ) value=11
(Thread-2  ) Ancora nessun valore
(Thread-2  ) value=30

Per inizializzare le impostazioni in modo che tutti i thread partano con lo stesso valore, si utilizzi una sottoclasse e si impostino gli attributi in __init__().

# threading_local_defaults.py

import random
import threading
import logging


def show_value(data):
    try:
        val = data.value
    except AttributeError:
        logging.debug('Ancora nessun valore')
    else:
        logging.debug('value=%s', val)


def worker(data):
    show_value(data)
    data.value = random.randint(1, 100)
    show_value(data)


class MyLocal(threading.local):

    def __init__(self, value):
        super().__init__()
        logging.debug('Inizializzazione %r', self)
        self.value = value


logging.basicConfig(
    level=logging.DEBUG,
    format='(%(threadName)-10s) %(message)s',
)

local_data = MyLocal(1000)
show_value(local_data)

for i in range(2):
    t = threading.Thread(target=worker, args=(local_data,))
    t.start()

__init__() viene invocato sullo stesso oggetto (notare il valore di id()), una volta in ciascun thread per impostare i valori predefiniti.

$ python3 threading_local_defaults.py

(MainThread) Inizializzazione <__main__.MyLocal object at 0x7ffadcf04be0>
(MainThread) value=1000
(Thread-1  ) Inizializzazione <__main__.MyLocal object at 0x7ffadcf04be0>
(Thread-1  ) value=1000
(Thread-1  ) value=52
(Thread-2  ) Inizializzazione <__main__.MyLocal object at 0x7ffadcf04be0>
(Thread-2  ) value=1000
(Thread-2  ) value=19

Vedere anche:

threading
La documentazione della libreria standard per questo modulo.
Note di portabilità per threading
multiprocessing
Una API per lavorare con processi che rispecchiano l'API di threading
Queue
Coda thread-safe, utile per passare messaggi tra thread
thread
Api thread di basso livello