Streams

Källkod: Lib/asyncio/streams.py


Streams är async/await-ready-primitiver på hög nivå för att arbeta med nätverksanslutningar. Streams gör det möjligt att skicka och ta emot data utan att använda callbacks eller lågnivåprotokoll och transporter.

Här är ett exempel på en TCP-ekoklient som skrivits med hjälp av asyncio-strömmar:

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Se även avsnittet Examples nedan.

Strömfunktioner

Följande asynciofunktioner på högsta nivå kan användas för att skapa och arbeta med strömmar:

async asyncio.open_connection(host=None, port=None, *, limit=None, ssl=None, family=0, proto=0, flags=0, sock=None, local_addr=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, happy_eyeballs_delay=None, interleave=None)

Upprätta en nätverksanslutning och returnera ett par objekt av typen (reader, writer).

De returnerade reader- och writer-objekten är instanser av klasserna StreamReader och StreamWriter.

limit bestämmer den buffertstorleksgräns som används av den returnerade StreamReader-instansen. Som standard är limit inställd på 64 KiB.

Resten av argumenten skickas direkt till loop.create_connection().

Anteckning

Argumentet sock överför ägandet av sockeln till den skapade StreamWriter. För att stänga sockeln, anropa dess close() metod.

Ändrad i version 3.7: Parametern ssl_handshake_timeout har lagts till.

Ändrad i version 3.8: Lagt till parametrarna happy_eyeballs_delay och interleave.

Ändrad i version 3.10: Parametern loop har tagits bort.

Ändrad i version 3.11: Parametern ssl_shutdown_timeout har lagts till.

async asyncio.start_server(client_connected_cb, host=None, port=None, *, limit=None, family=socket.AF_UNSPEC, flags=socket.AI_PASSIVE, sock=None, backlog=100, ssl=None, reuse_address=None, reuse_port=None, keep_alive=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True)

Starta en socket-server.

Callbacken client_connected_cb anropas när en ny klientanslutning upprättas. Den får ett par (reader, writer) som två argument, instanser av klasserna StreamReader och StreamWriter.

client_connected_cb kan vara en vanlig callable eller en coroutine-funktion; om det är en coroutine-funktion kommer den automatiskt att schemaläggas som en Task.

limit bestämmer den buffertstorleksgräns som används av den returnerade StreamReader-instansen. Som standard är limit inställd på 64 KiB.

Resten av argumenten skickas direkt till loop.create_server().

Anteckning

Argumentet sock överför äganderätten till sockeln till den server som skapats. För att stänga sockeln anropar du serverns close()-metod.

Ändrad i version 3.7: Parametrarna ssl_handshake_timeout och start_serving har lagts till.

Ändrad i version 3.10: Parametern loop har tagits bort.

Ändrad i version 3.11: Parametern ssl_shutdown_timeout har lagts till.

Ändrad i version 3.13: Parametern keep_alive har lagts till.

Unix Sockets

