multiprocessing - Gestisce Processi come Thread

Scopo: Fornisce una API per la gestione dei processi

Il modulo multiprocessing fornisce un API per suddividere il lavoro tra processi molteplici basati sulla API di threading. In taluni casi multiprocessing può essere usato come rimpiazzo al posto di threading per trarre vantaggio dei core multipli di CPU per evitare colli di bottiglia computazionali associati con i bloccaggi dell'interprete globale di Python.

Viste le similarità, i primi pochi esempi di seguito sono tratti dagli esempi di threading. Le caratteristiche fornite da multiprocessing non disponibili in threading sono trattate successivamente.

Concetti base di multiprocessing

Il modo più semplice di generare un secondo processo è istanziare un oggetto Process con una funzione obiettivo. quindi chiamare start() per iniziare il lavoro.

# multiprocessing_simple.py

import multiprocessing


def worker():
    """Funzione elaboratore"""
    print('Elaboratore')


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker)
        jobs.append(p)
        p.start()

Il risultato include la parola "Worker" stampata cinque volte, sebbene possa non ancora essere completamente pulito, a seconda dell'ordine di esecuzione, visto che ogni processo sta tentando di accedere al flusso in uscita.

$ python3 multiprocessing_simple.py

Elaboratore
Elaboratore
Elaboratore
Elaboratore
Elaboratore

In genere è' più utile essere in grado di generare un processo con argomenti per dirgli che lavoro dovrà fare. A differenza di threading, per passare argomenti a un oggetto multiprocessing.Process, essi devono potere essere serializzati usando pickle. Questo esempio passa a ciascun elaboratore un numero da stampare.

# multiprocessing_simpleargs.py

import multiprocessing


def worker(num):
    """Funzione del thread elaboratore"""
    print('Elaboratore:', num)


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(target=worker, args=(i,))
        jobs.append(p)
        p.start()

L'intero passato come argomento ora viene incluso nel messaggio stampato da ogni elaboratore.

$ python3 multiprocessing_simpleargs.py

Elaboratore: 1
Elaboratore: 2
Elaboratore:Elaboratore:  43

Elaboratore: 0

Funzioni Obiettivo Importabili

Una differenza tra gli esempi per threading e multiprocessing è la protezione supplementare per __main__ usata negli esempi multiprocessing. A causa del modo con il quale i nuovi processi sono fatti partire, il processo figlio deve essere capace di importare lo script che contiene la funzione obiettivo. Impacchettando la parte principale dell'applicazione in una verifica per __main__ assicura che non venga eseguita in modo ricorsivo in ogni figlio quando il modulo viene importato. Un altro approccio è di importare la funzione obiettivo da uno script separato. Ad esempio multiprocessing_import_main.py usa una funzione elaboratore definita in un secondo modulo.

# multiprocessing_import_main.py

import multiprocessing
import multiprocessing_import_worker

if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = multiprocessing.Process(
            target=multiprocessing_import_worker.worker,
        )
        jobs.append(p)
        p.start()

La funzione elaboratore viene definita in multiprocessing_import_worker.py.

# multiprocessing_import_worker.py


def worker():
    """Funzione elaboratore"""
    print('Elaboratore')
    return

La chiamata del programma principale produce un risultato simile al primo esempio.

$ python3 multiprocessing_import_main.py

Elaboratore
Elaboratore
Elaboratore
Elaboratore
Elaboratore

Determinare il Processo Corrente

Passare argomenti per identificare o nominare il processo è difficoltoso e non necessario. Ogni istanza di Process ha un nome con un valore predefinito che può essere modificato alla creazione del processo. Attribuire nomi ai processi è utile per tenerne traccia, specialmente in applicazioni con molteplici tipi di processi in esecuzione simultaneamente.

# multiprocessing_names.py

import multiprocessing
import time


def worker():
    name = multiprocessing.current_process().name
    print(name, 'In partenza')
    time.sleep(2)
    print(name, 'In uscita')


def my_service():
    name = multiprocessing.current_process().name
    print(name, 'In partenza')
    time.sleep(3)
    print(name, 'In uscita')


if __name__ == '__main__':
    service = multiprocessing.Process(
        name='my_service',
        target=my_service,
    )
    worker_1 = multiprocessing.Process(
        name='Elaboratore 1',
        target=worker,
    )
    worker_2 = multiprocessing.Process(  # nome predefinito
        target=worker,
    )

    worker_1.start()
    worker_2.start()
    service.start()

Il risultato di debug include il nome del processo corrente su ogni riga. Le righe con Process-3 nella colonna del nome corrispondono al processo non nominato worker_2.

$ python3 multiprocessing_names.py

Elaboratore 1 In partenza
Process-3 In partenza
my_service In partenza
Process-3Elaboratore 1  In uscitaIn uscita

my_service In uscita

Processi Demone

