Scopo | Fornisce una implementazione FIFO thread-safe |
Versione Python | 1.4 e superiore |
A partire dal 1 gennaio 2021 le versioni 2.x di Python non sono piu' supportate. Ti invito a consultare la corrispondente versione 3.x dell'articolo per il modulo Queue
Il modulo Queue fornisce una implementazione FIFO adatta per la programmazione multithread . Può essere usato per passare messaggi od altri dati tra il produttore ed il consumatore in modo thread safely . Viene gestito il bloccaggio per il chiamante, quindi è semplice avere tanti thread quanti se ne vogliono che lavorano con la stessa istanza di Queue . La dimensione (il numero di elementi) di Queue può essere ristretta per regolamentare l'utilizzo della memoria o l'elaborazione.
La classe
Queue
implementa un contenitore base primo-che-entra, primo-che-esce. Gli elementi sono aggiunti ad un estremo della sequenza usando
put()
e rimossi dall'altro tramite
get()
.
import Queue
q = Queue.Queue()
for i in range(5):
q.put(i)
while not q.empty():
print q.get()
Questo esempio utilzza un singolo thread per illustrare che gli elementi sono rimossi dalla coda nello stesso ordine con il quale sono stati inseriti.
$ python Queue_fifo.py 0 1 2 3 4
Al contrario dell'implementazione FIFO standard di
Queue
,
LifoQueue
utilizza un ordinamento ultimo-che-entra, primo-che-esce (in genere associato con una struttura dati
stack
).
import Queue
q = Queue.LifoQueue()
for i in range(5):
q.put(i)
while not q.empty():
print q.get()
L'elemento inserito più di recente nella coda, tramite
put()
, viene rimosso da
get()
.
$ python Queue_lifo.py 4 3 2 1 0
Talvolta l'ordine di elaborazione degli elementi in una coda deve basarsi sulle caratteristiche degli elementi stessi, invece del mero ordine nel quale sono stati creati od aggiunti alla coda. Ad esempio, i lavori di stampa da un ufficio paghe, potrebbero avere la precedenza su di un listato di codice stampato da uno sviluppatore.
PriorityQueue
uilizza l'ordinamento del contenuto della coda per decidere cosa recuperare.
import Queue
class Job(object):
def __init__(self, priority, description):
self.priority = priority
self.description = description
print 'Nuovo Lavoro:', description
return
def __cmp__(self, other):
return cmp(self.priority, other.priority)
q = Queue.PriorityQueue()
q.put( Job(3, 'Lavoro Normale') )
q.put( Job(10, 'Lavoro non significativo') )
q.put( Job(1, 'Lavoro importante') )
while not q.empty():
next_job = q.get()
print 'Elaborazione dei lavori:', next_job.description
In questo esempio a thread singolo, i lavori sono estratti dalla coda in stretto ordine di priorità. Se ci fossero thread multipli a consumare i lavori, essi sarebbero stati elaborati in base alla priorità degli elementi nella coda al tempo in cui viene chiamato
get()
.
$ python Queue_priority.py Nuovo Lavoro: Lavoro Normale Nuovo Lavoro: Lavoro non significativo Nuovo Lavoro: Lavoro importante Elaborazione dei lavori: Lavoro importante Elaborazione dei lavori: Lavoro Normale Elaborazione dei lavori: Lavoro non significativo
Come esempio di come usare la classe Queue con thread multipli si può creare un client di podcasting molto semplice. Il client legge uno o più feed RSS, mette in coda le richieste di scaricamento ed elabora parecchi scaricamenti in parallelo tramite i thread. E' semplice ed inutilizzabile per una vera applicazione, ma l'implementazione base fornisce sufficiente codice da utilizzare per fornire un esempio dell'uso del modulo Queue .
# Moduli di sistem
from Queue import Queue
from threading import Thread
import time
# Mouli locali
import feedparser
# Impostazione di alcune variabili locali
num_fetch_threads = 2
enclosure_queue = Queue()
# Una vera applicazione non utilizzerebbe dati hard-coded ...
#feed_urls = [ 'http://www.castsampler.com/cast/feed/rss/guest',
#]
# Il feed di esempio originale è stato commentato in quanto esso non esiste
# più; al suo posto si utilizza il feed rss del sito PyMotw-it
feed_urls = [ 'http://robyp.x10host.com/pymotw-it_feed.xml',
]
def downloadEnclosures(i, q):
"""Questa è la funzione di lavoro del thread.
Elabora gli elementi nella coda uno dopo l'altro.
Questi deamon thread girano in un loop infinito,
ed escono solamente quando esce il thread principale
"""
while True:
print '%s: Cerco la prossima richiesta' % i
url = q.get()
print '%s: Scaricamento:' % i, url
# invece di scaricare veramente l'URL
# si fa finta e si mette in pausa il programma
time.sleep(i + 2)
q.task_done()
# Impostazione di alcuni thread per ottenere le richieste
for i in range(num_fetch_threads):
worker = Thread(target=downloadEnclosures, args=(i, enclosure_queue,))
worker.setDaemon(True)
worker.start()
# Scaricamento del/i feed ed inserimento dell'url da scaricare
# nella coda.
# Il ciclo seguente è stato modificato dal traduttore in quanto il
# feed di esempio originale non è più disponibile
# -------------- INIZIO CODICE MODIFICATO --------------------------
for url in feed_urls:
response = feedparser.parse(url)
for i, entry in enumerate(response['entries']):
print 'Accodamento:', entry.link
enclosure_queue.put(entry.link)
if i == 9: # Per semplicità prendo solo 10 elementi
break
# -------------- FINE CODICE MODIFICATO ----------------------------
# Ora si attende lo svuotamento della coda, che indica che abbiamo
# elaborato tutti gli scaricamenti.
print '*** Thread principale in attesa'
enclosure_queue.join()
print '*** Fatto'
Per prima cosa si impostano alcuni parametri operativi. In genere questi proverrebbero da input utenti (preferenze, database, qualunque cosa). Per questo esempio si fissano i thread da usare e l'elenco degli URL da recuperare direttamente nel codice.
Successivamente, occore definire la funzione
downloadEnclosures()
che verrà eseguita nel thread di lavoro, che elabora gli scaricamenti. Ancora una volta, a scopo di dimostrazione gli scaricamenti non vengono realmente effettuati. Per farlo veramente si sarebbe potuto usare
urllib
oppure
urllib2
. In questo esempio si simula il tempo di scaricamento tramite un tempo variabile di pausa, in base all'
id
del thread.
Una volta che la funzione è definita, è possibile far partire i thread di lavoro. Si noti che
downloadEnclosures()
si fermerà all'istruzione
urls = q.get()
fino a che la coda ha qualcosa da restituire, quindi è sicuro far partire i thread prima che ci sia qualcosa in coda.
Il passo successivo è recuperare i contenuti del feed, utilizzando il modulo di Mark Pilgrim feedparser accodando gli URL da scaricare. Non appena il primo URL è aggiunto alla coda, uno dei thread di lavoro dovrebbe recuperarlo ed iniziare lo scaricamento. Il ciclo continuerà ad aggiungere elementi fino a che il feed viene consumato, ed i thread di lavoro a turno recupereranno gli URL dalla coda per scaricarli.
L'unica cosa rimasta da fare è attendere che la coda si svuoti ancora, utilizzando
join()
.
Se si esegue lo script di esempio si dovrebbe ottenere un output tipo questo:
0: Cerco la prossima richiesta 1: Cerco la prossima richiesta Accodamento: http://robyp.x10host.com/collections.html Accodamento: http://robyp.x10host.com/dis.html Accodamento: http://robyp.x10host.com/profile.html 1: Scaricamento: http://robyp.x10host.com/collections.html 0: Scaricamento: http://robyp.x10host.com/dis.html Accodamento: http://robyp.x10host.com/contextlib.html Accodamento: http://robyp.x10host.com/atexit.html Accodamento: http://robyp.x10host.com/compileall.html Accodamento: http://robyp.x10host.com/urlparse.html Accodamento: http://robyp.x10host.com/dis.html Accodamento: http://robyp.x10host.com/urllib2.html Accodamento: http://robyp.x10host.com/socketserver.html *** Thread principale in attesa 0: Cerco la prossima richiesta 0: Scaricamento: http://robyp.x10host.com/profile.html 1: Cerco la prossima richiesta 1: Scaricamento: http://robyp.x10host.com/contextlib.html 0: Cerco la prossima richiesta 0: Scaricamento: http://robyp.x10host.com/atexit.html 1: Cerco la prossima richiesta 1: Scaricamento: http://robyp.x10host.com/compileall.html 0: Cerco la prossima richiesta 0: Scaricamento: http://robyp.x10host.com/urlparse.html 0: Cerco la prossima richiesta 0: Scaricamento: http://robyp.x10host.com/dis.html 1: Cerco la prossima richiesta 1: Scaricamento: http://robyp.x10host.com/urllib2.html 0: Cerco la prossima richiesta 0: Scaricamento: http://robyp.x10host.com/socketserver.html 1: Cerco la prossima richiesta 0: Cerco la prossima richiesta *** Fatto
Vedere anche: