Queue - Una implementazione FIFO thread-safe

Scopo: Fornisce una implementazione FIFO adatta per la programmazione

Il modulo queue fornisce una struttura dati FIFO adatta per la programmazione multithread. Può essere usato per passare messaggi o altri dati tra il produttore e il consumatore in modo thread safety. Viene gestito il bloccaggio per il chiamante, in modo che molteplici thread possono lavorare con la stessa istanza di Queue in sicurezza. La dimensione di Queue (il numero di elementi che contiene) può essere ristretta per regolamentare l'utilizzo della memoria o l'elaborazione.

Questo articolo assume che sia già nota la natura di una coda. Se non la si conosce, meglio consultare qualcuno dei riferimenti in fondo all'articolo prima di continuare.

Queue FIFO base

La classe Queue implementa un contenitore base primo-che-entra, primo-che-esce. Gli elementi sono aggiunti a un estremo della sequenza usando put() e rimossi dall'altro tramite get().

# queue_fifo.py

import queue

q = queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

Questo esempio utilizza un singolo thread per dimostrare che gli elementi sono rimossi dalla coda nello stesso ordine con il quale sono stati inseriti.

$ python3 queue_fifo.py

0 1 2 3 4

Coda LIFO

Al contrario dell'implementazione FIFO standard di Queue, LifoQueue utilizza un ordinamento ultimo-che-entra, primo-che-esce (in genere associato con una struttura dati stack).

# queue_lifo.py

import queue

q = queue.LifoQueue()

for i in range(5):
    q.put(i)

while not q.empty():
    print(q.get(), end=' ')
print()

L'elemento inserito più di recente nella coda, tramite put(), viene rimosso da get().

$ python3 queue_lifo.py

4 3 2 1 0

Coda di Priorità

Talvolta l'ordine di elaborazione degli elementi in una coda deve basarsi sulle caratteristiche degli elementi stessi, invece del mero ordine nel quale sono stati creati o aggiunti alla coda. Ad esempio, i lavori di stampa di un ufficio paghe, potrebbero avere la precedenza su di un listato di codice stampato da uno sviluppatore. PriorityQueue uilizza l'ordinamento del contenuto della coda per decidere cosa recuperare.

# queue_priority.py

import functools
import queue
import threading


@functools.total_ordering
class Job:

    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print('Nuovo lavoro:', description)
        return

    def __eq__(self, other):
        try:
            return self.priority == other.priority
        except AttributeError:
            return NotImplemented

    def __lt__(self, other):
        try:
            return self.priority < other.priority
        except AttributeError:
            return NotImplemented


q = queue.PriorityQueue()

q.put(Job(3, 'Lavoro normale'))
q.put(Job(10, 'Lavoro non significativo'))
q.put(Job(1, 'Lavoro importante'))


def process_job(q):
    while True:
        next_job = q.get()
        print('Elaborazione dei lavori:', next_job.description)
        q.task_done()

workers = [
    threading.Thread(target=process_job, args=(q,)),
    threading.Thread(target=process_job, args=(q,)),
]
for w in workers:
    w.setDaemon(True)
    w.start()

q.join()

In questo esempio molteplici thread consumano i lavori, che sono elaborati in base alla priorità degli elementi nella coda quando è stato chiamato get. L'ordine di elaborazione per gli elementi aggiunti alla coda mentre i thread per consumarli sono in esecuzione dipende dallo scambio di contesto del thread.

$ python3 queue_priority.py

Nuovo lavoro: Lavoro normale
Nuovo lavoro: Lavoro non significativo
Nuovo lavoro: Lavoro importante
Elaborazione dei lavori: Lavoro importante
Elaborazione dei lavori: Lavoro normale
Elaborazione dei lavori: Lavoro non significativo

Costruire un Client Podcast con Thread

Il codice sorgente per questo client per il podcast in questa sezione dimostra come usare la classe Queue con thread multipli. Il programma legge uno o più feed RSS, mette in coda le richieste di scaricamento dei cinque episodi più recenti da ogni feed da scaricare, ed elabora parecchi scaricamenti in parallelo tramite i thread. Non ha una gestione degli errori sufficiente per un utilizzo in produzione, ma l'implementazione base fornisce un esempio dell'uso del modulo Queue.

Per prima cosa occorre impostare qualche parametro operativo. In genere questi dovrebbero essere recuperati da un input da utente (preferenze, un database ecc.). Questo esempio utilizza valori scritti direttamente nel codice per il numero dei thread e l'elenco degli URL da recuperare.

# fetch_podcasts.py

from queue import Queue
import threading
import time
import urllib
from urllib.parse import urlparse

import feedparser

# Impostazione di qualche variabile globale
num_fetch_threads = 2
enclosure_queue = Queue()