Nella modalità predefinita, il programma principale non esce fino a quando non sono usciti tutti i figli. Ci sono volte nelle quali è utile far partire un processo in background che viene eseguito senza impedire al programma principale di uscire, come ad esempio in servizi dove potrebbe non essere facile interrompere l'elaboratore, o dove lasciarlo morire nel mezzo dell'esecuzione del suo lavoro non comporta perdita o corruzione di dati (ad esempio un compito che genera "battiti" per uno strumento di monitoraggio di servizi).

Per marcare un processo come demone si imposti il suo attributo daemon a True. Nella modalità predefinita i processi non sono demoni.

# multiprocessing_daemon.py

import multiprocessing
import time
import sys


def daemon():
    p = multiprocessing.current_process()
    print('In partenza:', p.name, p.pid)
    sys.stdout.flush()
    time.sleep(2)
    print('In uscita :', p.name, p.pid)
    sys.stdout.flush()


def non_daemon():
    p = multiprocessing.current_process()
    print('In partenza:', p.name, p.pid)
    sys.stdout.flush()
    print('In uscita :', p.name, p.pid)
    sys.stdout.flush()


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='demone',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non demone',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

Il risultato non comprende il messaggio "In uscita" dal processo demone, visto che tutti i processi non demoni (compreso il programma principale) escono prima che il processo demone si attivi dopo la sua seconda pausa.

$ python3 multiprocessing_daemon.py

In partenza: demone 8965
In partenza: non demone 8966
In uscita : non demone 8966

Il processo demone viene terminato automaticamente prima che il programma principale esca, il che evita di lasciare processi orfani in esecuzione. Questo può essere verificato cercando l'identificativo del processo stampato quando il programma è in esecuzione, poi cercando quel processo con un comando tipo ps.

Attendere Processi

Per attendere fino a quando un processo ha completato il suo lavoro ed è uscito, si usi il metodo join().

# multiprocessing_daemon_join.py

