selectors - Astrazioni Multiplexing per I/O

Scopo: Fornisce astrazioni indipendenti dalla piattaforma per multiplexing I/O basato sul modulo select

Il modulo selectors fornisce uno strato di astrazione indipendente dalla piattaforma sopra alle funzioni di monitoraggio I/O della piattaforma specifica in select.

Modello Operativo

Le API in selectors sono basate sugli eventi, simili alla funzione poll() di select. Ci sono parecchie implementazioni e il modulo imposta automaticamente l'alias DefaultSelector per fare riferimento a quella maggiormente efficiente per la configurazione corrente del sistema.

Un oggetto selettore fornisce metodi per specificare quali eventi cercare in un socket, quindi consente al chiamante di rimanere in attesa di eventi in una modalità indipendente della piattaforma. La registrazione di un evento crea una chiave di selettore SelectorKey, che contiene il socket, informazioni sugli eventi di interesse, e dati applicativi opzionali. Il proprietario del selettore chiama il suo metodo select() per apprendere riguardo agli eventi. Il valore di ritorno è una sequenza di oggetti chiave e una bitmask che indica quali eventi si sono verificati. Un programma che utilizzi un selettore dovrebbe chiamare select() ripetutamente, poi gestire gli eventi in modo appropriato.

Server che Ritorna Dati Inviati da Client

L'esempio qui sotto di un server che ritorna i dati inviati da un client usa i dati applicativi in SelectorKey per registrare una funzione callback da chiamarsi sul nuovo evento. Il ciclo principale ottiene il callback dalla chiave e passa a essa il socket e la bitmask dell'evento. Appena il server va in esecuzione, registra la funzione accept() da chiamarsi per eventi di lettura sul socket del server principale. L'accettazione di una connessione produce un nuovo socket, il quale viene poi registrato con la funzione read() come callback per gli eventi di lettura.

# selectors_echo_server.py

import selectors
import socket

mysel = selectors.DefaultSelector()
keep_running = True


def read(connection, mask):
    "Callback for eventi di lettura"
    global keep_running

    client_address = connection.getpeername()
    print('letti({})'.format(client_address))
    data = connection.recv(1024)
    if data:
        # Un socket client leggibile ha dati
        print('  ricevuti {!r}'.format(data))
        connection.sendall(data)
    else:
        # Interpreta un risultato vuoto come connessione chiusa
        print('  chiusura')
        mysel.unregister(connection)
        connection.close()
        # Dice al ciclo principale di fermarsi
        keep_running = False


def accept(sock, mask):
    "Callback per nuove connessioni"
    new_connection, addr = sock.accept()
    print('accetta({})'.format(addr))
    new_connection.setblocking(False)
    mysel.register(new_connection, selectors.EVENT_READ, read)


server_address = ('localhost', 10000)
print('In esecuzione su {} porta {}'.format(*server_address))
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.setblocking(False)
server.bind(server_address)
server.listen(5)

mysel.register(server, selectors.EVENT_READ, accept)

while keep_running:
    print('in attesa di I/O')
    for key, mask in mysel.select(timeout=1):
        callback = key.data
        callback(key.fileobj, mask)

print('terminato')
mysel.close()

Quando read() non riceve dati dal socket, interpreta l'evento di lettura come se l'altro capo della connessione sia stata chiusa, invece che spedire dati. Rimuove il socket dal selettore e lo chiude. Per evitare un ciclo infinito, il server chiude anche se stesso dopo che ha finito di comunicare con un singolo client.

Client che Riceve i Dati Inviati

L'esempio qui sotto di client che riceve di ritorno i dati da esso inviati elabora tutti gli eventi di I/O nel ciclo principale, invece che usare callback. Imposta il selettore per segnalare eventi di lettura sul socket e per segnalare quando il socket è pronto per l'invio di dati. Visto che stanno cercando due tipi di evento, il client deve verificare quale è occorso esaminando il valore di bitmask. Dopo che tutti i suoi dati in uscita sono stati spediti, modifica la configurazione del selettore per segnalare solo quando ci sono dati da leggere.