# Una vera applicazione non utilizzerebbe dati
# scritti direttamente nel codice
feed_urls = [
    'http://talkpython.fm/episodes/rss',
]

def message(s):
    print('{}: {}'.format(threading.current_thread().name, s))

La funzione download_enclosures() verrà eseguita in ogni thread di lavoro ed effettuerà gli scaricamenti utilizzando urllib.

def download_enclosures(q):
    """Questa è la funzione di lavoro del thread.
    Elabora gli elementi nella coda uno dopo l'altro.
    Questi thread di demone girano in un ciclo infinito,
    ed escono solamente quando esce il thread principale
    """

    while True:
        message('Cerco la prossima richiesta')
        url = q.get()
        filename = url.rpartition('/')[-1]
        message('scaricamento {}'.format(filename))
        response = urllib.request.urlopen(url)
        data = response.read()
        # Salva il file scaricato nella directory corrente
        message('scrittura in {}'.format(filename))
        with open(filename, 'wb') as outfile:
            outfile.write(data)
        q.task_done()

Una volta che la funzione da utilizzare per i thread è definita, i thread di lavoro possono essere lanciati. Quando download_enclosures() elabora l'istruzione url = q.get() si interrompe e attende fino a quando la coda ha qualcosa da restituire. Il che significa che si possono far partire in sicurezza i thread prima che ci sia qualcosa in coda.

# Impostazione di alcuni thread per ottenere le richieste
for i in range(num_fetch_threads):
    worker = threading.Thread(
        target=download_enclosures,
        args=(enclosure_queue,),
        name='worker-{}'.format(i),
    )
    worker.setDaemon(True)
    worker.start()

IL prossimo passo è il recupero dei contenuto del feed utilizzando il modulo feedparser, e accodando gli URL delle richieste. Non appena il primo URL viene aggiunto alla coda, uno dei thread di lavoro lo prende in consegna e ne inizia lo scaricamento. Il ciclo continuerà ad aggiungere elementi fino a che il feed viene consumato interamente, e i thread di lavoro faranno a turno per ottenere dalla coda gli URL da scaricare.

# Scaricamento del/i feed e inserimento dell'url da scaricare
# nella coda.
for url in feed_urls:
    response = feedparser.parse(url, agent='fetch_podcasts.py')
    for entry in response['entries'][:5]:
        for enclosure in entry.get('enclosures', []):
            parsed_url = urlparse(enclosure['url'])
            message('accodamento {}'.format(
                parsed_url.path.rpartition('/')[-1]))
            enclosure_queue.put(enclosure['url'])

La sola cosa rimasta da fare è di attendere che la coda si svuoti nuovamente, utilizzando join().

# Ora si attende lo svuotamento della coda, che indica che abbiamo
# elaborato tutti gli scaricamenti.
message('*** Thread principale in attesa')
enclosure_queue.join()
message('*** fatto')

L'esecuzione dello script di esempio produce un output tipo il seguente:

$ python fetch_podcasts.py

worker-0: Cerco la prossima richiesta
worker-1: Cerco la prossima richiesta
MainThread: accodamento data-science-from-scratch.mp3
MainThread: accodamento how-our-engineering-environments-are-killing-diversity-and-how-we-can-fix-it.mp3
MainThread: accodamento enterprise-software-with-python.mp3
MainThread: accodamento python-in-visual-studio.mp3
MainThread: accodamento eve-online-mmo-game-powered-by-python.mp3
MainThread: *** Thread principale in attesa
worker-0: scaricamento data-science-from-scratch.mp3
worker-1: scaricamento how-our-engineering-environments-are-killing-diversity-and-how-we-can-fix-it.mp3
worker-0: scrittura in data-science-from-scratch.mp3
worker-0: Cerco la prossima richiesta
worker-0: scaricamento enterprise-software-with-python.mp3
worker-0: scrittura in enterprise-software-with-python.mp3
worker-0: Cerco la prossima richiesta
worker-0: scaricamento python-in-visual-studio.mp3
worker-1: scrittura in how-our-engineering-environments-are-killing-diversity-and-how-we-can-fix-it.mp3
worker-1: Cerco la prossima richiesta
worker-1: scaricamento eve-online-mmo-game-powered-by-python.mp3
worker-0: scrittura in python-in-visual-studio.mp3
worker-0: Cerco la prossima richiesta
worker-1: scrittura in eve-online-mmo-game-powered-by-python.mp3
worker-1: Cerco la prossima richiesta
MainThread: *** fatto

L'effettivo output dipenderà dal tipo di feed RSS utilizzato.

Vedere anche:

Queue
La documentazione della libreria standard per questo modulo.
Deque -- una coda a due estremi
Coda Informatica
Pagina Wikipedia
FIFO
Pagina Wikipedia
feedparser
Il modulo feedparser di Mark Pilgrim