multiprocessing - Gestisce Processi come Thread
Scopo: Fornisce una API per la gestione dei processi
Il modulo multiprocessing fornisce un API per suddividere il lavoro tra processi molteplici basati sulla API di threading. In taluni casi multiprocessing può essere usato come rimpiazzo al posto di threading per trarre vantaggio dei core multipli di CPU per evitare colli di bottiglia computazionali associati con i bloccaggi dell'interprete globale di Python.
Viste le similarità, i primi pochi esempi di seguito sono tratti dagli esempi di threading. Le caratteristiche fornite da multiprocessing non disponibili in threading sono trattate successivamente.
Concetti base di multiprocessing
Il modo più semplice di generare un secondo processo è istanziare un oggetto Process
con una funzione obiettivo. quindi chiamare start()
per iniziare il lavoro.
# multiprocessing_simple.py
import multiprocessing
def worker():
"""Funzione elaboratore"""
print('Elaboratore')
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker)
jobs.append(p)
p.start()
Il risultato include la parola "Worker" stampata cinque volte, sebbene possa non ancora essere completamente pulito, a seconda dell'ordine di esecuzione, visto che ogni processo sta tentando di accedere al flusso in uscita.
$ python3 multiprocessing_simple.py Elaboratore Elaboratore Elaboratore Elaboratore Elaboratore
In genere è' più utile essere in grado di generare un processo con argomenti per dirgli che lavoro dovrà fare. A differenza di threading, per passare argomenti a un oggetto multiprocessing.Process
, essi devono potere essere serializzati usando pickle. Questo esempio passa a ciascun elaboratore un numero da stampare.
# multiprocessing_simpleargs.py
import multiprocessing
def worker(num):
"""Funzione del thread elaboratore"""
print('Elaboratore:', num)
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(target=worker, args=(i,))
jobs.append(p)
p.start()
L'intero passato come argomento ora viene incluso nel messaggio stampato da ogni elaboratore.
$ python3 multiprocessing_simpleargs.py Elaboratore: 1 Elaboratore: 2 Elaboratore:Elaboratore: 43 Elaboratore: 0
Funzioni Obiettivo Importabili
Una differenza tra gli esempi per threading e multiprocessing è la protezione supplementare per __main__
usata negli esempi multiprocessing. A causa del modo con il quale i nuovi processi sono fatti partire, il processo figlio deve essere capace di importare lo script che contiene la funzione obiettivo. Impacchettando la parte principale dell'applicazione in una verifica per __main__
assicura che non venga eseguita in modo ricorsivo in ogni figlio quando il modulo viene importato. Un altro approccio è di importare la funzione obiettivo da uno script separato. Ad esempio multiprocessing_import_main.py
usa una funzione elaboratore definita in un secondo modulo.
# multiprocessing_import_main.py
import multiprocessing
import multiprocessing_import_worker
if __name__ == '__main__':
jobs = []
for i in range(5):
p = multiprocessing.Process(
target=multiprocessing_import_worker.worker,
)
jobs.append(p)
p.start()
La funzione elaboratore viene definita in multiprocessing_import_worker.py.
# multiprocessing_import_worker.py
def worker():
"""Funzione elaboratore"""
print('Elaboratore')
return
La chiamata del programma principale produce un risultato simile al primo esempio.
$ python3 multiprocessing_import_main.py Elaboratore Elaboratore Elaboratore Elaboratore Elaboratore
Determinare il Processo Corrente
Passare argomenti per identificare o nominare il processo è difficoltoso e non necessario. Ogni istanza di Process
ha un nome con un valore predefinito che può essere modificato alla creazione del processo. Attribuire nomi ai processi è utile per tenerne traccia, specialmente in applicazioni con molteplici tipi di processi in esecuzione simultaneamente.
# multiprocessing_names.py
import multiprocessing
import time
def worker():
name = multiprocessing.current_process().name
print(name, 'In partenza')
time.sleep(2)
print(name, 'In uscita')
def my_service():
name = multiprocessing.current_process().name
print(name, 'In partenza')
time.sleep(3)
print(name, 'In uscita')
if __name__ == '__main__':
service = multiprocessing.Process(
name='my_service',
target=my_service,
)
worker_1 = multiprocessing.Process(
name='Elaboratore 1',
target=worker,
)
worker_2 = multiprocessing.Process( # nome predefinito
target=worker,
)
worker_1.start()
worker_2.start()
service.start()
Il risultato di debug include il nome del processo corrente su ogni riga. Le righe con Process-3
nella colonna del nome corrispondono al processo non nominato worker_2
.
$ python3 multiprocessing_names.py Elaboratore 1 In partenza Process-3 In partenza my_service In partenza Process-3Elaboratore 1 In uscitaIn uscita my_service In uscita
Processi Demone
Nella modalità predefinita, il programma principale non esce fino a quando non sono usciti tutti i figli. Ci sono volte nelle quali è utile far partire un processo in background che viene eseguito senza impedire al programma principale di uscire, come ad esempio in servizi dove potrebbe non essere facile interrompere l'elaboratore, o dove lasciarlo morire nel mezzo dell'esecuzione del suo lavoro non comporta perdita o corruzione di dati (ad esempio un compito che genera "battiti" per uno strumento di monitoraggio di servizi).
Per marcare un processo come demone si imposti il suo attributo daemon
a True
. Nella modalità predefinita i processi non sono demoni.
# multiprocessing_daemon.py
import multiprocessing
import time
import sys
def daemon():
p = multiprocessing.current_process()
print('In partenza:', p.name, p.pid)
sys.stdout.flush()
time.sleep(2)
print('In uscita :', p.name, p.pid)
sys.stdout.flush()
def non_daemon():
p = multiprocessing.current_process()
print('In partenza:', p.name, p.pid)
sys.stdout.flush()
print('In uscita :', p.name, p.pid)
sys.stdout.flush()
if __name__ == '__main__':
d = multiprocessing.Process(
name='demone',
target=daemon,
)
d.daemon = True
n = multiprocessing.Process(
name='non demone',
target=non_daemon,
)
n.daemon = False
d.start()
time.sleep(1)
n.start()
Il risultato non comprende il messaggio "In uscita" dal processo demone, visto che tutti i processi non demoni (compreso il programma principale) escono prima che il processo demone si attivi dopo la sua seconda pausa.
$ python3 multiprocessing_daemon.py In partenza: demone 8965 In partenza: non demone 8966 In uscita : non demone 8966
Il processo demone viene terminato automaticamente prima che il programma principale esca, il che evita di lasciare processi orfani in esecuzione. Questo può essere verificato cercando l'identificativo del processo stampato quando il programma è in esecuzione, poi cercando quel processo con un comando tipo ps.
Attendere Processi
Per attendere fino a quando un processo ha completato il suo lavoro ed è uscito, si usi il metodo join()
.
# multiprocessing_daemon_join.py
import multiprocessing
import time
import sys
def daemon():
name = multiprocessing.current_process().name
print('In partenza:', name)
time.sleep(2)
print('In uscita :', name)
def non_daemon():
name = multiprocessing.current_process().name
print('In partenza:', name)
print('In uscita :', name)
if __name__ == '__main__':
d = multiprocessing.Process(
name='daemon',
target=daemon,
)
d.daemon = True
n = multiprocessing.Process(
name='non-daemon',
target=non_daemon,
)
n.daemon = False
d.start()
time.sleep(1)
n.start()
d.join()
n.join()
Visto che usando join()
il processo principale attende che il demone esca, il messaggio "In uscita" questa volta viene stampato.
$ python3 multiprocessing_daemon_join.py In partenza: daemon In partenza: non-daemon In uscita : non-daemon In uscita : daemon
Nella modalità predefinita, join()
blocca a tempo indeterminato. E' anche possibile passare un argomento di timeout (un numero a virgola mobile che rappresenta il numero di secondi da attendere affinchè il processo divenga inattivo). Se il processo non si completa entro il periodo di timeout, join()
ritorna comunque.
# multiprocessing_daemon_join_timeout.py
import multiprocessing
import time
import sys
def daemon():
name = multiprocessing.current_process().name
print('In partenza:', name)
time.sleep(2)
print('In uscita :', name)
def non_daemon():
name = multiprocessing.current_process().name
print('In partenza:', name)
print('In uscita :', name)
if __name__ == '__main__':
d = multiprocessing.Process(
name='daemon',
target=daemon,
)
d.daemon = True
n = multiprocessing.Process(
name='non-daemon',
target=non_daemon,
)
n.daemon = False
d.start()
n.start()
d.join(1)
print('d.is_alive()', d.is_alive())
n.join()
Visto che il tempo nel quale il demone è in pausa è maggiore rispetto al timeout passato, il processo è ancora "vivo" dopo che join()
ritorna.
$ python3 multiprocessing_daemon_join_timeout.py In partenza: daemon In partenza: non-daemon In uscita : non-daemon d.is_alive() True
Terminare i Processi
Quantunque sia meglio usare il metodo della pillola avvelenata di segnalare a un processo che dovrebbe uscire (vedere Passare Messaggi ai Processi), se un processo appare bloccato potrebbe essere utile essere capaci di eliminarlo forzatamente. La chiamata di terminate()
su di un oggetto processo elimina il processo figlio.
# multiprocessing_terminate.py
import multiprocessing
import time
def slow_worker():
print('Elaboratore in partenza')
time.sleep(0.1)
print('Elaborazione conclusa')
if __name__ == '__main__':
p = multiprocessing.Process(target=slow_worker)
print('PRIMA:', p, p.is_alive())
p.start()
print('DURANTE:', p, p.is_alive())
p.terminate()
print('TERMINATO:', p, p.is_alive())
p.join()
print('CON JOIN:', p, p.is_alive())
join()
con il processo dopo averlo terminato per poter dare al codice che gestisce il processo il tempo di aggiornare lo stato dell'oggetto per rifletterne l'eliminazione.$ python3 multiprocessing_terminate.py PRIMA: <Process name='Process-1' parent=8978 initial> False DURANTE: <Process name='Process-1' pid=8979 parent=8978 started> True TERMINATO: <Process name='Process-1' pid=8979 parent=8978 started> True CON JOIN: <Process name='Process-1' pid=8979 parent=8978 stopped exitcode=-SIGTERM> False
Stato di Uscita di un Processo
Il codice di stato prodotto quando un processo esce può essere indirizzato tramite l'attributo exitcode
. L'intervallo di valori consentito è elencato nella tabella che segue.
CODICE USCITA | DESCRIZIONE |
---|---|
== 0 |
nessun errore prodotto |
> 0 |
il processo ha un errore, ed è uscito con quel codice |
< 0 |
il processo è stato eliminato con un segnale di -1 * exitcode |
# multiprocessing_exitcode.py
import multiprocessing
import sys
import time
def exit_error():
sys.exit(1)
def exit_ok():
return
def return_value():
return 1
def raises():
raise RuntimeError('Si è verificato un errore!')
def terminated():
time.sleep(3)
if __name__ == '__main__':
jobs = []
funcs = [
exit_error,
exit_ok,
return_value,
raises,
terminated,
]
for f in funcs:
print('Processo in partenza per', f.__name__)
j = multiprocessing.Process(target=f, name=f.__name__)
jobs.append(j)
j.start()
jobs[-1].terminate()
for j in jobs:
j.join()
print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))
I processi che sollevano automaticamente una eccezione ottengono un exitcode
di 1.
$ python3 multiprocessing_exitcode.py Processo in partenza per exit_error Processo in partenza per exit_ok Processo in partenza per return_value Processo in partenza per raises Processo in partenza per terminated exit_error.exitcode = 1 exit_ok.exitcode = 0 return_value.exitcode = 0 Process raises: Traceback (most recent call last): File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap self.run() File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run self._target(*self._args, **self._kwargs) File "multiprocessing_exitcode.py", line 21, in raises raise RuntimeError('Si è verificato un errore!') RuntimeError: Si è verificato un errore! raises.exitcode = 1 terminated.exitcode = -15
Registrazione
Quando si deve effettuare un debug per problemi di concorrenza, può essere utile avere accesso ai dati interni degli oggetti forniti da multiprocessing. Esiste una funzione di convenienza a livello di modulo per abilitare la registrazione chiamata log_to_stderr()
. Essa imposta un oggetto logger
usando logging ad aggiunge un gestore in modo che i messaggi registrati siano inviati al canale di errore standard.
# multiprocessing_log_to_stderr.py
import multiprocessing
import logging
import sys
def worker():
print('Esecuzione di qualche attività')
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr(logging.DEBUG)
p = multiprocessing.Process(target=worker)
p.start()
p.join()
Nella modalità predefinita, il livello di registrazione è impostato a NOTSET
, quindi non viene prodotto alcun messaggio. Si passi un livello differente al logger in fase di inizializzazione per ottenere il livello di dettaglio desiderato.
$ python3 multiprocessing_log_to_stderr.py [INFO/Process-1] child process calling self.run() Esecuzione di qualche attività [INFO/Process-1] process shutting down [DEBUG/Process-1] running all "atexit" finalizers with priority >= 0 [DEBUG/Process-1] running the remaining "atexit" finalizers [INFO/Process-1] process exiting with exitcode 0 [INFO/MainProcess] process shutting down [DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0 [DEBUG/MainProcess] running the remaining "atexit" finalizers
Per manipolare direttamente il logger (modificare il livello o aggiungere gestori), si usi get_logger()
.
# multiprocessing_get_logger.py
import multiprocessing
import logging
import sys
def worker():
print('Esecuzione di qualche attività')
sys.stdout.flush()
if __name__ == '__main__':
multiprocessing.log_to_stderr()
logger = multiprocessing.get_logger()
logger.setLevel(logging.INFO)
p = multiprocessing.Process(target=worker)
p.start()
p.join()
Il logger può anche essere configurato tramite l'API di configurazione file di logging, usando il nome "multiprocessing
".
$ python3 multiprocessing_get_logger.py [INFO/Process-1] child process calling self.run() Esecuzione di qualche attività [INFO/Process-1] process shutting down [INFO/Process-1] process exiting with exitcode 0 [INFO/MainProcess] process shutting down
Derivare Processi
Sebbene il modo più semplice di far partire una elaborazione in un processo separato sia usare Process
passando una funzione obiettivo, è anche possibile usare una sottoclasse personalizzata.
# multiprocessing_subclass.py
import multiprocessing
class Worker(multiprocessing.Process):
def run(self):
print('In {}'.format(self.name))
return
if __name__ == '__main__':
jobs = []
for i in range(5):
p = Worker()
jobs.append(p)
p.start()
for j in jobs:
j.join()
La classe derivata dovrebbe riscrivere run()
per eseguire il proprio lavoro.
$ python3 multiprocessing_subclass.py In Worker-1 In Worker-2 In Worker-3 In Worker-4 In Worker-5
Passare Messaggi ai Processi
Analogamente ai thread, un modello di uso comune per processi multipli è dividere una attività tra diversi esecutori che sono in esecuzione in parallelo. Un uso efficace di processi multipli in genere richiede un certo livello di comunicazione tra di essi, in modo che il lavoro possa essere diviso e i risultati aggregati. Un semplice modo di comunicare tra processi con multiprocessing è usare un oggetto Queue
per passare messaggi. Qualunque oggetto che possa essere serializzato con pickle può essere passato attraverso Queue
.
# multiprocessing_queue.py
import multiprocessing
class MyFancyClass:
def __init__(self, name):
self.name = name
def do_something(self):
proc_name = multiprocessing.current_process().name
print('Qualcosa in esecuzione in {} per {}!'.format(
proc_name, self.name))
def worker(q):
obj = q.get()
obj.do_something()
if __name__ == '__main__':
queue = multiprocessing.Queue()
p = multiprocessing.Process(target=worker, args=(queue,))
p.start()
queue.put(MyFancyClass('Fancy Dan'))
# Wait for the worker to finish
queue.close()
queue.join_thread()
p.join()
Questo breve esempio passa solo un singolo messaggio a un singolo esecutore, poi il processo principale attende che l'esecutore finisca.
$ python3 multiprocessing_queue.py Qualcosa in esecuzione in Process-1 per Fancy Dan!
Un esempio più complesso mostra come gestire parecchi esecutori che consumano dati da un oggetto JoinableQueue
e passano i risultati di nuovo al processo genitore. Viene usata la tecnica della pillola avvelenata per fermare gli esecutori. Dopo l'impostazione dei compiti reali, il programma principale aggiunge una valore di "arresto" per ogni esecutore alla coda dei lavori. Quando un esecutore trova quel valore speciale esce dal suo ciclo di elaborazione. Il processo principale usa il metodo join()
della coda dei compiti per attendere che tutti i compiti siano terminati prima di elaborare i risultati.
# multiprocessing_producer_consumer.py
import multiprocessing
import time
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue
self.result_queue = result_queue
def run(self):
proc_name = self.name
while True:
next_task = self.task_queue.get()
if next_task is None:
# Pillola avvelenata provoca l'arresto
print('{}: In uscita'.format(proc_name))
self.task_queue.task_done()
break
print('{}: {}'.format(proc_name, next_task))
answer = next_task()
self.task_queue.task_done()
self.result_queue.put(answer)
class Task:
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # simula il tempo impiegato per eseguire il lavoro
return '{self.a} * {self.b} = {product}'.format(
self=self, product=self.a * self.b)
def __str__(self):
return '{self.a} * {self.b}'.format(self=self)
if __name__ == '__main__':
# Costituisce le code di comunicazione
tasks = multiprocessing.JoinableQueue()
results = multiprocessing.Queue()
# Partono i consulmatori
num_consumers = multiprocessing.cpu_count() * 2
print('Creazione di {} consumatori'.format(num_consumers))
consumers = [
Consumer(tasks, results)
for i in range(num_consumers)
]
for w in consumers:
w.start()
# Accodamento lavori
num_jobs = 10
for i in range(num_jobs):
tasks.put(Task(i, i))
# Aggiunge una pillola avvelenata per ogni consumatore
for i in range(num_consumers):
tasks.put(None)
# Attende la fine di tutti i compiti
tasks.join()
# Inizia la stampa dei risultati
while num_jobs:
result = results.get()
print('Resultato:', result)
num_jobs -= 1
Sebbene i lavori entrino nella coda in ordine, la loro esecuzione è parallelizzata in modo che non vi è garanzia circa l'ordine di completamento.
$ python3 multiprocessing_producer_consumer.py Creazione di 8 consumatori Consumer-1: 0 * 0 Consumer-2: 1 * 1 Consumer-3: 2 * 2 Consumer-4: 3 * 3 Consumer-5: 4 * 4 Consumer-6: 5 * 5 Consumer-7: 6 * 6 Consumer-8: 7 * 7 Consumer-1: 8 * 8 Consumer-2: 9 * 9 Consumer-6: In uscita Consumer-5: In uscita Consumer-4: In uscita Consumer-3: In uscita Consumer-7: In uscita Consumer-8: In uscita Consumer-1: In uscita Consumer-2: In uscita Resultato: 0 * 0 = 0 Resultato: 1 * 1 = 1 Resultato: 5 * 5 = 25 Resultato: 4 * 4 = 16 Resultato: 2 * 2 = 4 Resultato: 6 * 6 = 36 Resultato: 3 * 3 = 9 Resultato: 7 * 7 = 49 Resultato: 8 * 8 = 64 Resultato: 9 * 9 = 81
Segnalazioni tra Processi
La classe Event
fornisce un semplice modo per comunicare informazioni di stato tra i processi. Un evento può alternare il suo stato tra impostato e non impostato. Gli utenti dell'oggetto evento possono attendere che lo stesso passi da non impostato a impostato, usando un valore opzionale di pausa.
# multiprocessing_event.py
import multiprocessing
import time
def wait_for_event(e):
"""Attence che l'evento sia impostato prima di fare qualcosa"""
print('wait_for_event: in partenza')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
"""Attende t secondi, poi va in pausa"""
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
if __name__ == '__main__':
e = multiprocessing.Event()
w1 = multiprocessing.Process(
name='block',
target=wait_for_event,
args=(e,),
)
w1.start()
w2 = multiprocessing.Process(
name='nonblock',
target=wait_for_event_timeout,
args=(e, 2),
)
w2.start()
print('principale: in attesa prima di chiamare Event.set()')
time.sleep(3)
e.set()
print('principale: evento impostato')
Quando wait()
termina il periodo di pausa ritorna senza errori. Il chiamante è responsabile per la verifica dello stato dell'evento usando is_set()
.
$ python3 multiprocessing_event.py principale: in attesa prima di chiamare Event.set() wait_for_event: in partenza wait_for_event_timeout: starting wait_for_event_timeout: e.is_set()-> False principale: evento impostato wait_for_event: e.is_set()-> True
Controllare Accesso alle Risorse
In situazioni dove una singola risorsa debba essere condivisa tra processi multipli, si può utilizzare un oggetto Lock
per evitare conflitti di accesso.
# multiprocessing_lock.py
import multiprocessing
import sys
def worker_with(lock, stream):
with lock:
stream.write('Lock acquisito via with\n')
def worker_no_with(lock, stream):
lock.acquire()
try:
stream.write('Lock acquisito direttamente\n')
finally:
lock.release()
lock = multiprocessing.Lock()
w = multiprocessing.Process(
target=worker_with,
args=(lock, sys.stdout),
)
nw = multiprocessing.Process(
target=worker_no_with,
args=(lock, sys.stdout),
)
w.start()
nw.start()
w.join()
nw.join()
In questo esempio, i messaggi stampati alla console possono essere mescolati assieme se i due processi non sincronizzano il proprio accesso al flusso in uscita con Lock
.
$ python3 multiprocessing_lock.py Lock acquisito via with Lock acquisito direttamente
Sincronizzare le Operazioni
Si possono usare oggetti Condition
per sincronizzare parti di flusso di lavoro in modo che alcune vengano eseguite in parallelo e altri in modo sequenziale, anche se sono in processi separati.
# multiprocessing_condition.py
import multiprocessing
import time
def stage_1(cond):
"""esegue il primo segmento del lavoro
poi notifica al secondo segmento stage_2 di continuare
"""
name = multiprocessing.current_process().name
print('In partenza', name)
with cond:
print('{} finito e pronto per il secondo segmento'.format(name))
cond.notify_all()
def stage_2(cond):
"""attende la condizione che dice che stage_1 è completato"""
name = multiprocessing.current_process().name
print('In partenza', name)
with cond:
cond.wait()
print('{} in esecuzione'.format(name))
if __name__ == '__main__':
condition = multiprocessing.Condition()
s1 = multiprocessing.Process(name='s1',
target=stage_1,
args=(condition,))
s2_clients = [
multiprocessing.Process(
name='stage_2[{}]'.format(i),
target=stage_2,
args=(condition,),
)
for i in range(1, 3)
]
for c in s2_clients:
c.start()
time.sleep(1)
s1.start()
s1.join()
for c in s2_clients:
c.join()
In questo esempio, due processi eseguono in parallelo il secondo segmento di un lavoro, ma solo dopo che il primo segmento è terminato.,.
$ python3 multiprocessing_condition.py In partenza stage_2[1] In partenza stage_2[2] In partenza s1 s1 finito e pronto per il secondo segmento stage_2[2] in esecuzione stage_2[1] in esecuzione
Controllare l'Accesso Simultaneo alle Risorse
Talvolta è utile consentire a più di un esecutore alla volta di accedere a una risorsa, limitando comunque il numero di accessi totale. Ad esempio un pool di connessioni potrebbe supportare un numero fisso di connessioni simultanee, oppure una applicazione di rete potrebbe supportare un numero fisso di scaricamenti concorrenti. Un modo per gestire queste connessioni è usare Semaphore
.
# multiprocessing_semaphore.py
import random
import multiprocessing
import time
class ActivePool:
def __init__(self):
super(ActivePool, self).__init__()
self.mgr = multiprocessing.Manager()
self.active = self.mgr.list()
self.lock = multiprocessing.Lock()
def makeActive(self, name):
with self.lock:
self.active.append(name)
def makeInactive(self, name):
with self.lock:
self.active.remove(name)
def __str__(self):
with self.lock:
return str(self.active)
def worker(s, pool):
name = multiprocessing.current_process().name
with s:
pool.makeActive(name)
print('Attivazione di {} ora in esecuzione {}'.format(
name, pool))
time.sleep(random.random())
pool.makeInactive(name)
if __name__ == '__main__':
pool = ActivePool()
s = multiprocessing.Semaphore(3)
jobs = [
multiprocessing.Process(
target=worker,
name=str(i),
args=(s, pool),
)
for i in range(10)
]
for j in jobs:
j.start()
while True:
alive = 0
for j in jobs:
if j.is_alive():
alive += 1
j.join(timeout=0.1)
print('Ora in esecuzione {}'.format(pool))
if alive == 0:
# tutto fatto
break
In questo esempio, la classe ActivePool
serve semplicemente come un sistema conveniente per tracciare quali processi sono in esecuzione a un dato momento. Un vero pool di risorse probabilmente avrebbe allocato una connessione o un qualche altro valore al nuovo processo attivo, e recuperato il valore alla fine dell'esecuzione del compito. Qui il pool è usato semplicemente per mantenere i nomi dei processi attivi per mostrare che sono tre sono in esecuzione contemporanea.
$ python3 multiprocessing_semaphore.py Attivazione di 0 ora in esecuzione ['0'] Attivazione di 2 ora in esecuzione ['0', '2', '1'] Attivazione di 1 ora in esecuzione ['0', '2', '1'] Ora in esecuzione ['0', '2', '1'] Attivazione di 3 ora in esecuzione ['2', '1', '3'] Ora in esecuzione ['2', '1', '3'] Ora in esecuzione ['2', '1', '3'] Ora in esecuzione ['2', '1', '3'] Attivazione di 4 ora in esecuzione ['1', '3', '4'] Ora in esecuzione ['1', '3', '4'] Ora in esecuzione ['1', '3', '4'] Attivazione di 5 ora in esecuzione ['1', '3', '5'] Ora in esecuzione ['1', '3', '5'] Attivazione di 6 ora in esecuzione ['1', '5', '6'] Attivazione di 8 ora in esecuzione ['5', '6', '8'] Attivazione di 9 ora in esecuzione ['6', '8', '9'] Ora in esecuzione ['6', '8', '9'] Ora in esecuzione ['6', '8', '9'] Attivazione di 7 ora in esecuzione ['8', '9', '7'] Ora in esecuzione ['8', '9', '7'] Ora in esecuzione ['9', '7'] Ora in esecuzione ['9', '7'] Ora in esecuzione ['9', '7'] Ora in esecuzione ['9', '7'] Ora in esecuzione ['9', '7'] Ora in esecuzione ['9', '7'] Ora in esecuzione ['7'] Ora in esecuzione []
Gestire lo Stato Condiviso
Nell'esempio precedente, la lista di processi attivi viene mantenuta centralmente nell'istanza ActivePool
grazie a un tipo di oggetto speciale creato da un Manager
. Questo oggetto è responsabile del coordinamento delle informazioni di stato condivise tra tutti gli utenti.
# multiprocessing_manager_dict.py
import multiprocessing
import pprint
def worker(d, key, value):
d[key] = value
if __name__ == '__main__':
mgr = multiprocessing.Manager()
d = mgr.dict()
jobs = [
multiprocessing.Process(
target=worker,
args=(d, i, i * 2),
)
for i in range(10)
]
for j in jobs:
j.start()
for j in jobs:
j.join()
print('Risultati:', d)
Creando la lista con il Manager
, essa viene condivisa e gli aggiornamenti sono visti da tutti i processi. Sono supportati anche i dizionari.
$ python3 multiprocessing_manager_dict.py Risultati: {0: 0, 1: 2, 2: 4, 5: 10, 3: 6, 6: 12, 7: 14, 9: 18, 8: 16, 4: 8}
Spazi dei nomi condivisi
Oltre ai dizionari e alle liste, Manager
può creare anche spazi dei nomi condivisi con Namespace
.
# multiprocessing_namespaces.py
import multiprocessing
def producer(ns, event):
ns.value = 'Questo è il valore'
event.set()
def consumer(ns, event):
try:
print('Prima dell\'evento: {}'.format(ns.value))
except Exception as err:
print('Prima dell\'evento, errore:', str(err))
event.wait()
print('Dopo l\'evento:', ns.value)
if __name__ == '__main__':
mgr = multiprocessing.Manager()
namespace = mgr.Namespace()
event = multiprocessing.Event()
p = multiprocessing.Process(
target=producer,
args=(namespace, event),
)
c = multiprocessing.Process(
target=consumer,
args=(namespace, event),
)
c.start()
p.start()
c.join()
p.join()
Qualunque valore aggiunto a Namespace
è visibile a chiunque riceva una istanza di Namespace
.
$ python3 multiprocessing_namespaces.py Prima dell'evento, errore: 'Namespace' object has no attribute 'value' Dopo l'evento: Questo è il valore
E' importante sapere che gli aggiornamenti al contenuto di valori mutabili nello spazio dei nomi non vengono propagati automaticamente.
# multiprocessing_namespaces_mutable.py
import multiprocessing
def producer(ns, event):
# DOES NOT UPDATE GLOBAL VALUE!
ns.my_list.append('Questo è il valroe')
event.set()
def consumer(ns, event):
print('Prima dell\' evento:', ns.my_list)
event.wait()
print('Dopo l\' evento :', ns.my_list)
if __name__ == '__main__':
mgr = multiprocessing.Manager()
namespace = mgr.Namespace()
namespace.my_list = []
event = multiprocessing.Event()
p = multiprocessing.Process(
target=producer,
args=(namespace, event),
)
c = multiprocessing.Process(
target=consumer,
args=(namespace, event),
)
c.start()
p.start()
c.join()
p.join()
Per aggiornare la lista occorre attaccare nuovamente allo spazio dei nomi l'oggetto.
$ python3 multiprocessing_namespaces_mutable.py Prima dell' evento: [] Dopo l' evento : []
Pool di Processi
La classe Pool
può essere usata per gestire un numero fisso di esecutori per casi semplici nei quali il lavoro da compiere può essere diviso e distribuito indipendentemente tra gli esecutori. I valori di ritorno dai lavori sono raccolti e ritornati come lista. Gli argomenti per Pool
includono il numero di processi e la funzione da eseguire quando si fa partire il compito (chiamata una volta per figlio).
# multiprocessing_pool.py
import multiprocessing
def do_calculation(data):
return data * 2
def start_process():
print('In partenza', multiprocessing.current_process().name)
if __name__ == '__main__':
inputs = list(range(10))
print('Input :', inputs)
builtin_outputs = map(do_calculation, inputs)
print('Built-in:', builtin_outputs)
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(
processes=pool_size,
initializer=start_process,
)
pool_outputs = pool.map(do_calculation, inputs)
pool.close() # compiti esauriti
pool.join() # chiude il task corrente
print('Pool :', pool_outputs)
Il risultato del metodo map()
è funzionalmente equivalente al metodo built-in map()
eccetto che i singoli compiti vengono eseguiti in parallelo. Visto che il pool sta elaborando i suoi input in parallelo, close()
e join()
possono essere usati per sincronizzare il processo principale con quelli che eseguono i compiti per consentire una corretta pulizia.
$ python3 multiprocessing_pool.py Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] Built-in: <map object at 0x7f1c15fcb3a0> In partenza ForkPoolWorker-1 In partenza ForkPoolWorker-2 In partenza ForkPoolWorker-3 In partenza ForkPoolWorker-4 In partenza ForkPoolWorker-5 In partenza ForkPoolWorker-6 In partenza ForkPoolWorker-7 In partenza ForkPoolWorker-8 Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Nella modalità predefinita, Pool
crea un numero fisso di processi esecutori e a essi passa lavori fino a quando sono terminati. L'impostazione del parametro maxtaskperchild
indica al pool di far ripartire un processo esecutore dopo che ha finito alcuni compiti, prevenendo esecutori che di lunga esecuzione dal consumare ulteriori risorse di sistema.
# multiprocessing_pool_maxtasksperchild.py
import multiprocessing
def do_calculation(data):
return data * 2
def start_process():
print('In partenza', multiprocessing.current_process().name)
if __name__ == '__main__':
inputs = list(range(10))
print('Input :', inputs)
builtin_outputs = map(do_calculation, inputs)
print('Built-in:', builtin_outputs)
pool_size = multiprocessing.cpu_count() * 2
pool = multiprocessing.Pool(
processes=pool_size,
initializer=start_process,
maxtasksperchild=2,
)
pool_outputs = pool.map(do_calculation, inputs)
pool.close() # compiti esauriti
pool.join()
print('Pool :', pool_outputs)
Il pool fa ripartire gli esecutori quando hanno completato i compiti assegnati, anche se non ci sono più lavori. In questo risultato, sono creati otto esecutori, anche se ci sono solo 10 compiti, e ciascun esecutore può completarne due alla volta.
$ python3 multiprocessing_pool_maxtasksperchild.py Input : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9] Built-in: <map object at 0x7fcb8c41b3a0> In partenza ForkPoolWorker-1 In partenza ForkPoolWorker-2 In partenza ForkPoolWorker-3 In partenza ForkPoolWorker-4 In partenza ForkPoolWorker-5 In partenza ForkPoolWorker-6 In partenza ForkPoolWorker-7 In partenza ForkPoolWorker-8 In partenza ForkPoolWorker-9 In partenza ForkPoolWorker-10 Pool : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Implementare MapReduce
La classe Pool
può essere usata per creare una semplice implementazione MapReduce di un server singolo. Sebbene non fornisca i pieni benefici di una distribuzione del processo, illustra quanto sia facile suddividere e distribuire alcuni problemi in unità di elaborazione.
In un sistema basato su MapReduce, i dati in entrata sono suddivisi in spezzoni da elaborare da diverse istanze di esecutori. Ogni spezzone viene mappato a uno stato intermedio usando una semplice trasformazione. I dati intermedi vengono poi raccolti insieme e partizionati in base a un valore chiave in modo che tutti i valori collegati siano insieme. In ultimo il dato partizionato viene ridotto a un insieme di risultati.
# multiprocessing_mapreduce.py
import collections
import itertools
import multiprocessing
class SimpleMapReduce:
def __init__(self, map_func, reduce_func, num_workers=None):
"""
map_func
Funzione che mappa gli input a dati intermedi. Riceve
come argomento un valore in input e ritorna una tupla
con la chiave e il valore da ridurre.
reduce_func
Funzione per ridurre la versione partizionata di dati
intermedi verso il risultato finale. Riceve come
argomento una chiave prodotta da map_func e una sequenza
dei valori associati a quella chiave
num_workers
Il numero di esecutori da creare nel pool. Il valore
predefinito è il numero di CPU disponibili nell'host
corrente.
"""
self.map_func = map_func
self.reduce_func = reduce_func
self.pool = multiprocessing.Pool(num_workers)
def partition(self, mapped_values):
"""Sistema i valori mappati in base alle loro chiavi.
Ritorna una sequenza non ordinata di tuple con una chiave
e una sequenza di valori
"""
partitioned_data = collections.defaultdict(list)
for key, value in mapped_values:
partitioned_data[key].append(value)
return partitioned_data.items()
def __call__(self, inputs, chunksize=1):
"""Elabora l'input tramite le funzioni di map e reduce
passate.
inputs
Un iterabile che contiene i dati in input da elaborare
chunksize=1
La porzione di dati in input da passare a ciascun esecutore
Può essere usato per sintonizzare le prestazioni durante
la fase di mappatura.
"""
map_responses = self.pool.map(
self.map_func,
inputs,
chunksize=chunksize,
)
partitioned_data = self.partition(
itertools.chain(*map_responses)
)
reduced_values = self.pool.map(
self.reduce_func,
partitioned_data,
)
return reduced_values
Lo script di esempio che segue usa SimpleMapReduce
per contare le parole più frequenti nel sorgente di questo articolo, ignorando i marcatori.
# multiprocessing_wordcount.py
import multiprocessing
import string
from multiprocessing_mapreduce import SimpleMapReduce
def file_to_words(filename):
"""Legge un file e ritorna una sequenza di valori
(parole, occorrenze).
"""
STOP_WORDS = set([word.strip() for word in open('ita_stopwords.txt')])
TR = str.maketrans({
p: ' '
for p in string.punctuation
})
print('{} in lettura {}'.format(
multiprocessing.current_process().name, filename))
output = []
with open(filename, 'rt') as f:
for line in f:
# Salta le righe di commento.
if line.lstrip().startswith(('<', '#', '$')):
continue
line = line.translate(TR) # Elimina la punteggiatura
for word in line.split():
word = word.lower()
if word.isalpha() and word not in STOP_WORDS:
output.append((word, 1))
return output
def count_words(item):
"""Converte i dati partizionati per una parola in una tupla
che contiene la parola e il numero di occorrenze.
"""
word, occurences = item
return (word, sum(occurences))
if __name__ == '__main__':
import operator
import glob
input_files = glob.glob('../tran/multiprocessing.xml')
mapper = SimpleMapReduce(file_to_words, count_words)
word_counts = mapper(input_files)
word_counts.sort(key=operator.itemgetter(1))
word_counts.reverse()
print('\nLE 20 PAROLE PIU\' FREQUENTI\n')
top20 = word_counts[:20]
longest = max(len(word) for word, count in top20)
for word, count in top20:
print('{word:<{len}}: {count:5}'.format(
len=longest + 1,
word=word,
count=count)
)
La funzione file_to_words
converte ogni file in input in una sequenza di tuple che contengono la parola e il numero 1
che rappresenta una singola occorrenza. I dati sono poi divisi da partition
usando la parola come chiave, in modo che la struttura risultante consiste in una chiave e una sequenza di valori 1
che rappresentano ciascuna occorrenza della parola. I dati partizionati sono convertiti in un insieme di tuple che contengono una parola e il conteggio per quella parola effettuato da count_words()
durante la fase di riduzione.
$ python3 multiprocessing_wordcount.py LE 20 PAROLE PIU' FREQUENTI Traceback (most recent call last): File "multiprocessing_wordcount.py", line 57, in <module> longest = max(len(word) for word, count in top20) ValueError: max() arg is an empty sequence
Vedere anche:
- multiprocessing
- La documentazione della libreria standard per questo modulo.
- threading
- API di alto livello per lvaorare con i thread
- MapReduce - Wikipedia
- Panoramica di MapReduce su Wikipedia
- MapReduce: Simplified Dsta Processing on Large Clusters
- Presentazione e documento su MapReduce da parte di Google Labs
- Operator
- Strumenti sugli operatori tipo
itemgetter