# selectors_echo_client.py

import selectors
import socket

mysel = selectors.DefaultSelector()
keep_running = True
outgoing = [
    b'Viene ripetuto.',
    b'Questo messaggio.  ',
]
bytes_sent = 0
bytes_received = 0

# La connessione è una operazione bloccante, quindi si chiama setblocking()
# dopo il ritorno
server_address = ('localhost', 10000)
print('connecting to {} port {}'.format(*server_address))
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect(server_address)
sock.setblocking(False)

# Imposta il selettore per verificare quando il socket è pronto
# per inviare dati oppure per verificare se ci sono dati da leggere
mysel.register(
    sock,
    selectors.EVENT_READ | selectors.EVENT_WRITE,
)

while keep_running:
    print('in attesa di I/O')
    for key, mask in mysel.select(timeout=1):
        connection = key.fileobj
        client_address = connection.getpeername()
        print('client({})'.format(client_address))

        if mask & selectors.EVENT_READ:
            print('  pronto per legger')
            data = connection.recv(1024)
            if data:
                # Un socket client leggibile ha dati
                print('  ricevuti {!r}'.format(data))
                bytes_received += len(data)

            # Interpreta un risultato vuoto come connessione chiusa
            # e chiude anche quando si è ricevuta una copia di tutti
            # i dati inviati.
            keep_running = not (
                data or
                (bytes_received and
                 (bytes_received == bytes_sent))
            )

        if mask & selectors.EVENT_WRITE:
            print('  pronto per scrivere')
            if not outgoing:
                # I messaggi sono esauriti, quindi non serve più
                # scrivere qualcosa. Si modifica la registrazione per
                # consentirci di leggere le risposte dal server.
                print('  si passa a sola lettura')
                mysel.modify(sock, selectors.EVENT_READ)
            else:
                # Invia il messaggio successivo
                next_msg = outgoing.pop()
                print('  in invio {!r}'.format(next_msg))
                sock.sendall(next_msg)
                bytes_sent += len(next_msg)

print('terminato')
mysel.unregister(connection)
connection.close()
mysel.close()

Il client tiene traccia di quanti dati ha inviato e di quanti ha ricevuto. Quando questi valori corrispondono e non sono zero, il client esce dal ciclo di elaborazione e chiude se stesso rimuovendo il socket dal selettore e chiudendo sia il socket che il selettore.

Client e Server Insieme

Il client e il server dovrebbero essere eseguiti in finestre di terminale separate, in modo che possano comunicare tra di loro. Il risultato del server mostra la connessione e i dati in arrivo, così come la risposta restituita al client.

$ python3 selectors_echo_server.py

In esecuzione su localhost porta 10000
in attesa di I/O
in attesa di I/O
in attesa di I/O
accetta(('127.0.0.1', 41536))
in attesa di I/O
letti(('127.0.0.1', 41536))
  ricevuti b'Questo messaggio.  Viene ripetuto.'
in attesa di I/O
letti(('127.0.0.1', 41536))
  chiusura
terminato

Il risultato del client mostra il messaggio in uscita e la risposta dal server.

$ python3 selectors_echo_client.py

connessione a localhost porta 10000
in attesa di I/O
client(('127.0.0.1', 10000))
  pronto per scrivere
  in invio b'Questo messaggio.  '
in attesa di I/O
client(('127.0.0.1', 10000))
  pronto per scrivere
  in invio b'Viene ripetuto.'
in attesa di I/O
client(('127.0.0.1', 10000))
  pronto per scrivere
  si passa a sola lettura
in attesa di I/O
client(('127.0.0.1', 10000))
  pronto per legger
  ricevuti b'Questo messaggio.  Viene ripetuto.'
terminato

Vedere anche:

selectors
La documentazione della libreria standard per questo modulo
select
API di basso livello per gestire con efficienza I/O