import multiprocessing
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('In partenza:', name)
    time.sleep(2)
    print('In uscita :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('In partenza:', name)
    print('In uscita :', name)


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    time.sleep(1)
    n.start()

    d.join()
    n.join()

Visto che usando join() il processo principale attende che il demone esca, il messaggio "In uscita" questa volta viene stampato.

$ python3 multiprocessing_daemon_join.py

In partenza: daemon
In partenza: non-daemon
In uscita : non-daemon
In uscita : daemon

Nella modalità predefinita, join() blocca a tempo indeterminato. E' anche possibile passare un argomento di timeout (un numero a virgola mobile che rappresenta il numero di secondi da attendere affinchè il processo divenga inattivo). Se il processo non si completa entro il periodo di timeout, join() ritorna comunque.

# multiprocessing_daemon_join_timeout.py

import multiprocessing
import time
import sys


def daemon():
    name = multiprocessing.current_process().name
    print('In partenza:', name)
    time.sleep(2)
    print('In uscita :', name)


def non_daemon():
    name = multiprocessing.current_process().name
    print('In partenza:', name)
    print('In uscita :', name)


if __name__ == '__main__':
    d = multiprocessing.Process(
        name='daemon',
        target=daemon,
    )
    d.daemon = True

    n = multiprocessing.Process(
        name='non-daemon',
        target=non_daemon,
    )
    n.daemon = False

    d.start()
    n.start()

    d.join(1)
    print('d.is_alive()', d.is_alive())
    n.join()

Visto che il tempo nel quale il demone è in pausa è maggiore rispetto al timeout passato, il processo è ancora "vivo" dopo che join() ritorna.

$ python3 multiprocessing_daemon_join_timeout.py

In partenza: daemon
In partenza: non-daemon
In uscita : non-daemon
d.is_alive() True

Terminare i Processi

Quantunque sia meglio usare il metodo della pillola avvelenata di segnalare a un processo che dovrebbe uscire (vedere Passare Messaggi ai Processi), se un processo appare bloccato potrebbe essere utile essere capaci di eliminarlo forzatamente. La chiamata di terminate() su di un oggetto processo elimina il processo figlio.

# multiprocessing_terminate.py

import multiprocessing
import time


def slow_worker():
    print('Elaboratore in partenza')
    time.sleep(0.1)
    print('Elaborazione conclusa')


if __name__ == '__main__':
    p = multiprocessing.Process(target=slow_worker)
    print('PRIMA:', p, p.is_alive())

    p.start()
    print('DURANTE:', p, p.is_alive())

    p.terminate()
    print('TERMINATO:', p, p.is_alive())

    p.join()
    print('CON JOIN:', p, p.is_alive())
E' importante usare join() con il processo dopo averlo terminato per poter dare al codice che gestisce il processo il tempo di aggiornare lo stato dell'oggetto per rifletterne l'eliminazione.
$ python3 multiprocessing_terminate.py

PRIMA: <Process name='Process-1' parent=8978 initial> False
DURANTE: <Process name='Process-1' pid=8979 parent=8978 started> True
TERMINATO: <Process name='Process-1' pid=8979 parent=8978 started> True
CON JOIN: <Process name='Process-1' pid=8979 parent=8978 stopped exitcode=-SIGTERM> False

Stato di Uscita di un Processo

Il codice di stato prodotto quando un processo esce può essere indirizzato tramite l'attributo exitcode. L'intervallo di valori consentito è elencato nella tabella che segue.

CODICE USCITA DESCRIZIONE
== 0 nessun errore prodotto
> 0 il processo ha un errore, ed è uscito con quel codice
< 0 il processo è stato eliminato con un segnale di -1 * exitcode
# multiprocessing_exitcode.py

import multiprocessing
import sys
import time


def exit_error():
    sys.exit(1)


def exit_ok():
    return


def return_value():
    return 1


def raises():
    raise RuntimeError('Si è verificato un errore!')


def terminated():
    time.sleep(3)


if __name__ == '__main__':
    jobs = []
    funcs = [
        exit_error,
        exit_ok,
        return_value,
        raises,
        terminated,
    ]
    for f in funcs:
        print('Processo in partenza per', f.__name__)
        j = multiprocessing.Process(target=f, name=f.__name__)
        jobs.append(j)
        j.start()

    jobs[-1].terminate()

    for j in jobs:
        j.join()
        print('{:>15}.exitcode = {}'.format(j.name, j.exitcode))

I processi che sollevano automaticamente una eccezione ottengono un exitcode di 1.

$ python3 multiprocessing_exitcode.py

Processo in partenza per exit_error
Processo in partenza per exit_ok
Processo in partenza per return_value
Processo in partenza per raises
Processo in partenza per terminated
     exit_error.exitcode = 1
        exit_ok.exitcode = 0
   return_value.exitcode = 0
Process raises:
Traceback (most recent call last):
  File "/usr/lib/python3.8/multiprocessing/process.py", line 315, in _bootstrap
    self.run()
  File "/usr/lib/python3.8/multiprocessing/process.py", line 108, in run
    self._target(*self._args, **self._kwargs)
  File "multiprocessing_exitcode.py", line 21, in raises
    raise RuntimeError('Si è verificato un errore!')
RuntimeError: Si è verificato un errore!
         raises.exitcode = 1
     terminated.exitcode = -15

Registrazione

Quando si deve effettuare un debug per problemi di concorrenza, può essere utile avere accesso ai dati interni degli oggetti forniti da multiprocessing. Esiste una funzione di convenienza a livello di modulo per abilitare la registrazione chiamata log_to_stderr(). Essa imposta un oggetto logger usando logging ad aggiunge un gestore in modo che i messaggi registrati siano inviati al canale di errore standard.

# multiprocessing_log_to_stderr.py

import multiprocessing
import logging
import sys


def worker():
    print('Esecuzione di qualche attività')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr(logging.DEBUG)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

Nella modalità predefinita, il livello di registrazione è impostato a NOTSET, quindi non viene prodotto alcun messaggio. Si passi un livello differente al logger in fase di inizializzazione per ottenere il livello di dettaglio desiderato.

$ python3 multiprocessing_log_to_stderr.py

[INFO/Process-1] child process calling self.run()
Esecuzione di qualche attività
[INFO/Process-1] process shutting down
[DEBUG/Process-1] running all "atexit" finalizers with priority >= 0
[DEBUG/Process-1] running the remaining "atexit" finalizers
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down
[DEBUG/MainProcess] running all "atexit" finalizers with priority >= 0
[DEBUG/MainProcess] running the remaining "atexit" finalizers

Per manipolare direttamente il logger (modificare il livello o aggiungere gestori), si usi get_logger().

# multiprocessing_get_logger.py

import multiprocessing
import logging
import sys


def worker():
    print('Esecuzione di qualche attività')
    sys.stdout.flush()


if __name__ == '__main__':
    multiprocessing.log_to_stderr()
    logger = multiprocessing.get_logger()
    logger.setLevel(logging.INFO)
    p = multiprocessing.Process(target=worker)
    p.start()
    p.join()

Il logger può anche essere configurato tramite l'API di configurazione file di logging, usando il nome "multiprocessing".

$ python3 multiprocessing_get_logger.py

[INFO/Process-1] child process calling self.run()
Esecuzione di qualche attività
[INFO/Process-1] process shutting down
[INFO/Process-1] process exiting with exitcode 0
[INFO/MainProcess] process shutting down

Derivare Processi

Sebbene il modo più semplice di far partire una elaborazione in un processo separato sia usare Process passando una funzione obiettivo, è anche possibile usare una sottoclasse personalizzata.

# multiprocessing_subclass.py
import multiprocessing


class Worker(multiprocessing.Process):

    def run(self):
        print('In {}'.format(self.name))
        return


if __name__ == '__main__':
    jobs = []
    for i in range(5):
        p = Worker()
        jobs.append(p)
        p.start()
    for j in jobs:
        j.join()

La classe derivata dovrebbe riscrivere run() per eseguire il proprio lavoro.

$ python3 multiprocessing_subclass.py

In Worker-1
In Worker-2
In Worker-3
In Worker-4
In Worker-5

.

Passare Messaggi ai Processi

Analogamente ai thread, un modello di uso comune per processi multipli è dividere una attività tra diversi esecutori che sono in esecuzione in parallelo. Un uso efficace di processi multipli in genere richiede un certo livello di comunicazione tra di essi, in modo che il lavoro possa essere diviso e i risultati aggregati. Un semplice modo di comunicare tra processi con multiprocessing è usare un oggetto Queue per passare messaggi. Qualunque oggetto che possa essere serializzato con pickle può essere passato attraverso Queue.

# multiprocessing_queue.py

import multiprocessing


class MyFancyClass:

    def __init__(self, name):
        self.name = name

    def do_something(self):
        proc_name = multiprocessing.current_process().name
        print('Qualcosa in esecuzione in {} per {}!'.format(
            proc_name, self.name))


def worker(q):
    obj = q.get()
    obj.do_something()


if __name__ == '__main__':
    queue = multiprocessing.Queue()

    p = multiprocessing.Process(target=worker, args=(queue,))
    p.start()

    queue.put(MyFancyClass('Fancy Dan'))

    # Wait for the worker to finish
    queue.close()
    queue.join_thread()
    p.join()

Questo breve esempio passa solo un singolo messaggio a un singolo esecutore, poi il processo principale attende che l'esecutore finisca.

$ python3 multiprocessing_queue.py

Qualcosa in esecuzione in Process-1 per Fancy Dan!

Un esempio più complesso mostra come gestire parecchi esecutori che consumano dati da un oggetto JoinableQueue e passano i risultati di nuovo al processo genitore. Viene usata la tecnica della pillola avvelenata per fermare gli esecutori. Dopo l'impostazione dei compiti reali, il programma principale aggiunge una valore di "arresto" per ogni esecutore alla coda dei lavori. Quando un esecutore trova quel valore speciale esce dal suo ciclo di elaborazione. Il processo principale usa il metodo join() della coda dei compiti per attendere che tutti i compiti siano terminati prima di elaborare i risultati.

# multiprocessing_producer_consumer.py

import multiprocessing
import time


class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue
        self.result_queue = result_queue

    def run(self):
        proc_name = self.name
        while True:
            next_task = self.task_queue.get()
            if next_task is None:
                # Pillola avvelenata provoca l'arresto
                print('{}: In uscita'.format(proc_name))
                self.task_queue.task_done()
                break
            print('{}: {}'.format(proc_name, next_task))
            answer = next_task()
            self.task_queue.task_done()
            self.result_queue.put(answer)


class Task:

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1)  # simula il tempo impiegato per eseguire il lavoro
        return '{self.a} * {self.b} = {product}'.format(
            self=self, product=self.a * self.b)

    def __str__(self):
        return '{self.a} * {self.b}'.format(self=self)


if __name__ == '__main__':
    # Costituisce le code di comunicazione
    tasks = multiprocessing.JoinableQueue()
    results = multiprocessing.Queue()

    # Partono i consulmatori
    num_consumers = multiprocessing.cpu_count() * 2
    print('Creazione di {} consumatori'.format(num_consumers))
    consumers = [
        Consumer(tasks, results)
        for i in range(num_consumers)
    ]
    for w in consumers:
        w.start()

    # Accodamento lavori
    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    # Aggiunge una pillola avvelenata per ogni consumatore
    for i in range(num_consumers):
        tasks.put(None)

    # Attende la fine di tutti i compiti
    tasks.join()

    # Inizia la stampa dei risultati
    while num_jobs:
        result = results.get()
        print('Resultato:', result)
        num_jobs -= 1

Sebbene i lavori entrino nella coda in ordine, la loro esecuzione è parallelizzata in modo che non vi è garanzia circa l'ordine di completamento.

$ python3 multiprocessing_producer_consumer.py

Creazione di 8 consumatori
Consumer-1: 0 * 0
Consumer-2: 1 * 1
Consumer-3: 2 * 2
Consumer-4: 3 * 3
Consumer-5: 4 * 4
Consumer-6: 5 * 5
Consumer-7: 6 * 6
Consumer-8: 7 * 7
Consumer-1: 8 * 8
Consumer-2: 9 * 9
Consumer-6: In uscita
Consumer-5: In uscita
Consumer-4: In uscita
Consumer-3: In uscita
Consumer-7: In uscita
Consumer-8: In uscita
Consumer-1: In uscita
Consumer-2: In uscita
Resultato: 0 * 0 = 0
Resultato: 1 * 1 = 1
Resultato: 5 * 5 = 25
Resultato: 4 * 4 = 16
Resultato: 2 * 2 = 4
Resultato: 6 * 6 = 36
Resultato: 3 * 3 = 9
Resultato: 7 * 7 = 49
Resultato: 8 * 8 = 64
Resultato: 9 * 9 = 81

