Queue - Una implementazione FIFO thread-safe

Scopo Fornisce una implementazione FIFO thread-safe
Versione Python 1.4 e superiore

Il modulo Queue fornisce una implementazione FIFO adatta per la programmazione multithread. Può essere usato per passare messaggi od altri dati tra il produttore ed il consumatore in modo thread safely. Viene gestito il bloccaggio per il chiamante, quindi è semplice avere tanti thread quanti se ne vogliono che lavorano con la stessa istanza di Queue. La dimensione (il numero di elementi) di Queue può essere ristretta per regolamentare l'utilizzo della memoria o l'elaborazione.

Questo articolo assume che sia già nota la natura di una coda. Se non si conosce, meglio consultare qualcuno dei riferimenti prima di continuare.

Queue FIFO base

La classe Queue implementa un contenitore base primo-che-entra, primo-che-esce. Gli elementi sono aggiunti ad un estremo della sequenza usando put() e rimossi dall'altro tramite get().

import Queue

q = Queue.Queue()

for i in range(5):
    q.put(i)

while not q.empty():
    print q.get()

Questo esempio utilzza un singolo thread per illustrare che gli elementi sono rimossi dalla coda nello stesso ordine con il quale sono stati inseriti.

$ python Queue_fifo.py
0
1
2
3
4

Coda LIFO

Al contrario dell'implementazione FIFO standard di Queue, LifoQueue utilizza un ordinamento ultimo-che-entra, primo-che-esce (in genere associato con una struttura dati stack).

import Queue

q = Queue.LifoQueue()

for i in range(5):
    q.put(i)

while not q.empty():
    print q.get()

L'elemento inserito più di recente nella coda, tramite put(), viene rimosso da get().

$ python Queue_lifo.py

4
3
2
1
0

Priorità di Queue

Talvolta l'ordine di elaborazione degli elementi in una coda deve basarsi sulle caratteristiche degli elementi stessi, invece del mero ordine nel quale sono stati creati od aggiunti alla coda. Ad esempio, i lavori di stampa da un ufficio paghe, potrebbero avere la precedenza su di un listato di codice stampato da uno sviluppatore. PriorityQueue uilizza l'ordinamento del contenuto della coda per decidere cosa recuperare.

import Queue

class Job(object):
    def __init__(self, priority, description):
        self.priority = priority
        self.description = description
        print 'Nuovo Lavoro:', description
        return
    def __cmp__(self, other):
        return cmp(self.priority, other.priority)

q = Queue.PriorityQueue()

q.put( Job(3, 'Lavoro Normale') )
q.put( Job(10, 'Lavoro non significativo') )
q.put( Job(1, 'Lavoro importante') )

while not q.empty():
    next_job = q.get()
    print 'Elaborazione dei lavori:', next_job.description

In questo esempio a thread singolo, i lavori sono estratti dalla coda in stretto ordine di priorità. Se ci fossero thread multipli a consumare i lavori, essi sarebbero stati elaborati in base alla priorità degli elementi nella coda al tempo in cui viene chiamato get().

$ python Queue_priority.py

Nuovo Lavoro: Lavoro Normale
Nuovo Lavoro: Lavoro non significativo
Nuovo Lavoro: Lavoro importante
Elaborazione dei lavori: Lavoro importante
Elaborazione dei lavori: Lavoro Normale
Elaborazione dei lavori: Lavoro non significativo

Usare Queue con i Thread

Come esempio di come usare la classe Queue con thread multipli si può creare un client di podcasting molto semplice. Il client legge uno o più feed RSS, mette in coda le richieste di scaricamento ed elabora parecchi scaricamenti in parallelo tramite i thread. E' semplice ed inutilizzabile per una vera applicazione, ma l'implementazione base fornisce sufficiente codice da utilizzare per fornire un esempio dell'uso del modulo Queue.

Il codice dello script che segue è stato modificato dal traduttore in quanto la risorsa originariamente utilizzata per recuperare i feed ('www.castsampler.com/cast/feed/rss/guest') non esiste più. Al suo posto è stato utilizzato il feed RSS di questo sito.
# Moduli di sistem
from Queue import Queue
from threading import Thread
import time

# Mouli locali
import feedparser

# Impostazione di alcune variabili locali
num_fetch_threads = 2
enclosure_queue = Queue()

# Una vera applicazione non utilizzerebbe dati hard-coded ...
#feed_urls = [ 'http://www.castsampler.com/cast/feed/rss/guest',
             #]

# Il feed di esempio originale è stato commentato in quanto esso non esiste
# più; al suo posto si utilizza il feed rss del sito PyMotw-it
feed_urls = [ 'http://robyp.x10host.com/pymotw-it_feed.xml',
              ]


def downloadEnclosures(i, q):
    """Questa è la funzione di lavoro del thread.
    Elabora gli elementi nella coda uno dopo l'altro.
    Questi deamon thread girano in un loop infinito,
    ed escono solamente quando esce il thread principale
    """
    while True:
        print '%s: Cerco la prossima richiesta' % i
        url = q.get()
        print '%s: Scaricamento:' % i, url
        # invece di scaricare veramente l'URL
        # si fa finta e si mette in pausa il programma
        time.sleep(i + 2)
        q.task_done()


