Transporter och protokoll

Förord

Transporter och protokoll används av API:er för händelseslingor på låg nivå, t.ex. loop.create_connection(). De använder en callback-baserad programmeringsstil och möjliggör högpresterande implementeringar av nätverks- eller IPC-protokoll (t.ex. HTTP).

I grund och botten bör transporter och protokoll endast användas i bibliotek och ramverk och aldrig i asyncio-applikationer på hög nivå.

Denna dokumentationssida omfattar både Transports och Protocols.

Introduktion

På högsta nivå handlar transporten om hur bytes överförs, medan protokollet avgör vilka bytes som ska överföras (och i viss mån när).

Ett annat sätt att säga samma sak: en transport är en abstraktion för en socket (eller liknande I/O-slutpunkt) medan ett protokoll är en abstraktion för en applikation, ur transportens synvinkel.

En annan syn är att transport- och protokollgränssnitten tillsammans definierar ett abstrakt gränssnitt för användning av nätverks-I/O och interprocess-I/O.

Det finns alltid ett 1:1-förhållande mellan transport- och protokollobjekt: protokollet anropar transportmetoder för att skicka data, medan transporten anropar protokollmetoder för att vidarebefordra data som har mottagits.

De flesta anslutningsorienterade event loop-metoder (t.ex. loop.create_connection()) accepterar vanligtvis ett protocol_factory-argument som används för att skapa ett Protocol-objekt för en accepterad anslutning, representerad av ett Transport-objekt. Sådana metoder returnerar vanligtvis en tupel av (transport, protocol).

Innehåll

Denna dokumentationssida innehåller följande avsnitt:

Transporter

Källkod: Lib/asyncio/transports.py


Transporter är klasser som tillhandahålls av asyncio för att abstrahera olika typer av kommunikationskanaler.

Transportobjekt instansieras alltid av en asyncio event loop.

asyncio implementerar transporter för TCP, UDP, SSL och subprocessrör. Vilka metoder som är tillgängliga för en transport beror på transportens typ.

Transportklasserna är not thread safe.

Transporter Hierarki

class asyncio.BaseTransport

Basklass för alla transporter. Innehåller metoder som alla asynciotransporter delar.

class asyncio.WriteTransport(BaseTransport)

En bastransport för skrivskyddade anslutningar.

Instanser av klassen WriteTransport returneras från händelseslingans metod loop.connect_write_pipe() och används även av subprocessrelaterade metoder som loop.subprocess_exec().

class asyncio.ReadTransport(BaseTransport)

En bastransport för skrivskyddade anslutningar.

Instanser av klassen ReadTransport returneras från händelseslingans metod loop.connect_read_pipe() och används även av subprocessrelaterade metoder som loop.subprocess_exec().

class asyncio.Transport(WriteTransport, ReadTransport)

Gränssnitt som representerar en dubbelriktad transport, t.ex. en TCP-anslutning.

Användaren instansierar inte en transport direkt, utan anropar en utility-funktion som får en protokollfabrik och annan information som krävs för att skapa transport och protokoll.

Instanser av klassen Transport returneras från eller används av metoder i händelseslingor som loop.create_connection(), loop.create_unix_connection(), loop.create_server(), loop.sendfile(), etc.

class asyncio.DatagramTransport(BaseTransport)

En transport för datagram (UDP)-anslutningar.

Instanser av klassen DatagramTransport returneras från metoden loop.create_datagram_endpoint() i händelseslingan.

class asyncio.SubprocessTransport(BaseTransport)

En abstraktion för att representera en koppling mellan en överordnad och en underordnad OS-process.

Instanser av klassen SubprocessTransport returneras från eventloop-metoderna loop.subprocess_shell() och loop.subprocess_exec().

Bas transport

BaseTransport.close()

Stäng transporten.

Om transporten har en buffert för utgående data kommer buffrade data att tömmas asynkront. Inga fler data kommer att tas emot. När alla buffrade data har spolats kommer protokollets metod protocol.connection_lost() att anropas med None som argument. Transporten bör inte användas när den är stängd.

BaseTransport.is_closing()

Returnerar True om transporten håller på att stängas eller är stängd.

BaseTransport.get_extra_info(name, default=None)

Returnera information om transporten eller underliggande resurser som den använder.

name är en sträng som representerar den transportspecifika information som ska hämtas.

default är det värde som ska returneras om informationen inte är tillgänglig, eller om transporten inte stöder att den efterfrågas med den givna implementeringen av händelseslingan från tredje part eller på den aktuella plattformen.

Följande kod försöker till exempel hämta det underliggande socket-objektet för transporten:

sock = transport.get_extra_info('socket')
om sock inte är None:
    print(sock.getsockopt(...))

Kategorier av information som kan efterfrågas på vissa transporter:

BaseTransport.set_protocol(protocol)

Ställ in ett nytt protokoll.

Byte av protokoll bör endast göras när båda protokollen är dokumenterade för att stödja bytet.

BaseTransport.get_protocol()

Returnerar det aktuella protokollet.

Skrivskyddade transporter

ReadTransport.is_reading()

Returnerar True om transporten tar emot nya data.

Tillagd i version 3.7.

ReadTransport.pause_reading()

Pausa den mottagande änden av transporten. Inga data kommer att skickas till protokollets protocol.data_received()-metod förrän resume_reading() anropas.

Ändrad i version 3.7: Metoden är idempotent, d.v.s. den kan anropas när transporten redan är pausad eller stängd.

ReadTransport.resume_reading()

Återuppta den mottagande änden. Protokollets protocol.data_received()-metod kommer att anropas igen om några data är tillgängliga för läsning.

Ändrad i version 3.7: Metoden är idempotent, d.v.s. den kan anropas när transporten redan läser.

Skrivskyddade transporter

WriteTransport.abort()

Stäng transporten omedelbart, utan att vänta på att väntande operationer ska slutföras. Buffrade data kommer att gå förlorade. Inga fler data kommer att tas emot. Protokollets metod protocol.connection_lost() kommer så småningom att anropas med None som argument.

WriteTransport.can_write_eof()

Returnerar True om transporten stöder write_eof(), False om inte.

WriteTransport.get_write_buffer_size()

Returnerar den aktuella storleken på den utmatningsbuffert som används av transporten.

WriteTransport.get_write_buffer_limits()

Hämta vattenmärkena hög och låg för skrivflödeskontroll. Returnerar en tupel (låg, hög) där låg och hög är positiva antal byte.

Använd set_write_buffer_limits() för att ställa in gränserna.

Tillagd i version 3.4.2.

WriteTransport.set_write_buffer_limits(high=None, low=None)

Ställ in vattenmärkena hög och låg för skrivflödeskontroll.

Dessa två värden (mätt i antal byte) styr när protokollets metoder protocol.pause_writing() och protocol.resume_writing() anropas. Om det anges måste det låga vattenmärket vara mindre än eller lika med det höga vattenmärket. Varken high eller low kan vara negativa.

pause_writing() anropas när buffertstorleken blir större än eller lika med värdet high. Om skrivningen har pausats anropas resume_writing() när buffertstorleken blir mindre än eller lika med värdet low.

Standardvärdena är implementationsspecifika. Om endast det höga vattenmärket anges, är det låga vattenmärket ett implementationsspecifikt standardvärde som är mindre än eller lika med det höga vattenmärket. Om high sätts till noll sätts även low till noll, vilket gör att pause_writing() anropas när bufferten inte längre är tom. Om low sätts till noll kommer resume_writing() att anropas först när bufferten är tom. Att använda noll för någon av gränserna är i allmänhet suboptimalt eftersom det minskar möjligheterna att göra I/O och beräkningar samtidigt.

Använd get_write_buffer_limits() för att få fram gränserna.

WriteTransport.write(data)

Skriv några data-bytes till transporten.

Den här metoden blockerar inte, utan buffrar data och ser till att den skickas ut asynkront.

WriteTransport.writelines(list_of_data)

Skriv en lista (eller en iterabel) med databytes till transporten. Detta är funktionellt likvärdigt med att anropa write() på varje element som iterabeln ger, men kan implementeras mer effektivt.

WriteTransport.write_eof()

Stäng transportens skrivände efter att ha spolat ut alla buffrade data. Data kan fortfarande tas emot.

Denna metod kan ge upphov till NotImplementedError om transporten (t.ex. SSL) inte stöder halvstängda anslutningar.

Datagramtransporter

DatagramTransport.sendto(data, addr=None)

Skicka data byte till den fjärrpeer som anges av addr (en transportberoende måladress). Om addr är None skickas data till den måladress som anges när transporten skapas.

Den här metoden blockerar inte, utan buffrar data och ser till att den skickas ut asynkront.

Ändrad i version 3.13: Denna metod kan anropas med ett tomt bytes-objekt för att skicka ett datagram med noll längd. Beräkningen av buffertstorleken som används för flödeskontroll uppdateras också för att ta hänsyn till datagramhuvudet.

DatagramTransport.abort()

Stäng transporten omedelbart, utan att vänta på att väntande operationer ska slutföras. Buffrade data kommer att gå förlorade. Inga fler data kommer att tas emot. Protokollets metod protocol.connection_lost() kommer så småningom att anropas med None som argument.

Underprocess transporter

SubprocessTransport.get_pid()

Returnerar subprocessens process-ID som ett heltal.

SubprocessTransport.get_pipe_transport(fd)

Returnera transporten för kommunikationsröret som motsvarar filbeskrivaren för heltal fd:

  • 0: skrivbar strömtransport av standardinmatningen (stdin), eller None om underprocessen inte skapades med stdin=PIPE

  • 1: läsbar strömmande transport av standardutdata (stdout), eller None om underprocessen inte skapades med stdout=PIPE

  • 2: läsbar strömtransport av standardfel (stderr), eller None om underprocessen inte skapades med stderr=PIPE

  • andra fd: None

SubprocessTransport.get_returncode()

Returnerar subprocessens returkod som ett heltal eller None om den inte har returnerats, vilket liknar attributet subprocess.Popen.returncode.

SubprocessTransport.kill()

Stäng av underprocessen.

På POSIX-system skickar funktionen SIGKILL till subprocessen. I Windows är den här metoden ett alias för terminate().

Se även subprocess.Popen.kill().

SubprocessTransport.send_signal(signal)

Skicka signal-numret till underprocessen, som i subprocess.Popen.send_signal().

SubprocessTransport.terminate()

Stoppa underprocessen.

På POSIX-system skickar den här metoden SIGTERM till subprocessen. På Windows anropas Windows API-funktionen TerminateProcess() för att stoppa subprocessen.

Se även subprocess.Popen.terminate().

SubprocessTransport.close()

Döda subprocessen genom att anropa metoden kill().

Om underprocessen inte har återvänt ännu, och stäng transporterna av rören stdin, stdout och stderr.

Protokoll

Källkod: Lib/asyncio/protocols.py


asyncio tillhandahåller en uppsättning abstrakta basklasser som bör användas för att implementera nätverksprotokoll. Dessa klasser är avsedda att användas tillsammans med transports.

Underklasser av abstrakta basprotokollklasser kan implementera vissa eller alla metoder. Alla dessa metoder är callbacks: de anropas av transporter vid vissa händelser, t.ex. när data tas emot. En basprotokollmetod bör anropas av motsvarande transport.

Basprotokoll

class asyncio.BaseProtocol

Basprotokoll med metoder som är gemensamma för alla protokoll.

class asyncio.Protocol(BaseProtocol)

Basklassen för implementering av streamingprotokoll (TCP, Unix-sockets, etc).

class asyncio.BufferedProtocol(BaseProtocol)

En basklass för implementering av streamingprotokoll med manuell kontroll av mottagningsbufferten.

class asyncio.DatagramProtocol(BaseProtocol)

Basklassen för implementering av datagram (UDP)-protokoll.

class asyncio.SubprocessProtocol(BaseProtocol)

Basklass för implementering av protokoll som kommunicerar med underordnade processer (enkelriktade pipes).

Basprotokoll

Alla asyncioprotokoll kan implementera Base Protocol callbacks.

Callbacks för anslutning

Connection callbacks anropas på alla protokoll, exakt en gång per lyckad anslutning. Alla andra protokollåterkallelser kan bara anropas mellan dessa två metoder.

BaseProtocol.connection_made(transport)

Anropas när en anslutning har gjorts.

Argumentet transport är den transport som representerar anslutningen. Protokollet är ansvarigt för att lagra referensen till sin transport.

BaseProtocol.connection_lost(exc)

Anropas när anslutningen bryts eller stängs.

Argumentet är antingen ett undantagsobjekt eller None. Det senare innebär att en vanlig EOF tas emot, eller att anslutningen avbröts eller stängdes av den här sidan av anslutningen.

Återkallelser för flödeskontroll

Återanrop för flödeskontroll kan anropas av transporter för att pausa eller återuppta skrivning som utförs av protokollet.

Se dokumentationen för metoden set_write_buffer_limits() för mer information.

BaseProtocol.pause_writing()

Anropas när transportens buffert går över det högsta vattenmärket.

BaseProtocol.resume_writing()

Anropas när transportens buffert töms under lågvattenmärket.

Om buffertstorleken är lika med den höga vattenstämpeln anropas inte pause_writing(): buffertstorleken måste gå strikt över.

Omvänt anropas resume_writing() när buffertstorleken är lika med eller lägre än det låga vattenmärket. Dessa slutvillkor är viktiga för att säkerställa att saker och ting går som förväntat när något av märkena är noll.

Protokoll för streaming

Händelsemetoder som loop.create_server(), loop.create_unix_server(), loop.create_connection(), loop.create_unix_connection(), loop.connect_accepted_socket(), loop.connect_read_pipe() och loop.connect_write_pipe() accepterar fabriker som returnerar streamingprotokoll.

Protocol.data_received(data)

Anropas när data har tagits emot. data är ett icke-tomt bytes-objekt som innehåller inkommande data.

Om data buffras, delas upp i bitar eller sammanställs på nytt beror på transporten. I allmänhet bör du inte förlita dig på specifik semantik utan i stället göra din parsning generisk och flexibel. Data tas dock alltid emot i rätt ordning.

Metoden kan anropas ett godtyckligt antal gånger medan en anslutning är öppen.

Men protocol.eof_received() anropas högst en gång. När eof_received() har anropats anropas inte data_received() längre.

Protocol.eof_received()

Anropas när den andra änden signalerar att den inte kommer att skicka mer data (t.ex. genom att anropa transport.write_eof(), om den andra änden också använder asyncio).

Denna metod kan returnera ett falskt värde (inklusive None), i vilket fall transporten kommer att stänga sig själv. Omvänt, om denna metod returnerar ett sant värde, avgör det protokoll som används om transporten ska stängas. Eftersom standardimplementeringen returnerar None, stänger den implicit anslutningen.

Vissa transporter, inklusive SSL, stöder inte halvstängda anslutningar, i vilket fall returnering av true från denna metod kommer att resultera i att anslutningen stängs.

Statlig maskin:

start -> anslutning_gjord
    [-> data_mottagen]*
    [-> eof_received]?
-> anslutning_förlorad -> slut

Protokoll för buffrad strömning

Tillagd i version 3.7.

Buffrade protokoll kan användas med alla metoder för händelseslingor som stöder Streaming Protocols.

implementeringar av BufferedProtocol tillåter explicit manuell tilldelning och kontroll av mottagningsbufferten. Händelseslingor kan då använda den buffert som tillhandahålls av protokollet för att undvika onödiga datakopior. Detta kan resultera i märkbara prestandaförbättringar för protokoll som tar emot stora mängder data. Sofistikerade protokollimplementeringar kan avsevärt minska antalet buffertallokeringar.

Följande callbacks anropas på BufferedProtocol-instanser:

BufferedProtocol.get_buffer(sizehint)

Anropas för att allokera en ny mottagningsbuffert.

sizehint är den rekommenderade minimistorleken för den returnerade bufferten. Det är acceptabelt att returnera mindre eller större buffertar än vad sizehint föreslår. När den är satt till -1 kan buffertstorleken vara godtycklig. Det är ett fel att returnera en buffert med noll i storlek.

get_buffer() måste returnera ett objekt som implementerar buffer protocol.

BufferedProtocol.buffer_updated(nbytes)

Anropas när bufferten har uppdaterats med de mottagna uppgifterna.

nbytes är det totala antalet bytes som skrevs till bufferten.

BufferedProtocol.eof_received()

Se dokumentationen för metoden protocol.eof_received().

get_buffer() kan anropas ett godtyckligt antal gånger under en anslutning. Men protocol.eof_received() anropas högst en gång och om den anropas kommer get_buffer() och buffer_updated() inte att anropas efter den.

Statlig maskin:

start -> anslutning_gjord
    [-> hämta_buffer
        [-> buffert_uppdaterad]?
    ]*
    [-> eof_received]?
-> anslutning_förlorad -> slut

Datagram-protokoll

Datagram-protokollinstanser bör konstrueras av protokollfabriker som skickas till metoden loop.create_datagram_endpoint().

DatagramProtocol.datagram_received(data, addr)

Anropas när ett datagram tas emot. data är ett bytes-objekt som innehåller inkommande data. addr är adressen till den peer som skickar data; det exakta formatet beror på transportmedlet.

DatagramProtocol.error_received(exc)

Anropas när en tidigare sändnings- eller mottagningsoperation ger upphov till ett OSError. exc är en instans av OSError.

Denna metod anropas i sällsynta fall när transporten (t.ex. UDP) upptäcker att ett datagram inte kunde levereras till mottagaren. Under många förhållanden kommer dock datagram som inte kan levereras att släppas i tysthet.

Anteckning

På BSD-system (macOS, FreeBSD etc.) stöds inte flödeskontroll för datagramprotokoll, eftersom det inte finns något tillförlitligt sätt att upptäcka sändningsfel som orsakas av att för många paket skrivs.

Uttaget verkar alltid ”klart” och överflödiga paket släpps. Ett OSError med errno satt till errno.ENOBUFS kan eller inte kan uppstå; om det uppstår kommer det att rapporteras till DatagramProtocol.error_received() men annars ignoreras.

Protokoll för delprocesser

Subprocessprotokollinstanser bör konstrueras av protokollfabriker som skickas till metoderna loop.subprocess_exec() och loop.subprocess_shell().

SubprocessProtocol.pipe_data_received(fd, data)

Anropas när barnprocessen skriver data till sin stdout- eller stderr-pipeline.

fd är pipens filbeskrivare i heltal.

data är ett icke-tomt bytesobjekt som innehåller de mottagna data.

SubprocessProtocol.pipe_connection_lost(fd, exc)

Anropas när ett av de rör som kommunicerar med barnprocessen stängs.

fd är den heltalsfilsbeskrivare som stängdes.

SubprocessProtocol.process_exited()

Anropas när den underordnade processen har avslutats.

Den kan anropas före metoderna pipe_data_received() och pipe_connection_lost().

Exempel

TCP Echo-server

Skapa en TCP-ekoserver med hjälp av metoden loop.create_server(), skicka tillbaka mottagna data och stäng anslutningen:

import asyncio


class EchoServerProtocol(asyncio.Protocol):
    def connection_made(self, transport):
        peername = transport.get_extra_info('peername')
        print('Connection from {}'.format(peername))
        self.transport = transport

    def data_received(self, data):
        message = data.decode()
        print('Data received: {!r}'.format(message))

        print('Send: {!r}'.format(message))
        self.transport.write(data)

        print('Close the client socket')
        self.transport.close()


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    server = await loop.create_server(
        EchoServerProtocol,
        '127.0.0.1', 8888)

    async with server:
        await server.serve_forever()


asyncio.run(main())

Se även

I exemplet TCP echo server using streams används funktionen asyncio.start_server() på hög nivå.

TCP Echo-klient

En TCP-ekoklient som använder metoden loop.create_connection(), skickar data och väntar tills anslutningen stängs:

import asyncio


class EchoClientProtocol(asyncio.Protocol):
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        transport.write(self.message.encode())
        print('Data sent: {!r}'.format(self.message))

    def data_received(self, data):
        print('Data received: {!r}'.format(data.decode()))

    def connection_lost(self, exc):
        print('The server closed the connection')
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = 'Hello World!'

    transport, protocol = await loop.create_connection(
        lambda: EchoClientProtocol(message, on_con_lost),
        '127.0.0.1', 8888)

    # Wait until the protocol signals that the connection
    # is lost and close the transport.
    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

Se även

I exemplet TCP echo client using streams används högnivåfunktionen asyncio.open_connection().

UDP Echo-server

En UDP-ekoserver, som använder metoden loop.create_datagram_endpoint(), skickar tillbaka mottagna data:

import asyncio


class EchoServerProtocol:
    def connection_made(self, transport):
        self.transport = transport

    def datagram_received(self, data, addr):
        message = data.decode()
        print('Received %r from %s' % (message, addr))
        print('Send %r to %s' % (message, addr))
        self.transport.sendto(data, addr)


async def main():
    print("Starting UDP server")

    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    # One protocol instance will be created to serve all
    # client requests.
    transport, protocol = await loop.create_datagram_endpoint(
        EchoServerProtocol,
        local_addr=('127.0.0.1', 9999))

    try:
        await asyncio.sleep(3600)  # Serve for 1 hour.
    finally:
        transport.close()


