Köer

Källkod: Lib/asyncio/queues.py


asyncio-köer är utformade för att likna klasser i modulen queue. Även om asyncio-köer inte är trådsäkra är de utformade för att användas specifikt i async/await-kod.

Observera att metoder för asyncioköer inte har en timeout-parameter; använd asyncio.wait_for()-funktionen för att göra köoperationer med en timeout.

Se även avsnittet Examples nedan.

class asyncio.Queue(maxsize=0)

En FIFO-kö (först in, först ut).

Om maxsize är mindre än eller lika med noll är köns storlek oändlig. Om det är ett heltal större än 0, blockerar await put() när kön når maxsize tills ett objekt tas bort med get().

Till skillnad från standardbibliotekets trådning queue är storleken på kön alltid känd och kan returneras genom att anropa metoden qsize().

Ändrad i version 3.10: Parametern loop har tagits bort.

Den här klassen är inte trådsäker.

maxsize

Antal objekt som tillåts i kön.

empty()

Returnerar True om kön är tom, annars False.

full()

Returnerar True om det finns maxsize objekt i kön.

Om kön initialiserades med maxsize=0 (standard) returnerar full() aldrig True.

async get()

Ta bort och returnera ett objekt från kön. Om kön är tom, vänta tills ett objekt är tillgängligt.

Utlöser QueueShutDown om kön har stängts av och är tom, eller om kön har stängts av omedelbart.

get_nowait()

Returnera ett objekt om det finns ett omedelbart tillgängligt, annars uppstår QueueEmpty.

async join()

Blockera tills alla artiklar i kön har tagits emot och behandlats.

Antalet oavslutade uppgifter ökar varje gång ett objekt läggs till i kön. Antalet sjunker när en consumer coroutine anropar task_done() för att ange att objektet har hämtats och att allt arbete med det är slutfört. När antalet oavslutade uppgifter sjunker till noll avblockeras join().

async put(item)

Lägg till ett objekt i kön. Om kön är full väntar du tills en ledig plats finns tillgänglig innan du lägger till objektet.

Utlöser QueueShutDown om kön har stängts av.

put_nowait(item)

Lägg in ett objekt i kön utan att blockera.

Om ingen ledig plats finns omedelbart tillgänglig, uppstår QueueFull.

qsize()

Returnerar antalet objekt i kön.

shutdown(immediate=False)

Stänger av kön, vilket gör att get() och put() ger upphov till QueueShutDown.

Som standard kommer get() på en avstängd kö inte att aktiveras förrän kön är tom. Sätt immediate till true för att få get() att starta omedelbart istället.

Alla blockerade anropare av put() och get() kommer att avblockeras. Om immediate är true, kommer en uppgift att markeras som utförd för varje återstående objekt i kön, vilket kan avblockera anropare av join().

Tillagd i version 3.13.

task_done()

Anger att ett arbetsobjekt som tidigare var i kö är klart.

Används av köanvändare. För varje get() som används för att hämta ett arbetsobjekt, talar ett efterföljande anrop till task_done() om för kön att bearbetningen av arbetsobjektet är klar.

Om en join() för närvarande blockeras, kommer den att återupptas när alla objekt har bearbetats (vilket innebär att ett task_done()-anrop mottogs för varje objekt som hade put() i kön).

shutdown(immediate=True) anropar task_done() för varje återstående objekt i kön.

Utlöser ValueError om den anropas fler gånger än det finns objekt i kön.

Prioriterad kö

class asyncio.PriorityQueue

En variant av Queue; hämtar poster i prioritetsordning (lägst först).

Posterna är vanligtvis tupler av formen (priority_number, data).

LIFO-kö

class asyncio.LifoQueue

En variant av Queue som hämtar de senast tillagda posterna först (sist in, först ut).

Undantag

exception asyncio.QueueEmpty

Detta undantag uppstår när metoden get_nowait() anropas på en tom kö.

exception asyncio.QueueFull

Exception som uppstår när metoden put_nowait() anropas på en kö som har nått sin maxsize.

exception asyncio.QueueShutDown

Exception som uppstår när put() eller get() anropas på en kö som har stängts ner.

Tillagd i version 3.13.

Exempel

Köer kan användas för att fördela arbetsbelastningen mellan flera samtidiga uppgifter:

import asyncio
import random
import time


async def worker(name, queue):
    while True:
        # Get a "work item" out of the queue.
        sleep_for = await queue.get()

        # Sleep for the "sleep_for" seconds.
        await asyncio.sleep(sleep_for)

        # Notify the queue that the "work item" has been processed.
        queue.task_done()

        print(f'{name} has slept for {sleep_for:.2f} seconds')


async def main():
    # Create a queue that we will use to store our "workload".
    queue = asyncio.Queue()

    # Generate random timings and put them into the queue.
    total_sleep_time = 0
    for _ in range(20):
        sleep_for = random.uniform(0.05, 1.0)
        total_sleep_time += sleep_for
        queue.put_nowait(sleep_for)

    # Create three worker tasks to process the queue concurrently.
    tasks = []
    for i in range(3):
        task = asyncio.create_task(worker(f'worker-{i}', queue))
        tasks.append(task)

    # Wait until the queue is fully processed.
    started_at = time.monotonic()
    await queue.join()
    total_slept_for = time.monotonic() - started_at

    # Cancel our worker tasks.
    for task in tasks:
        task.cancel()
    # Wait until all worker tasks are cancelled.
    await asyncio.gather(*tasks, return_exceptions=True)

    print('====')
    print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
    print(f'total expected sleep time: {total_sleep_time:.2f} seconds')


asyncio.run(main())