select - Gestisce con Efficienza l'Attesa per I/O

Scopo: Attende la notifica che un canale input od output è pronto.

Il modulo select fornisce accesso alle funzioni di monitoraggio I/O specifiche alla piattaforma. L'interfaccia più portabile è la funzione POSIX select(), la quale è disponibile su Unix e Windows. Il modulo include anche poll(), una API solo per Unix, e parecchie opzioni che funzionano solo con varianti specifiche di Unix.

Il nuovo modulo selectors fornisce una interfaccia a livello più alto costruita sopra l'API in select. E' più facile costruire codice portabile usando selectors, quindi si utilizzi quel modulo a meno che l'API di basso livello fornita da select sia in qualche modo richiesta.

Usare select()

La funzione select() di Python è una interfaccia diretta all'implementazione del sistema operativo sottostante. Monitora socket, file aperti e pipe (qualunque cosa con un metodo fileno() che ritorni un descrittore di file valido) fino a che diventano leggibili o scrivibili oppure si manifesta un errore di comunicazione. select() facilita il monitoraggio di connessioni multiple allo stesso tempo, ed è più efficiente rispetto alla scrivere una interrogazione ciclica in Python utilizzando i timeout del socket, in quanto il monitoraggio avviene nello strato di rete del sistema operativo, in luogo dell'interprete.

L'uso di oggetti Python di tipo file con select() funziona in Unix ma non è supportato in Windows

Questo esempio di server che ritorna i dati ricevuti, monitora più di una connessione alla volta usando select(). Questa versione inizia creando un socket TCP/IP non bloccante e lo configura in ascolto su di un indirizzo.

# select_echo_server.py

import select
import socket
import sys
import queue

# Crea un socket TCP/IP
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)

# Collega il socket alola  port
server_address = ('localhost', 10000)
print('in partenza sulla porta {} {}'.format(*server_address),
      file=sys.stderr)
server.bind(server_address)

# In ascolto per connessioni in arrivo
server.listen(5)

Gli argomenti per select() sono tre liste che contengono i canali da monitorare. La prima è una lista degli oggetti che devono essere controllati per la lettura dei dati in arrivo, la seconda contiene gli oggetti che riceveranno i dati in uscita quando vi è spazio nei propri buffer, e la terza è quella che potrebbe avere un errore (in genere una combinazione degli oggetti dei canali di input e output). Il passo successivo nel server è l'impostazione delle liste che contengono le sorgenti di input e le destinazioni di output da passare a select().

# Socket dai quali ci si attende una lettura
inputs = [server]

# Socket verso i quali ci si prevede di scrivere
outputs = []

Le connessioni sono aggiunte e rimosse da queste liste dal ciclo principale del server. Visto che questa versione del server attenderà che un socket diventi scrivibile prima di inviargli dati (in luogo di inviare immediatamente la risposta), ciascuna connessione in uscita necessita di una coda che funga da buffer per i dati da inviargli attraverso.

# Code di messaggio in uscita (socket:Queue)

message_queues = {}

La parte principale del programma server esegue un ciclo, chiamando select per interrompersi e attendere l'attività di rete.

while inputs:

    # Attende che almeno uno dei socket sia
    # pronto per l'elaborazione
    print('in attesa del prossimo evento', file=sys.stderr)
    readable, writable, exceptional = select.select(inputs,
                                                    outputs,
                                                    inputs)

select() ritorna tre nuove liste, che contengono sottoinsiemi dei contenuti delle liste ricevute in input. Tutti i socket nella list readable hanno dati in arrivo nel buffer e sono disponibili per la lettura. Tutti i socket della lista writable hanno spazio libero nel loro buffer e vi si può scrivere. I socket ritornati in exceptional hanno avuto un errore (la vera definizione di "condizione di eccezione" dipende dalla piattaforma).

I socket "leggibili" rappresentano tre possibili casi. Se il socket è quello principale del server, quello usato per ascoltare connessioni in arrivo, allora la condizione "leggibile" significa che è pronto ad accettare un'altra connessione in arrivo. Oltre ad aggiungere una nuova connessione alla lista di input da monitorare, questa parte di codice imposta il socket client per non bloccare.