Segnalazioni tra Processi

La classe Event fornisce un semplice modo per comunicare informazioni di stato tra i processi. Un evento può alternare il suo stato tra impostato e non impostato. Gli utenti dell'oggetto evento possono attendere che lo stesso passi da non impostato a impostato, usando un valore opzionale di pausa.

# multiprocessing_event.py

import multiprocessing
import time


def wait_for_event(e):
    """Attence che l'evento sia impostato prima di fare qualcosa"""
    print('wait_for_event: in partenza')
    e.wait()
    print('wait_for_event: e.is_set()->', e.is_set())


def wait_for_event_timeout(e, t):
    """Attende t secondi, poi va in pausa"""
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())


if __name__ == '__main__':
    e = multiprocessing.Event()
    w1 = multiprocessing.Process(
        name='block',
        target=wait_for_event,
        args=(e,),
    )
    w1.start()

    w2 = multiprocessing.Process(
        name='nonblock',
        target=wait_for_event_timeout,
        args=(e, 2),
    )
    w2.start()

    print('principale: in attesa prima di chiamare Event.set()')
    time.sleep(3)
    e.set()
    print('principale: evento impostato')

Quando wait() termina il periodo di pausa ritorna senza errori. Il chiamante è responsabile per la verifica dello stato dell'evento usando is_set().

$ python3 multiprocessing_event.py

principale: in attesa prima di chiamare Event.set()
wait_for_event: in partenza
wait_for_event_timeout: starting
wait_for_event_timeout: e.is_set()-> False
principale: evento impostato
wait_for_event: e.is_set()-> True

Controllare Accesso alle Risorse

In situazioni dove una singola risorsa debba essere condivisa tra processi multipli, si può utilizzare un oggetto Lock per evitare conflitti di accesso.

# multiprocessing_lock.py

import multiprocessing
import sys


def worker_with(lock, stream):
    with lock:
        stream.write('Lock acquisito via with\n')


def worker_no_with(lock, stream):
    lock.acquire()
    try:
        stream.write('Lock acquisito direttamente\n')
    finally:
        lock.release()


lock = multiprocessing.Lock()
w = multiprocessing.Process(
    target=worker_with,
    args=(lock, sys.stdout),
)
nw = multiprocessing.Process(
    target=worker_no_with,
    args=(lock, sys.stdout),
)

w.start()
nw.start()

w.join()
nw.join()

In questo esempio, i messaggi stampati alla console possono essere mescolati assieme se i due processi non sincronizzano il proprio accesso al flusso in uscita con Lock.

$ python3 multiprocessing_lock.py

Lock acquisito via with
Lock acquisito direttamente

Sincronizzare le Operazioni

Si possono usare oggetti Condition per sincronizzare parti di flusso di lavoro in modo che alcune vengano eseguite in parallelo e altri in modo sequenziale, anche se sono in processi separati.

# multiprocessing_condition.py

import multiprocessing
import time


def stage_1(cond):
    """esegue il primo segmento del lavoro
    poi notifica al secondo segmento stage_2 di continuare
    """
    name = multiprocessing.current_process().name
    print('In partenza', name)
    with cond:
        print('{} finito e pronto per il secondo segmento'.format(name))
        cond.notify_all()


def stage_2(cond):
    """attende la condizione che dice che stage_1 è completato"""
    name = multiprocessing.current_process().name
    print('In partenza', name)
    with cond:
        cond.wait()
        print('{} in esecuzione'.format(name))


if __name__ == '__main__':
    condition = multiprocessing.Condition()
    s1 = multiprocessing.Process(name='s1',
                                 target=stage_1,
                                 args=(condition,))
    s2_clients = [
        multiprocessing.Process(
            name='stage_2[{}]'.format(i),
            target=stage_2,
            args=(condition,),
        )
        for i in range(1, 3)
    ]

    for c in s2_clients:
        c.start()
        time.sleep(1)
    s1.start()

    s1.join()
    for c in s2_clients:
        c.join()

In questo esempio, due processi eseguono in parallelo il secondo segmento di un lavoro, ma solo dopo che il primo segmento è terminato.,.

$ python3 multiprocessing_condition.py

In partenza stage_2[1]
In partenza stage_2[2]
In partenza s1
s1 finito e pronto per il secondo segmento
stage_2[2] in esecuzione
stage_2[1] in esecuzione

Controllare l'Accesso Simultaneo alle Risorse

Talvolta è utile consentire a più di un esecutore alla volta di accedere a una risorsa, limitando comunque il numero di accessi totale. Ad esempio un pool di connessioni potrebbe supportare un numero fisso di connessioni simultanee, oppure una applicazione di rete potrebbe supportare un numero fisso di scaricamenti concorrenti. Un modo per gestire queste connessioni è usare Semaphore.

# multiprocessing_semaphore.py

import random
import multiprocessing
import time