async asyncio.open_unix_connection(path=None, *, limit=None, ssl=None, sock=None, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Upprätta en Unix-socketanslutning och returnera ett par (reader, writer).

Liknar open_connection() men fungerar på Unix-sockets.

Se även dokumentationen av loop.create_unix_connection().

Anteckning

Argumentet sock överför ägandet av sockeln till den skapade StreamWriter. För att stänga sockeln, anropa dess close() metod.

Ändrad i version 3.7: Parametern ssl_handshake_timeout har lagts till. Parametern path kan nu vara en path-like object

Ändrad i version 3.10: Parametern loop har tagits bort.

Ändrad i version 3.11: Parametern ssl_shutdown_timeout har lagts till.

async asyncio.start_unix_server(client_connected_cb, path=None, *, limit=None, sock=None, backlog=100, ssl=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None, start_serving=True, cleanup_socket=True)

Starta en Unix-socket-server.

Liknar start_server() men fungerar med Unix-sockets.

Om cleanup_socket är true kommer Unix-sockeln automatiskt att tas bort från filsystemet när servern stängs, såvida inte sockeln har bytts ut efter att servern skapades.

Se även dokumentationen av loop.create_unix_server().

Anteckning

Argumentet sock överför äganderätten till sockeln till den server som skapats. För att stänga sockeln anropar du serverns close()-metod.

Ändrad i version 3.7: Parametrarna ssl_handshake_timeout och start_serving har lagts till. Parametern path kan nu vara en path-like object.

Ändrad i version 3.10: Parametern loop har tagits bort.

Ändrad i version 3.11: Parametern ssl_shutdown_timeout har lagts till.

Ändrad i version 3.13: Parametern cleanup_socket har lagts till.

Strömläsare

class asyncio.StreamReader

Representerar ett läsarobjekt som tillhandahåller API:er för att läsa data från IO-strömmen. Som en asynkron iterabel stöder objektet async for-satsen.

Det är inte rekommenderat att instansiera StreamReader-objekt direkt; använd istället open_connection() och start_server().

feed_eof()

Bekräfta EOF.

async read(n=-1)

Läs upp till n byte från strömmen.

Om n inte anges eller sätts till -1, läs tills EOF, returnera sedan alla lästa bytes. Om EOF mottogs och den interna bufferten är tom, returneras ett tomt bytes-objekt.

Om n är 0, returneras ett tomt bytes-objekt omedelbart.

Om n är positiv, returneras högst n tillgängliga bytes så snart minst 1 byte finns tillgängligt i den interna bufferten. Om EOF tas emot innan någon byte har lästs, returneras ett tomt bytes-objekt.

async readline()

Läs en rad, där ”rad” är en sekvens av bytes som slutar med \n.

Om EOF tas emot och \n inte hittades, returnerar metoden delvis lästa data.

Om EOF tas emot och den interna bufferten är tom, returneras ett tomt bytes-objekt.

async readexactly(n)

Läs exakt n byte.

Skapa ett IncompleteReadError om EOF nås innan n kan läsas. Använd attributet IncompleteReadError.partial för att hämta delvis lästa data.

async readuntil(separator=b'\n')

Läs data från strömmen tills separator hittas.

Vid framgång kommer data och separator att tas bort från den interna bufferten (konsumeras). Returnerade data kommer att innehålla separatorn i slutet.

Om mängden data som läses överskrider den konfigurerade streamgränsen, uppstår ett LimitOverrunError-undantag och data lämnas kvar i den interna bufferten och kan läsas igen.

Om EOF nås innan den fullständiga separatorn hittats, uppstår ett IncompleteReadError undantag och den interna bufferten nollställs. Attributet IncompleteReadError.partial kan innehålla en del av separatorn.

Separator kan också vara en tupel av separatorer. I detta fall kommer returvärdet att vara det kortaste möjliga som har någon separator som suffix. När det gäller LimitOverrunError anses den kortaste möjliga separatorn vara den som matchade.

Tillagd i version 3.5.2.

Ändrad i version 3.13: Parametern separator kan nu vara en tuple av separatorer.

at_eof()

Returnerar True om bufferten är tom och feed_eof() anropades.

StreamWriter

class asyncio.StreamWriter

Representerar ett writer-objekt som tillhandahåller API:er för att skriva data till IO-strömmen.

Det är inte rekommenderat att instansiera StreamWriter-objekt direkt; använd istället open_connection() och start_server().

write(data)

Metoden försöker skriva data till det underliggande uttaget omedelbart. Om det misslyckas köas data i en intern skrivbuffert tills det kan skickas.

Metoden bör användas tillsammans med metoden drain():

stream.write(data)
await ström.dränera()

Anteckning

Databuffert ska vara en C-kontinuerlig endimensionell bytes-liknande objekt.

writelines(data)

Metoden skriver en lista (eller en iterabel) med bytes till den underliggande sockeln omedelbart. Om det misslyckas köas data i en intern skrivbuffert tills den kan skickas.

Metoden bör användas tillsammans med metoden drain():

stream.writelines(linjer)
await ström.dränera()
close()

Metoden stänger strömmen och det underliggande uttaget.

Metoden bör användas, men är inte obligatorisk, tillsammans med metoden wait_closed():

stream.stäng()
await stream.wait_closed()
can_write_eof()

Returnerar True om den underliggande transporten stöder metoden write_eof(), annars False.

write_eof()

Stäng skrivänden av strömmen efter att den buffrade skrivdatan har spolats.

transport

Returnerar den underliggande asynciotransporten.

get_extra_info(name, default=None)

Tillgång till valfri transportinformation; se BaseTransport.get_extra_info() för detaljer.

async drain()

Vänta tills det är lämpligt att återuppta skrivningen till strömmen. Exempel:

writer.write(data)
await skribent.dränera()

Detta är en metod för flödeskontroll som interagerar med den underliggande IO-skrivbufferten. När storleken på bufferten når den höga vattenstämpeln blockerar drain() tills storleken på bufferten har tömts ner till den låga vattenstämpeln och skrivningen kan återupptas. När det inte finns något att vänta på återgår drain() omedelbart.

async start_tls(sslcontext, *, server_hostname=None, ssl_handshake_timeout=None, ssl_shutdown_timeout=None)

Uppgradera en befintlig strömbaserad anslutning till TLS.

Parametrar:

  • sslcontext: en konfigurerad instans av SSLContext.

  • server_hostname: anger eller åsidosätter det värdnamn som målserverns certifikat ska matchas mot.

  • ssl_handshake_timeout är den tid i sekunder som TLS-handskakningen ska vänta på att slutföras innan anslutningen avbryts. 60,0 sekunder om None (standard).

  • ssl_shutdown_timeout är den tid i sekunder som ska vänta på att SSL-avstängningen ska slutföras innan anslutningen avbryts. 30.0 sekunder om None (standard).

Tillagd i version 3.11.

Ändrad i version 3.12: Parametern ssl_shutdown_timeout har lagts till.

is_closing()

Returnerar True om strömmen är stängd eller håller på att stängas.

Tillagd i version 3.7.

async wait_closed()

Vänta tills strömmen är stängd.

Bör anropas efter close() för att vänta tills den underliggande anslutningen har stängts, vilket säkerställer att alla data har spolats innan programmet avslutas.

Tillagd i version 3.7.

Exempel

TCP-ekoklient som använder strömmar

TCP-ekoklient som använder funktionen asyncio.open_connection():

import asyncio

async def tcp_echo_client(message):
    reader, writer = await asyncio.open_connection(
        '127.0.0.1', 8888)

    print(f'Send: {message!r}')
    writer.write(message.encode())
    await writer.drain()

    data = await reader.read(100)
    print(f'Received: {data.decode()!r}')

    print('Close the connection')
    writer.close()
    await writer.wait_closed()

asyncio.run(tcp_echo_client('Hello World!'))

Se även

I exemplet TCP echo client protocol används lågnivåmetoden loop.create_connection().

TCP-ekoserver med hjälp av strömmar

TCP-ekoserver med hjälp av funktionen asyncio.start_server():

import asyncio

async def handle_echo(reader, writer):
    data = await reader.read(100)
    message = data.decode()
    addr = writer.get_extra_info('peername')

    print(f"Received {message!r} from {addr!r}")

    print(f"Send: {message!r}")
    writer.write(data)
    await writer.drain()

    print("Close the connection")
    writer.close()
    await writer.wait_closed()

async def main():
    server = await asyncio.start_server(
        handle_echo, '127.0.0.1', 8888)

    addrs = ', '.join(str(sock.getsockname()) for sock in server.sockets)
    print(f'Serving on {addrs}')

    async with server:
        await server.serve_forever()

asyncio.run(main())

Se även

I exemplet TCP echo server protocol används metoden loop.create_server().

Hämta HTTP-rubriker

Enkelt exempel på en fråga om HTTP-rubriker för den URL som skickas på kommandoraden:

import asyncio
import urllib.parse
import sys

async def print_http_headers(url):
    url = urllib.parse.urlsplit(url)
    if url.scheme == 'https':
        reader, writer = await asyncio.open_connection(
            url.hostname, 443, ssl=True)
    else:
        reader, writer = await asyncio.open_connection(
            url.hostname, 80)

    query = (
        f"HEAD {url.path or '/'} HTTP/1.0\r\n"
        f"Host: {url.hostname}\r\n"
        f"\r\n"
    )

    writer.write(query.encode('latin-1'))
    while True:
        line = await reader.readline()
        if not line:
            break

        line = line.decode('latin1').rstrip()
        if line:
            print(f'HTTP header> {line}')

    # Ignore the body, close the socket
    writer.close()
    await writer.wait_closed()

url = sys.argv[1]
asyncio.run(print_http_headers(url))

Användning:

python example.py http://example.com/path/page.html

eller med HTTPS:

python example.py https://example.com/path/page.html

Registrera ett öppet uttag för att vänta på data med hjälp av strömmar

Coroutine som väntar tills ett uttag tar emot data med hjälp av funktionen open_connection():

import asyncio
import socket

async def wait_for_data():
    # Get a reference to the current event loop because
    # we want to access low-level APIs.
    loop = asyncio.get_running_loop()

    # Create a pair of connected sockets.
    rsock, wsock = socket.socketpair()

    # Register the open socket to wait for data.
    reader, writer = await asyncio.open_connection(sock=rsock)

    # Simulate the reception of data from the network
    loop.call_soon(wsock.send, 'abc'.encode())

    # Wait for data
    data = await reader.read(100)

    # Got data, we are done: close the socket
    print("Received:", data.decode())
    writer.close()
    await writer.wait_closed()

    # Close the second socket
    wsock.close()

asyncio.run(wait_for_data())

Se även

Exemplet registrera ett öppet uttag för att vänta på data med hjälp av ett protokoll använder ett protokoll på låg nivå och metoden loop.create_connection().

I exemplet watch a file descriptor for read events används lågnivåmetoden loop.add_reader() för att bevaka en filbeskrivare.