Köer¶
Källkod: Lib/asyncio/queues.py
asyncio-köer är utformade för att likna klasser i modulen queue
. Även om asyncio-köer inte är trådsäkra är de utformade för att användas specifikt i async/await-kod.
Observera att metoder för asyncioköer inte har en timeout-parameter; använd asyncio.wait_for()
-funktionen för att göra köoperationer med en timeout.
Se även avsnittet Examples nedan.
Kö¶
- class asyncio.Queue(maxsize=0)¶
En FIFO-kö (först in, först ut).
Om maxsize är mindre än eller lika med noll är köns storlek oändlig. Om det är ett heltal större än
0
, blockerarawait put()
när kön når maxsize tills ett objekt tas bort medget()
.Till skillnad från standardbibliotekets trådning
queue
är storleken på kön alltid känd och kan returneras genom att anropa metodenqsize()
.Ändrad i version 3.10: Parametern loop har tagits bort.
Den här klassen är inte trådsäker.
- maxsize¶
Antal objekt som tillåts i kön.
- empty()¶
Returnerar
True
om kön är tom, annarsFalse
.
- full()¶
Returnerar
True
om det finnsmaxsize
objekt i kön.Om kön initialiserades med
maxsize=0
(standard) returnerarfull()
aldrigTrue
.
- async get()¶
Ta bort och returnera ett objekt från kön. Om kön är tom, vänta tills ett objekt är tillgängligt.
Utlöser
QueueShutDown
om kön har stängts av och är tom, eller om kön har stängts av omedelbart.
- get_nowait()¶
Returnera ett objekt om det finns ett omedelbart tillgängligt, annars uppstår
QueueEmpty
.
- async join()¶
Blockera tills alla artiklar i kön har tagits emot och behandlats.
Antalet oavslutade uppgifter ökar varje gång ett objekt läggs till i kön. Antalet sjunker när en consumer coroutine anropar
task_done()
för att ange att objektet har hämtats och att allt arbete med det är slutfört. När antalet oavslutade uppgifter sjunker till noll avblockerasjoin()
.
- async put(item)¶
Lägg till ett objekt i kön. Om kön är full väntar du tills en ledig plats finns tillgänglig innan du lägger till objektet.
Utlöser
QueueShutDown
om kön har stängts av.
- put_nowait(item)¶
Lägg in ett objekt i kön utan att blockera.
Om ingen ledig plats finns omedelbart tillgänglig, uppstår
QueueFull
.
- qsize()¶
Returnerar antalet objekt i kön.
- shutdown(immediate=False)¶
Stänger av kön, vilket gör att
get()
ochput()
ger upphov tillQueueShutDown
.Som standard kommer
get()
på en avstängd kö inte att aktiveras förrän kön är tom. Sätt immediate till true för att fåget()
att starta omedelbart istället.Alla blockerade anropare av
put()
ochget()
kommer att avblockeras. Om immediate är true, kommer en uppgift att markeras som utförd för varje återstående objekt i kön, vilket kan avblockera anropare avjoin()
.Tillagd i version 3.13.
- task_done()¶
Anger att ett arbetsobjekt som tidigare var i kö är klart.
Används av köanvändare. För varje
get()
som används för att hämta ett arbetsobjekt, talar ett efterföljande anrop tilltask_done()
om för kön att bearbetningen av arbetsobjektet är klar.Om en
join()
för närvarande blockeras, kommer den att återupptas när alla objekt har bearbetats (vilket innebär att etttask_done()
-anrop mottogs för varje objekt som hadeput()
i kön).shutdown(immediate=True)
anropartask_done()
för varje återstående objekt i kön.Utlöser
ValueError
om den anropas fler gånger än det finns objekt i kön.
Prioriterad kö¶
LIFO-kö¶
Undantag¶
- exception asyncio.QueueEmpty¶
Detta undantag uppstår när metoden
get_nowait()
anropas på en tom kö.
- exception asyncio.QueueFull¶
Exception som uppstår när metoden
put_nowait()
anropas på en kö som har nått sin maxsize.
Exempel¶
Köer kan användas för att fördela arbetsbelastningen mellan flera samtidiga uppgifter:
import asyncio
import random
import time
async def worker(name, queue):
while True:
# Get a "work item" out of the queue.
sleep_for = await queue.get()
# Sleep for the "sleep_for" seconds.
await asyncio.sleep(sleep_for)
# Notify the queue that the "work item" has been processed.
queue.task_done()
print(f'{name} has slept for {sleep_for:.2f} seconds')
async def main():
# Create a queue that we will use to store our "workload".
queue = asyncio.Queue()
# Generate random timings and put them into the queue.
total_sleep_time = 0
for _ in range(20):
sleep_for = random.uniform(0.05, 1.0)
total_sleep_time += sleep_for
queue.put_nowait(sleep_for)
# Create three worker tasks to process the queue concurrently.
tasks = []
for i in range(3):
task = asyncio.create_task(worker(f'worker-{i}', queue))
tasks.append(task)
# Wait until the queue is fully processed.
started_at = time.monotonic()
await queue.join()
total_slept_for = time.monotonic() - started_at
# Cancel our worker tasks.
for task in tasks:
task.cancel()
# Wait until all worker tasks are cancelled.
await asyncio.gather(*tasks, return_exceptions=True)
print('====')
print(f'3 workers slept in parallel for {total_slept_for:.2f} seconds')
print(f'total expected sleep time: {total_sleep_time:.2f} seconds')
asyncio.run(main())