# Gestione input
    for s in readable:

        if s is server:
            # Un socket "leggibile" è pronto ad accettare una connessione
            connection, client_address = s.accept()
            print('  connessione da', client_address,
                  file=sys.stderr)
            connection.setblocking(0)
            inputs.append(connection)

            # Fornisce alla connessione una coda per i dati che si
            # vogliono inviare
            message_queues[connection] = queue.Queue()

Il prossimo caso è una connessione stabilita con un client che ha inviato dati. I dati sono letti con recv(), quindi piazzati nella coda in modo che possano essere inviati attraverso il socket e ritornati al client.

        else:
            data = s.recv(1024)
            if data:
                # Un socket client leggibile ha dati
                print('  received {!r} from {}'.format(
                    data, s.getpeername()), file=sys.stderr,
                )
                message_queues[s].put(data)
                # Aggiunge il canale in output per la risposta
                if s not in outputs:
                    outputs.append(s)

Un socket leggibile senza dati disponibili proviene da un client che è stato disconnesso, e il canale è pronto per essere chiuso.

            else:
                # Interpreta un risultato vuoto come connessione chiusa
                print('  chiusura', client_address,
                      file=sys.stderr)
                # Interrompe l'ascolto per input sulla connessione
                if s in outputs:
                    outputs.remove(s)
                inputs.remove(s)
                s.close()

                # Rimuove la coda di messaggi
                del message_queues[s]

Ci sono meno casistiche per le connessioni in scrittura. Se ci sono dati in una coda per una connessione, viene inviato il messaggio successivo. Altrimenti la connessione viene rimossa dalla lista delle connessioni in uscita in modo che la prossima volta che si passa attraverso select() nel ciclo, non indichi che il socket è pronto per inviare dati.

    # Gestione output
    for s in writable:
        try:
            next_msg = message_queues[s].get_nowait()
        except queue.Empty:
            # Non ci son omessaggi in attesa, quindi si interrompe la verifica
            # di "scrivibilità"
            print('  ', s.getpeername(), 'coda vuota',
                  file=sys.stderr)
            outputs.remove(s)
        else:
            print('  in invio {!r} verso {}'.format(next_msg,
                                                    s.getpeername()),
                  file=sys.stderr)
            s.send(next_msg)

Infine, se vi è un errore con un socket, esso viene chiuso.

    # Gestione delle "condizioni di eccezione"
    for s in exceptional:
        print('condizione di eccezione', s.getpeername(),
              file=sys.stderr)
        # Interrompe l'ascolto in input per la connessione
        inputs.remove(s)
        if s in outputs:
            outputs.remove(s)
        s.close()

        # Rimuove la coda di messaggio
        del message_queues[s]

Il programma di esempio client usa due socket per dimostrare come il server gestisca con select() connessioni multiple allo stesso tempo. Il client inizia connettendo ciascun socket TCP/IP al server.

# select_echo_multiclient.py

import socket
import sys

messages = [
    'Questo è il messaggio. ',
    'Verrà inviato ',
    'in parti.',
]
server_address = ('localhost', 10000)

# Crea un socket TCP/IP
socks = [
    socket.socket(socket.AF_INET, socket.SOCK_STREAM),
    socket.socket(socket.AF_INET, socket.SOCK_STREAM),
]

# Connette il socket alla porta dove il server è in ascolto
print('connessione a {} porta {}'.format(*server_address),
      file=sys.stderr)
for s in socks:
    s.connect(server_address)

Quindi invia una parte di messaggio alla volta tramite ciascun socket e legge tutte le risposte disponibili dopo aver scritto nuovi dati.

for message in messages:
    outgoing_data = message.encode()

    # Invia messaggi a entrambi i socket
    for s in socks:
        print('{}: in invio {!r}'.format(s.getsockname(),
                                        outgoing_data),
              file=sys.stderr)
        s.send(outgoing_data)

    # Legge le risposte da entrambi i socket
    for s in socks:
        data = s.recv(1024)
        print('{}: ricevuto {!r}'.format(s.getsockname(),
                                         data),
              file=sys.stderr)
        if not data:
            print('chiusura socket', s.getsockname(),
                  file=sys.stderr)
            s.close()