class ActivePool:

    def __init__(self):
        super(ActivePool, self).__init__()
        self.mgr = multiprocessing.Manager()
        self.active = self.mgr.list()
        self.lock = multiprocessing.Lock()

    def makeActive(self, name):
        with self.lock:
            self.active.append(name)

    def makeInactive(self, name):
        with self.lock:
            self.active.remove(name)

    def __str__(self):
        with self.lock:
            return str(self.active)


def worker(s, pool):
    name = multiprocessing.current_process().name
    with s:
        pool.makeActive(name)
        print('Attivazione di {} ora in esecuzione {}'.format(
            name, pool))
        time.sleep(random.random())
        pool.makeInactive(name)


if __name__ == '__main__':
    pool = ActivePool()
    s = multiprocessing.Semaphore(3)
    jobs = [
        multiprocessing.Process(
            target=worker,
            name=str(i),
            args=(s, pool),
        )
        for i in range(10)
    ]

    for j in jobs:
        j.start()

    while True:
        alive = 0
        for j in jobs:
            if j.is_alive():
                alive += 1
                j.join(timeout=0.1)
                print('Ora in esecuzione {}'.format(pool))
        if alive == 0:
            # tutto fatto
            break

In questo esempio, la classe ActivePool serve semplicemente come un sistema conveniente per tracciare quali processi sono in esecuzione a un dato momento. Un vero pool di risorse probabilmente avrebbe allocato una connessione o un qualche altro valore al nuovo processo attivo, e recuperato il valore alla fine dell'esecuzione del compito. Qui il pool è usato semplicemente per mantenere i nomi dei processi attivi per mostrare che sono tre sono in esecuzione contemporanea.

$ python3 multiprocessing_semaphore.py

Attivazione di 0 ora in esecuzione ['0']
Attivazione di 2 ora in esecuzione ['0', '2', '1']
Attivazione di 1 ora in esecuzione ['0', '2', '1']
Ora in esecuzione ['0', '2', '1']
Attivazione di 3 ora in esecuzione ['2', '1', '3']
Ora in esecuzione ['2', '1', '3']
Ora in esecuzione ['2', '1', '3']
Ora in esecuzione ['2', '1', '3']
Attivazione di 4 ora in esecuzione ['1', '3', '4']
Ora in esecuzione ['1', '3', '4']
Ora in esecuzione ['1', '3', '4']
Attivazione di 5 ora in esecuzione ['1', '3', '5']
Ora in esecuzione ['1', '3', '5']
Attivazione di 6 ora in esecuzione ['1', '5', '6']
Attivazione di 8 ora in esecuzione ['5', '6', '8']
Attivazione di 9 ora in esecuzione ['6', '8', '9']
Ora in esecuzione ['6', '8', '9']
Ora in esecuzione ['6', '8', '9']
Attivazione di 7 ora in esecuzione ['8', '9', '7']
Ora in esecuzione ['8', '9', '7']
Ora in esecuzione ['9', '7']
Ora in esecuzione ['9', '7']
Ora in esecuzione ['9', '7']
Ora in esecuzione ['9', '7']
Ora in esecuzione ['9', '7']
Ora in esecuzione ['9', '7']
Ora in esecuzione ['7']
Ora in esecuzione []

Gestire lo Stato Condiviso

Nell'esempio precedente, la lista di processi attivi viene mantenuta centralmente nell'istanza ActivePool grazie a un tipo di oggetto speciale creato da un Manager. Questo oggetto è responsabile del coordinamento delle informazioni di stato condivise tra tutti gli utenti.

# multiprocessing_manager_dict.py

import multiprocessing
import pprint


def worker(d, key, value):
    d[key] = value


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    d = mgr.dict()
    jobs = [
        multiprocessing.Process(
            target=worker,
            args=(d, i, i * 2),
        )
        for i in range(10)
    ]
    for j in jobs:
        j.start()
    for j in jobs:
        j.join()
    print('Risultati:', d)

Creando la lista con il Manager, essa viene condivisa e gli aggiornamenti sono visti da tutti i processi. Sono supportati anche i dizionari.

$ python3 multiprocessing_manager_dict.py

Risultati: {0: 0, 1: 2, 2: 4, 5: 10, 3: 6, 6: 12, 7: 14, 9: 18, 8: 16, 4: 8}

Spazi dei nomi condivisi

Oltre ai dizionari e alle liste, Manager può creare anche spazi dei nomi condivisi con Namespace.

# multiprocessing_namespaces.py

import multiprocessing


def producer(ns, event):
    ns.value = 'Questo è il valore'
    event.set()


def consumer(ns, event):
    try:
        print('Prima dell\'evento: {}'.format(ns.value))
    except Exception as err:
        print('Prima dell\'evento, errore:', str(err))
    event.wait()
    print('Dopo l\'evento:', ns.value)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()

Qualunque valore aggiunto a Namespace è visibile a chiunque riceva una istanza di Namespace.

$ python3 multiprocessing_namespaces.py

Prima dell'evento, errore: 'Namespace' object has no attribute 'value'
Dopo l'evento: Questo è il valore