asyncio.run(main())

UDP Echo-klient

En UDP-eko-klient, som använder metoden loop.create_datagram_endpoint(), skickar data och stänger transporten när den får svaret:

import asyncio


class EchoClientProtocol:
    def __init__(self, message, on_con_lost):
        self.message = message
        self.on_con_lost = on_con_lost
        self.transport = None

    def connection_made(self, transport):
        self.transport = transport
        print('Send:', self.message)
        self.transport.sendto(self.message.encode())

    def datagram_received(self, data, addr):
        print("Received:", data.decode())

        print("Close the socket")
        self.transport.close()

    def error_received(self, exc):
        print('Error received:', exc)

    def connection_lost(self, exc):
        print("Connection closed")
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    on_con_lost = loop.create_future()
    message = "Hello World!"

    transport, protocol = await loop.create_datagram_endpoint(
        lambda: EchoClientProtocol(message, on_con_lost),
        remote_addr=('127.0.0.1', 9999))

    try:
        await on_con_lost
    finally:
        transport.close()


asyncio.run(main())

Anslutning av befintliga uttag

Vänta tills ett uttag tar emot data med hjälp av metoden loop.create_connection() med protokollet:

import asyncio
import socket


class MyProtocol(asyncio.Protocol):

    def __init__(self, on_con_lost):
        self.transport = None
        self.on_con_lost = on_con_lost

    def connection_made(self, transport):
        self.transport = transport

    def data_received(self, data):
        print("Received:", data.decode())

        # We are done: close the transport;
        # connection_lost() will be called automatically.
        self.transport.close()

    def connection_lost(self, exc):
        # The socket has been closed
        self.on_con_lost.set_result(True)


async def main():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()
    on_con_lost = loop.create_future()

    # Create a pair of connected sockets
    rsock, wsock = socket.socketpair()

    # Register the socket to wait for data.
    transport, protocol = await loop.create_connection(
        lambda: MyProtocol(on_con_lost), sock=rsock)

    # Simulate the reception of data from the network.
    loop.call_soon(wsock.send, 'abc'.encode())

    try:
        await protocol.on_con_lost
    finally:
        transport.close()
        wsock.close()

asyncio.run(main())

Se även

I exemplet watch a file descriptor for read events används lågnivåmetoden loop.add_reader() för att registrera en FD.

I exemplet registrera ett öppet uttag för att vänta på data med hjälp av strömmar används strömmar på hög nivå som skapas av funktionen open_connection() i en coroutine.

loop.subprocess_exec() och SubprocessProtocol

Ett exempel på ett subprocessprotokoll som används för att få utdata från en subprocess och för att vänta på att subprocessen ska avslutas.

Subprocessen skapas med metoden loop.subprocess_exec():

import asyncio
import sys

class DateProtocol(asyncio.SubprocessProtocol):
    def __init__(self, exit_future):
        self.exit_future = exit_future
        self.output = bytearray()
        self.pipe_closed = False
        self.exited = False

    def pipe_connection_lost(self, fd, exc):
        self.pipe_closed = True
        self.check_for_exit()

    def pipe_data_received(self, fd, data):
        self.output.extend(data)

    def process_exited(self):
        self.exited = True
        # process_exited() method can be called before
        # pipe_connection_lost() method: wait until both methods are
        # called.
        self.check_for_exit()

    def check_for_exit(self):
        if self.pipe_closed and self.exited:
            self.exit_future.set_result(True)

async def get_date():
    # Get a reference to the event loop as we plan to use
    # low-level APIs.
    loop = asyncio.get_running_loop()

    code = 'import datetime; print(datetime.datetime.now())'
    exit_future = asyncio.Future(loop=loop)

    # Create the subprocess controlled by DateProtocol;
    # redirect the standard output into a pipe.
    transport, protocol = await loop.subprocess_exec(
        lambda: DateProtocol(exit_future),
        sys.executable, '-c', code,
        stdin=None, stderr=None)

    # Wait for the subprocess exit using the process_exited()
    # method of the protocol.
    await exit_future

    # Close the stdout pipe.
    transport.close()

    # Read the output which was collected by the
    # pipe_data_received() method of the protocol.
    data = bytes(protocol.output)
    return data.decode('ascii').rstrip()

date = asyncio.run(get_date())
print(f"Current date: {date}")

Se även samme exempel skrivet med hjälp av API:er på hög nivå.