Eseguire il server in una finestra di terminale e il client in un'altra. L'output dovrebbe essere tipo questo, con numeri di porte diverse.

$ python3 select_echo_server.py

in partenza sulla porta localhost 10000
in attesa dell'evento successivo
  connessione da ('127.0.0.1', 59178)
in attesa dell'evento successivo
  connessione da ('127.0.0.1', 59180)
  ricevuti b'Ecco il messaggio. ' da ('127.0.0.1', 59178)
in attesa dell'evento successivo
  ricevuti b'Ecco il messaggio. ' da ('127.0.0.1', 59180)
  in invio b'Ecco il messaggio. ' verso ('127.0.0.1', 59178)
in attesa dell'evento successivo
   ('127.0.0.1', 59178) coda vuota
  in invio b'Ecco il messaggio. ' verso ('127.0.0.1', 59180)
in attesa dell'evento successivo
   ('127.0.0.1', 59180) coda vuota
in attesa dell'evento successivo
  ricevuti b'che viene inviato ' da ('127.0.0.1', 59178)
in attesa dell'evento successivo
  ricevuti b'che viene inviato ' da ('127.0.0.1', 59180)
  in invio b'che viene inviato ' verso ('127.0.0.1', 59178)
in attesa dell'evento successivo
   ('127.0.0.1', 59178) coda vuota
  in invio b'che viene inviato ' verso ('127.0.0.1', 59180)
in attesa dell'evento successivo
   ('127.0.0.1', 59180) coda vuota
in attesa dell'evento successivo
  ricevuti b'in parti.' da ('127.0.0.1', 59178)
in attesa dell'evento successivo
  ricevuti b'in parti.' da ('127.0.0.1', 59180)
  in invio b'in parti.' verso ('127.0.0.1', 59178)
in attesa dell'evento successivo
   ('127.0.0.1', 59178) coda vuota
  in invio b'in parti.' verso ('127.0.0.1', 59180)
in attesa dell'evento successivo
   ('127.0.0.1', 59180) coda vuota
in attesa dell'evento successivo
  chiusura ('127.0.0.1', 59180)
in attesa dell'evento successivo
  chiusura ('127.0.0.1', 59180)
in attesa dell'evento successivo

L'output del client mostra che i dati sono stati inviati e ricevuti usando entrambi i socket.

$ python3 select_echo_multiclient.py

connessione a localhost porta 10000
('127.0.0.1', 59178): in invio b'Ecco il messaggio. '
('127.0.0.1', 59180): in invio b'Ecco il messaggio. '
('127.0.0.1', 59178): ricevuto b'Ecco il messaggio. '
('127.0.0.1', 59180): ricevuto b'Ecco il messaggio. '
('127.0.0.1', 59178): in invio b'che viene inviato '
('127.0.0.1', 59180): in invio b'che viene inviato '
('127.0.0.1', 59178): ricevuto b'che viene inviato '
('127.0.0.1', 59180): ricevuto b'che viene inviato '
('127.0.0.1', 59178): in invio b'in parti.'
('127.0.0.1', 59180): in invio b'in parti.'
('127.0.0.1', 59178): ricevuto b'in parti.'
('127.0.0.1', 59180): ricevuto b'in parti.'

I/O Non Bloccante con Timeout

select() riceve anche un quarto parametro opzionale, che è il numero di secondi di attesa prima di interrompere il monitoraggio se nessun canale è diventato attivo. Usando un valore di timeout si consente al programma principale di chiamare select() come parte di un ciclo di elaborazione più grande, intraprendendo altre azioni inframmezzate alla verifica di input dalla rete.

Quando scade il timeout, select() ritorna tre liste vuote. L'esempio del server è stato aggiornato per utilizzare un timeout aggiungendo l'argomento extra alla chiamata di select() e la gestione delle liste vuote dopo che select() ritorna.

    readable, writable, exceptional = select.select(inputs,
                                                    outputs,
                                                    inputs,
                                                    timeout)

    if not (readable or writable or exceptional):
        print('  tempo esaurito, si eseguono altri compiti',
              file=sys.stderr)
        continue

La versione "lenta" del programma client va in pausa dopo l'invio di ciascun messaggio per simulare latenza o qualche altro ritardo nella trasmissione.