E' importante sapere che gli aggiornamenti al contenuto di valori mutabili nello spazio dei nomi non vengono propagati automaticamente.

# multiprocessing_namespaces_mutable.py

import multiprocessing


def producer(ns, event):
    # DOES NOT UPDATE GLOBAL VALUE!
    ns.my_list.append('Questo è il valroe')
    event.set()


def consumer(ns, event):
    print('Prima dell\' evento:', ns.my_list)
    event.wait()
    print('Dopo l\' evento :', ns.my_list)


if __name__ == '__main__':
    mgr = multiprocessing.Manager()
    namespace = mgr.Namespace()
    namespace.my_list = []

    event = multiprocessing.Event()
    p = multiprocessing.Process(
        target=producer,
        args=(namespace, event),
    )
    c = multiprocessing.Process(
        target=consumer,
        args=(namespace, event),
    )

    c.start()
    p.start()

    c.join()
    p.join()

Per aggiornare la lista occorre attaccare nuovamente allo spazio dei nomi l'oggetto.

$ python3 multiprocessing_namespaces_mutable.py

Prima dell' evento: []
Dopo l' evento : []

Pool di Processi

La classe Pool può essere usata per gestire un numero fisso di esecutori per casi semplici nei quali il lavoro da compiere può essere diviso e distribuito indipendentemente tra gli esecutori. I valori di ritorno dai lavori sono raccolti e ritornati come lista. Gli argomenti per Pool includono il numero di processi e la funzione da eseguire quando si fa partire il compito (chiamata una volta per figlio).

# multiprocessing_pool.py