# Impostazione di alcuni thread per ottenere le richieste
for i in range(num_fetch_threads):
    worker = Thread(target=downloadEnclosures, args=(i, enclosure_queue,))
    worker.setDaemon(True)
    worker.start()

# Scaricamento del/i feed ed inserimento dell'url da scaricare
# nella coda.

# Il ciclo seguente è stato modificato dal traduttore in quanto il
# feed di esempio originale non è più disponibile
# -------------- INIZIO CODICE MODIFICATO --------------------------
for url in feed_urls:
    response = feedparser.parse(url)
    for i, entry in enumerate(response['entries']):
        print 'Accodamento:', entry.link
        enclosure_queue.put(entry.link)
        if i == 9:  # Per semplicità prendo solo 10 elementi
            break
# -------------- FINE CODICE MODIFICATO ----------------------------



# Ora si attende lo svuotamento della coda, che indica che abbiamo
# elaborato tutti gli scaricamenti.
print '*** Thread principale in attesa'
enclosure_queue.join()
print '*** Fatto'

Per prima cosa si impostano alcuni parametri operativi. In genere questi proverrebbero da input utenti (preferenze, database, qualunque cosa). Per questo esempio si fissano i thread da usare e l'elenco degli URL da recuperare direttamente nel codice.

Successivamente, occore definire la funzione downloadEnclosures() che verrà eseguita nel thread di lavoro, che elabora gli scaricamenti. Ancora una volta, a scopo di dimostrazione gli scaricamenti non vengono realmente effettuati. Per farlo veramente si sarebbe potuto usare urllib oppure urllib2. In questo esempio si simula il tempo di scaricamento tramite un tempo variabile di pausa, in base all'id del thread.

Una volta che la funzione è definita, è possibile far partire i thread di lavoro. Si noti che downloadEnclosures() si fermerà all'istruzione urls = q.get() fino a che la coda ha qualcosa da restituire, quindi è sicuro far partire i thread prima che ci sia qualcosa in coda.

Il passo successivo è recuperare i contenuti del feed, utilizzando il modulo di Mark Pilgrim feedparser accodando gli URL da scaricare. Non appena il primo URL è aggiunto alla coda, uno dei thread di lavoro dovrebbe recuperarlo ed iniziare lo scaricamento. Il ciclo continuerà ad aggiungere elementi fino a che il feed viene consumato, ed i thread di lavoro a turno recupereranno gli URL dalla coda per scaricarli.

L'unica cosa rimasta da fare è attendere che la coda si svuoti ancora, utilizzando join().

Se si esegue lo script di esempio si dovrebbe ottenere un output tipo questo:

0: Cerco la prossima richiesta
1: Cerco la prossima richiesta
Accodamento: http://robyp.x10host.com/collections.html
Accodamento: http://robyp.x10host.com/dis.html
Accodamento: http://robyp.x10host.com/profile.html
1: Scaricamento: http://robyp.x10host.com/collections.html
0: Scaricamento: http://robyp.x10host.com/dis.html
Accodamento: http://robyp.x10host.com/contextlib.html
Accodamento: http://robyp.x10host.com/atexit.html
Accodamento: http://robyp.x10host.com/compileall.html
Accodamento: http://robyp.x10host.com/urlparse.html
Accodamento: http://robyp.x10host.com/dis.html
Accodamento: http://robyp.x10host.com/urllib2.html
Accodamento: http://robyp.x10host.com/socketserver.html
*** Thread principale in attesa
0: Cerco la prossima richiesta
0: Scaricamento: http://robyp.x10host.com/profile.html
1: Cerco la prossima richiesta
1: Scaricamento: http://robyp.x10host.com/contextlib.html
0: Cerco la prossima richiesta
0: Scaricamento: http://robyp.x10host.com/atexit.html
1: Cerco la prossima richiesta
1: Scaricamento: http://robyp.x10host.com/compileall.html
0: Cerco la prossima richiesta
0: Scaricamento: http://robyp.x10host.com/urlparse.html
0: Cerco la prossima richiesta
0: Scaricamento: http://robyp.x10host.com/dis.html
1: Cerco la prossima richiesta
1: Scaricamento: http://robyp.x10host.com/urllib2.html
0: Cerco la prossima richiesta
0: Scaricamento: http://robyp.x10host.com/socketserver.html
1: Cerco la prossima richiesta
0: Cerco la prossima richiesta
*** Fatto

Vedere anche:

Queue
La documentazione della libreria standard per questo modulo.
Deque da collections
collections comprende una classe deque (una coda a due estremi)
Coda Informatica
Pagina Wikipedia
FIFO
Pagina Wikipedia
feedparser
Il modulo feedparser di Mark Pilgrim
Strutture dati in-memoria
Altre complesse strutture dati nella libreria standard. (in inglese)