# select_echo_slow_client.py

import socket
import sys
import time

# Crea un socket TCP/IP
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)

# Connette il socket alla porta dove il server è in ascolto
server_address = ('localhost', 10000)
print('connessione a {} porta {}'.format(*server_address),
      file=sys.stderr)
sock.connect(server_address)

time.sleep(1)

messages = [
    'Prima parte del messaggio.',
    'Seconda parte del messaggio.',
]
amount_expected = len(''.join(messages))

try:

    # Invio dati
    for message in messages:
        data = message.encode()
        print('in invio {!r}'.format(data), file=sys.stderr)
        sock.sendall(data)
        time.sleep(1.5)

    # Ricerca della risposta
    amount_received = 0

    while amount_received < amount_expected:
        data = sock.recv(16)
        amount_received += len(data)
        print('recevuti {!r}'.format(data), file=sys.stderr)

finally:
    print('chiusura socket', file=sys.stderr)
    sock.close()

L'esecuzione del nuovo server con il client "lento" produce:

$ python3 select_echo_server_timeout.py

in partenza sulla porta localhost 10000
in attesa dell'evento successivo
  tempo esaurito, si eseguono altri compiti
in attesa dell'evento successivo
  tempo esaurito, si eseguono altri compiti
in attesa dell'evento successivo
  connessione da ('127.0.0.1', 52732)
in attesa dell'evento successivo
  tempo esaurito, si eseguono altri compiti
in attesa dell'evento successivo
  ricevuti b'Prima parte del messaggio.' da ('127.0.0.1', 52732)
in attesa dell'evento successivo
  in invio b'Prima parte del messaggio.' verso ('127.0.0.1', 52732)
in attesa dell'evento successivo
   ('127.0.0.1', 52732) coda vuota
in attesa dell'evento successivo
  tempo esaurito, si eseguono altri compiti
in attesa dell'evento successivo
  ricevuti b'Seconda parte del messaggio.' da ('127.0.0.1', 52732)
in attesa dell'evento successivo
  in invio b'Seconda parte del messaggio.' verso ('127.0.0.1', 52732)
in attesa dell'evento successivo
   ('127.0.0.1', 52732) coda vuota
in attesa dell'evento successivo
  tempo esaurito, si eseguono altri compiti
in attesa dell'evento successivo
  chiusura ('127.0.0.1', 52732)
in attesa dell'evento successivo
  tempo esaurito, si eseguono altri compiti

Questo è l'output del client.

$ python3 select_echo_slow_client.py

connessione a localhost porta 10000
in invio b'Prima parte del messaggio.'
in invio b'Seconda parte del messaggio.'
recevuti b'Prima parte del '
recevuti b'messaggio.Second'
recevuti b'a parte del mess'
recevuti b'aggio.'
chiusura socket

Utilizzare poll()

La funzione poll fornisce caratteristiche simili a select(), ma l'implementazione sottostante è più efficiente. Il punto negativo è che poll() non è supportata sotto Windows, quindi i programmi che usano poll() sono meno portabili.

Un server che ripete quanto ricevuto costruito su poll() inizia con lo stesso codice di configurazione di socket usata negli altri esempi.

# select_poll_echo_server.py

import select
import socket
import sys
import queue

# Crea un socket TCP/IP
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(0)

# Collega il socket alla porta
server_address = ('localhost', 10000)
print('in attivazione su {} porta {}'.format(*server_address),
      file=sys.stderr)
server.bind(server_address)

# In ascolto per connessioni in arrivo
server.listen(5)

# Mantiene le code dei messaggi in uscita
message_queues = {}

Il valore di timeout passato a poll() è rappresentato in millisecondi, invece che secondi, quindi per mettere in pausa per un secondo il valore deve essere impostato a 1000.

# Non si blocca per sempre (millisecondi)
TIMEOUT = 1000

Python implementa poll() con una classe che gestisce i canali dati registrati che sono monitorati. I canali sono aggiunti chiamando register() con flag che indicano quali eventi siano di interesse per quel canale. L'intero insieme di flag è elencato nella tabella qui sotto:

EVENTO DESCRIZIONE
POLLIN Input pronto
POLLPRI Input prioritario pronto
POLLOUT In grado di ricevere output
POLLERR Errore
POLLHUP Canale chiuso
POLLNVAL Canale non aperto

Il server imposterà alcuni socket solo per la lettura e altri per lettura e scrittura. Le combinazioni appropriate di flag sono salvate nelle variabili locali READ_ONLY e READ_WRITE.

# Insiemi di flag comunemente usati
READ_ONLY = (
    select.POLLIN |
    select.POLLPRI |
    select.POLLHUP |
    select.POLLERR
)
READ_WRITE = READ_ONLY | select.POLLOUT

Il socket server viene registrato in modo che qualunque connessione in arrivo o dati inneschino un evento.

# Imposta il poller
poller = select.poll()
poller.register(server, READ_ONLY)

Visto che poll() ritorna una lista di tuple che contengono il descrittore di file per il socket e il flag di evento, è necessaria una mappatura tra i numeri dei descrittori di file e gli oggetti per recuperare il socket da cui leggere o su cui scrivere.

# Mappa i descrittori di file agli oggetti socket
fd_to_socket = {
    server.fileno(): server,
}

Il ciclo del server chiama poll(), quindi elabora gli "eventi" ritornati cercando il socket ed eseguendo l'azione in base al flag nell'evento.

while True:

    # Attende che almeno uno dei socket sia
    # pronto per l'elaborazione
    print('in attesa del prossimo evento', file=sys.stderr)
    events = poller.poll(TIMEOUT)

    for fd, flag in events:

        # Recupera il socket effettivo dal suo descrittore di file
        s = fd_to_socket[fd]

Come con select(), quando il socket del server principale è "leggibile", significa in realtà che esiste una connessione pendente dal client. La nuova connessione è registrata con i flag READ_ONLY per monitorare i nuovi dati dalla quale perverranno.

        # Gestione input
        if flag & (select.POLLIN | select.POLLPRI):

            if s is server:
                # UN socket leggibile è pronto
                # per accettare una connessione
                connection, client_address = s.accept()
                print('  connessione', client_address,
                      file=sys.stderr)
                connection.setblocking(0)
                fd_to_socket[connection.fileno()] = connection
                poller.register(connection, READ_ONLY)

                # Diamo alla connessione una coda per i dati da inviare
                message_queues[connection] = queue.Queue()

I socket che non sono server sono i client esistenti e recv() viene usato per accedere ai dati in attesa di lettura.

            else:
                data = s.recv(1024)

Se recv() ritorna dei dati, vengono piazzati nella coda in uscita per il socket, e i flag per quel socket sono modificati tramite modify() in modo che poll() possa monitorare i socket che sono pronti per ricevere dati.

                if data:
                    # Un socket client leggibile ha dei dati
                    print('  recevuti {!r} da {}'.format(
                        data, s.getpeername()), file=sys.stderr,
                    )
                    message_queues[s].put(data)
                    # Si aggiunge un canale di output per la risposta
                    poller.modify(s, READ_WRITE)

Una stringa vuota ritornata da recv() significa che il client è disconnesso, quindi si usa unregistered() per indicare all'oggetto poll di ignorare il socket.

                else:
                    # Un risultato vuoto si interpreta come una
                    # connessione chiusa
                    print('  chiusura', client_address,
                          file=sys.stderr)
                    # Si interrompe l'ascolto per in input sulla connessione
                    poller.unregister(s)
                    s.close()

                    # Rimozione della coda dei messaggi.
                    del message_queues[s]

Il flag POLLHYUP indica un client che ha interrotto la connessione senza chiuderla adeguatamente. Il server interrompe l'interrogazione di client che spariscono.

        elif flag & select.POLLHUP:
            # Il client ha interrotto la connessione
            print('  chiusura', client_address, '(HUP)',
                  file=sys.stderr)
            # Si interrompe l'ascolto per in input sulla connessione
            poller.unregister(s)
            s.close()