import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('In partenza', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input   :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # compiti esauriti
    pool.join()  # chiude il task corrente

    print('Pool    :', pool_outputs)

Il risultato del metodo map() è funzionalmente equivalente al metodo built-in map() eccetto che i singoli compiti vengono eseguiti in parallelo. Visto che il pool sta elaborando i suoi input in parallelo, close() e join() possono essere usati per sincronizzare il processo principale con quelli che eseguono i compiti per consentire una corretta pulizia.

$ python3 multiprocessing_pool.py

Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x7f1c15fcb3a0>
In partenza ForkPoolWorker-1
In partenza ForkPoolWorker-2
In partenza ForkPoolWorker-3
In partenza ForkPoolWorker-4
In partenza ForkPoolWorker-5
In partenza ForkPoolWorker-6
In partenza ForkPoolWorker-7
In partenza ForkPoolWorker-8
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Nella modalità predefinita, Pool crea un numero fisso di processi esecutori e a essi passa lavori fino a quando sono terminati. L'impostazione del parametro maxtaskperchild indica al pool di far ripartire un processo esecutore dopo che ha finito alcuni compiti, prevenendo esecutori che di lunga esecuzione dal consumare ulteriori risorse di sistema.

# multiprocessing_pool_maxtasksperchild.py

import multiprocessing


def do_calculation(data):
    return data * 2


def start_process():
    print('In partenza', multiprocessing.current_process().name)


if __name__ == '__main__':
    inputs = list(range(10))
    print('Input   :', inputs)

    builtin_outputs = map(do_calculation, inputs)
    print('Built-in:', builtin_outputs)

    pool_size = multiprocessing.cpu_count() * 2
    pool = multiprocessing.Pool(
        processes=pool_size,
        initializer=start_process,
        maxtasksperchild=2,
    )
    pool_outputs = pool.map(do_calculation, inputs)
    pool.close()  # compiti esauriti
    pool.join()

    print('Pool    :', pool_outputs)

Il pool fa ripartire gli esecutori quando hanno completato i compiti assegnati, anche se non ci sono più lavori. In questo risultato, sono creati otto esecutori, anche se ci sono solo 10 compiti, e ciascun esecutore può completarne due alla volta.

$ python3 multiprocessing_pool_maxtasksperchild.py

Input   : [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Built-in: <map object at 0x7fcb8c41b3a0>
In partenza ForkPoolWorker-1
In partenza ForkPoolWorker-2
In partenza ForkPoolWorker-3
In partenza ForkPoolWorker-4
In partenza ForkPoolWorker-5
In partenza ForkPoolWorker-6
In partenza ForkPoolWorker-7
In partenza ForkPoolWorker-8
In partenza ForkPoolWorker-9
In partenza ForkPoolWorker-10
Pool    : [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Implementare MapReduce

La classe Pool può essere usata per creare una semplice implementazione MapReduce di un server singolo. Sebbene non fornisca i pieni benefici di una distribuzione del processo, illustra quanto sia facile suddividere e distribuire alcuni problemi in unità di elaborazione.

In un sistema basato su MapReduce, i dati in entrata sono suddivisi in spezzoni da elaborare da diverse istanze di esecutori. Ogni spezzone viene mappato a uno stato intermedio usando una semplice trasformazione. I dati intermedi vengono poi raccolti insieme e partizionati in base a un valore chiave in modo che tutti i valori collegati siano insieme. In ultimo il dato partizionato viene ridotto a un insieme di risultati.

# multiprocessing_mapreduce.py

import collections
import itertools
import multiprocessing


class SimpleMapReduce:

    def __init__(self, map_func, reduce_func, num_workers=None):
        """
        map_func

          Funzione che mappa gli input a dati intermedi. Riceve
          come argomento un valore in input e ritorna una tupla
          con la chiave e il valore da ridurre.

        reduce_func

          Funzione per ridurre la versione partizionata di dati
          intermedi verso il risultato finale. Riceve come
          argomento una chiave prodotta da map_func e una sequenza
          dei valori associati a quella chiave

        num_workers

          Il numero di esecutori da creare nel pool. Il valore
          predefinito è il numero di CPU disponibili nell'host
          corrente.
        """
        self.map_func = map_func
        self.reduce_func = reduce_func
        self.pool = multiprocessing.Pool(num_workers)

    def partition(self, mapped_values):
        """Sistema i valori mappati in base alle loro chiavi.
        Ritorna una sequenza non ordinata di tuple con una chiave
        e una sequenza di valori
        """
        partitioned_data = collections.defaultdict(list)
        for key, value in mapped_values:
            partitioned_data[key].append(value)
        return partitioned_data.items()

    def __call__(self, inputs, chunksize=1):
        """Elabora l'input tramite le funzioni di map e reduce
        passate.

        inputs
          Un iterabile che contiene i dati in input da elaborare

        chunksize=1
          La porzione di dati in input da passare a ciascun esecutore
          Può essere usato per sintonizzare le prestazioni durante
          la fase di mappatura.
        """
        map_responses = self.pool.map(
            self.map_func,
            inputs,
            chunksize=chunksize,
        )
        partitioned_data = self.partition(
            itertools.chain(*map_responses)
        )
        reduced_values = self.pool.map(
            self.reduce_func,
            partitioned_data,
        )
        return reduced_values

Lo script di esempio che segue usa SimpleMapReduce per contare le parole più frequenti nel sorgente di questo articolo, ignorando i marcatori.

# multiprocessing_wordcount.py

import multiprocessing
import string

from multiprocessing_mapreduce import SimpleMapReduce


def file_to_words(filename):
    """Legge un file e ritorna una sequenza di valori
    (parole, occorrenze).
    """
    STOP_WORDS = set([word.strip() for word in open('ita_stopwords.txt')])
    TR = str.maketrans({
        p: ' '
        for p in string.punctuation
    })

    print('{} in lettura {}'.format(
        multiprocessing.current_process().name, filename))
    output = []

    with open(filename, 'rt') as f:
        for line in f:
            # Salta le righe di commento.
            if line.lstrip().startswith(('<', '#', '$')):
                continue
            line = line.translate(TR)  # Elimina la punteggiatura
            for word in line.split():
                word = word.lower()
                if word.isalpha() and word not in STOP_WORDS:
                    output.append((word, 1))
    return output


def count_words(item):
    """Converte i dati partizionati per una parola in una tupla
    che contiene la parola e il numero di occorrenze.
    """
    word, occurences = item
    return (word, sum(occurences))


if __name__ == '__main__':
    import operator
    import glob

    input_files = glob.glob('../tran/multiprocessing.xml')

    mapper = SimpleMapReduce(file_to_words, count_words)
    word_counts = mapper(input_files)
    word_counts.sort(key=operator.itemgetter(1))
    word_counts.reverse()

    print('\nLE 20 PAROLE PIU\' FREQUENTI\n')
    top20 = word_counts[:20]
    longest = max(len(word) for word, count in top20)
    for word, count in top20:
        print('{word:<{len}}: {count:5}'.format(
            len=longest + 1,
            word=word,
            count=count)
        )

La funzione file_to_words converte ogni file in input in una sequenza di tuple che contengono la parola e il numero 1 che rappresenta una singola occorrenza. I dati sono poi divisi da partition usando la parola come chiave, in modo che la struttura risultante consiste in una chiave e una sequenza di valori 1 che rappresentano ciascuna occorrenza della parola. I dati partizionati sono convertiti in un insieme di tuple che contengono una parola e il conteggio per quella parola effettuato da count_words() durante la fase di riduzione.

$ python3 multiprocessing_wordcount.py


LE 20 PAROLE PIU' FREQUENTI

Traceback (most recent call last):
  File "multiprocessing_wordcount.py", line 57, in <module>
    longest = max(len(word) for word, count in top20)
ValueError: max() arg is an empty sequence

Vedere anche:

multiprocessing
La documentazione della libreria standard per questo modulo.
threading
API di alto livello per lvaorare con i thread
MapReduce - Wikipedia
Panoramica di MapReduce su Wikipedia
MapReduce: Simplified Dsta Processing on Large Clusters
Presentazione e documento su MapReduce da parte di Google Labs
Operator
Strumenti sugli operatori tipo itemgetter