La gestione dei socket scrivibili assomiglia alla versione usata nell'esempio per select() a parte che viene usato modify() per modificare i flag del socket invece che rimuoverlo dalla lista di output.

        elif flag & select.POLLOUT:
            # Il socket è pronto per l'invio dati
            # se ce ne sono da spedire
            try:
                next_msg = message_queues[s].get_nowait()
            except queue.Empty:
                # Nessun messaggio in atttesa, quindi si interrompe
                # la verifica
                print(s.getpeername(), 'coda vuota',
                      file=sys.stderr)
                poller.modify(s, READ_ONLY)
            else:
                print('  inviati {!r} a {}'.format(
                    next_msg, s.getpeername()), file=sys.stderr,
                )
                s.send(next_msg)

In ultimo qualsiasi evento con POLLERR fa sì che il server chiuda il socket.

        elif flag & select.POLLERR:
            print('  eccezione su', s.getpeername(),
                  file=sys.stderr)
            # Si interrompe l'ascolto per in input sulla connessione
            poller.unregister(s)
            s.close()

            # Rimozione della coda dei messaggi.
            del message_queues[s]

Ecco cosa esce quando il server basato su poll viene eseguito assieme a select_echo_multiclient.py (il programma client che usa socket multipli).

$ python3 select_poll_echo_server.py
in attivazione su localhost porta 10000
in attesa del prossimo evento
in attesa del prossimo evento
in attesa del prossimo evento
in attesa del prossimo evento
  connessione ('127.0.0.1', 38756)
in attesa del prossimo evento
  connessione ('127.0.0.1', 38758)
  recevuti b'Ecco il messaggio. ' da ('127.0.0.1', 38756)
in attesa del prossimo evento
  inviati b'Ecco il messaggio. ' a ('127.0.0.1', 38756)
  recevuti b'Ecco il messaggio. ' da ('127.0.0.1', 38758)
in attesa del prossimo evento
('127.0.0.1', 38756) coda vuota
  inviati b'Ecco il messaggio. ' a ('127.0.0.1', 38758)
in attesa del prossimo evento
('127.0.0.1', 38758) coda vuota
in attesa del prossimo evento
  recevuti b'che viene inviato ' da ('127.0.0.1', 38756)
in attesa del prossimo evento
  inviati b'che viene inviato ' a ('127.0.0.1', 38756)
  recevuti b'che viene inviato ' da ('127.0.0.1', 38758)
in attesa del prossimo evento
('127.0.0.1', 38756) coda vuota
  inviati b'che viene inviato ' a ('127.0.0.1', 38758)
in attesa del prossimo evento
('127.0.0.1', 38758) coda vuota
in attesa del prossimo evento
  recevuti b'in parti.' da ('127.0.0.1', 38756)
in attesa del prossimo evento
  inviati b'in parti.' a ('127.0.0.1', 38756)
  recevuti b'in parti.' da ('127.0.0.1', 38758)
in attesa del prossimo evento
('127.0.0.1', 38756) coda vuota
  inviati b'in parti.' a ('127.0.0.1', 38758)
in attesa del prossimo evento
('127.0.0.1', 38758) coda vuota
in attesa del prossimo evento
  chiusura ('127.0.0.1', 38758)
in attesa del prossimo evento
  chiusura ('127.0.0.1', 38758)

Opzioni Specifiche di Piattaforma

Le opzioni meno portabili fornite da select sono epoll, l'API di edge polling supportata da Linux, kqueue, che usa la code del kernel di BSD, e kevent, l'interfaccia eventi di BSD. Fare riferimento alla documentazione della libreria del sistema operativo per maggiori dettagli sul loro funzionamento.

Vedere anche:

select
La documentazione della libreria standard per questo modulo
selectors
Astrazioni Multiplexing di I/O
socket
Comunicazione di rete a basso livello
asyncio
Strumenti di I/O e concorrenza
Socket Programming HOWTO
Una guida pratica di Gordon McMillan, inclusa nella documentazione della libreria standard.
Unix Network Programming, Volume 1: The Sockets Networking API, 3/E
di W. Richard Stevens, Bill Fenner, e Andrew M. Rudoff. Pubblicato da Addison-Wesley Professional, 2004. ISBN-10: 0131411551
Foundations of Python Network Programminng, 3/E
di Brandon Rhodes e John Goerzen. Pubblicato da Apress, 2014. ISBN-10: 1430258543