multiprocessing
— Processbaserad parallellism¶
Källkod: Lib/multiprocessing/
Tillgänglighet: not Android, not iOS, not WASI.
Denna modul stöds inte på mobile platforms eller WebAssembly platforms.
Introduktion¶
multiprocessing
är ett paket med stöd för att skapa processer med hjälp av ett API som liknar modulen threading
. Paketet multiprocessing
erbjuder både lokal och fjärransluten samtidighet, vilket effektivt kringgår Global Interpreter Lock genom att använda underprocesser istället för trådar. På grund av detta tillåter multiprocessing
-modulen programmeraren att fullt ut utnyttja flera processorer på en given maskin. Den körs på både POSIX och Windows.
Modulen multiprocessing
introducerar också API:er som inte har någon motsvarighet i modulen threading
. Ett utmärkt exempel på detta är Pool
-objektet som erbjuder ett bekvämt sätt att parallellisera exekveringen av en funktion över flera indatavärden genom att distribuera indata över processer (dataparallellism). Följande exempel visar hur det är vanligt att definiera sådana funktioner i en modul så att underordnade processer kan importera den modulen. Detta grundläggande exempel på dataparallellism använder Pool
,
från multiprocessing import Pool
def f(x):
returnerar x*x
om __name__ == '__main__':
med Pool(5) som p:
print(p.map(f, [1, 2, 3]))
kommer att skrivas ut till standardutmatningen
[1, 4, 9]
Se även
concurrent.futures.ProcessPoolExecutor
erbjuder ett gränssnitt på högre nivå för att flytta uppgifter till en bakgrundsprocess utan att blockera körningen av den anropande processen. Jämfört med att använda gränssnittet Pool
direkt, gör API:t concurrent.futures
det lättare att separera inlämning av arbete till den underliggande processpoolen från väntan på resultat.
Klassen Process
¶
I multiprocessing
startas processer genom att skapa ett Process
-objekt och sedan anropa dess start()
-metod. Process
följer API:et för threading.Thread
. Ett trivialt exempel på ett multiprocessprogram är
från multiprocessing import Process
def f(namn):
print('hallå', namn)
if __name__ == '__main__':
p = Process(mål=f, args=('bob',))
p.start()
p.join()
För att visa de enskilda process-ID:n som är inblandade följer här ett utökat exempel:
from multiprocessing import Process
import os
def info(title):
print(title)
print('module name:', __name__)
print('parent process:', os.getppid())
print('process id:', os.getpid())
def f(name):
info('function f')
print('hello', name)
if __name__ == '__main__':
info('main line')
p = Process(target=f, args=('bob',))
p.start()
p.join()
För en förklaring till varför if __name__ == '__main__'
-delen är nödvändig, se Riktlinjer för programmering.
Argumenten till Process
måste vanligtvis vara omöjliga att plocka upp från barnprocessen. Om du försökte skriva in ovanstående exempel direkt i en REPL skulle det kunna leda till ett AttributeError
i barnprocessen som försöker hitta funktionen f i modulen __main__
.
Sammanhang och startmetoder¶
Beroende på plattform har multiprocessing
stöd för tre sätt att starta en process. Dessa startmetoder är
- spawn
Den överordnade processen startar en ny Python-tolkprocess. Barnprocessen kommer endast att ärva de resurser som krävs för att köra processobjektets
run()
-metod. I synnerhet kommer onödiga filbeskrivare och handtag från den överordnade processen inte att ärvas. Att starta en process med den här metoden är ganska långsamt jämfört med att använda fork eller forkserver.Finns på POSIX- och Windows-plattformar. Standard på Windows och macOS.
- gaffel
Den överordnade processen använder
os.fork()
för att förgrena Python-tolken. När barnprocessen startar är den i praktiken identisk med den överordnade processen. Alla resurser från den överordnade processen ärvs av barnprocessen. Observera att det är problematiskt att på ett säkert sätt forka en flertrådad process.Tillgänglig på POSIX-system.
Ändrad i version 3.14: Detta är inte längre standardstartmetoden på någon plattform. Kod som kräver fork måste uttryckligen ange det via
get_context()
ellerset_start_method()
.Ändrad i version 3.12: Om Python kan upptäcka att din process har flera trådar, kommer funktionen
os.fork()
som denna startmetod anropar internt att ge upphov till enDeprecationWarning
. Använd en annan startmetod. Se dokumentationen föros.fork()
för ytterligare förklaringar.
- forkserver
När programmet startas och startmetoden forkserver väljs, skapas en serverprocess. Från och med då, när en ny process behövs, ansluter den överordnade processen till servern och begär att den forkar en ny process. Fork-serverprocessen är enkeltrådad om inte systembibliotek eller förinstallerade importer skapar trådar som en bieffekt, så det är i allmänhet säkert för den att använda
os.fork()
. Inga onödiga resurser ärvs.Finns på POSIX-plattformar som stöder överföring av filbeskrivare via Unix pipes, t.ex. Linux. Standard på dessa.
Ändrad i version 3.14: Detta blev standardstartmetoden på POSIX-plattformar.
Ändrad i version 3.4: spawn har lagts till på alla POSIX-plattformar och forkserver har lagts till på vissa POSIX-plattformar. Barnprocesser ärver inte längre alla föräldrarnas ärftliga handtag i Windows.
Ändrad i version 3.8: På macOS är startmetoden spawn nu standard. Startmetoden fork bör betraktas som osäker eftersom den kan leda till att underprocessen kraschar eftersom systembiblioteken i macOS kan starta trådar. Se bpo-33725.
Ändrad i version 3.14: På POSIX-plattformar ändrades standardstartmetoden från fork till forkserver för att bibehålla prestandan men undvika vanliga inkompatibiliteter med flertrådade processer. Se gh-84559.
På POSIX startar startmetoderna spawn eller forkserver även en resource tracker-process som spårar de olänkade namngivna systemresurser (t.ex. namngivna semaforer eller SharedMemory
-objekt) som skapas av processer i programmet. När alla processer har avslutats kopplar resursspåraren bort eventuella kvarvarande spårningsobjekt. Vanligtvis bör det inte finnas några, men om en process dödades av en signal kan det finnas några ”läckta” resurser. (Varken läckta semaforer eller delade minnessegment kommer automatiskt att avlänkas förrän vid nästa omstart. Detta är problematiskt för båda objekten eftersom systemet endast tillåter ett begränsat antal namngivna semaforer, och delade minnessegment upptar en del utrymme i huvudminnet)
För att välja en startmetod använder du set_start_method()
i if __name__ == '__main__'
i huvudmodulens klausul. Till exempel:
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
mp.set_start_method('spawn')
q = mp.Queue()
p = mp.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
set_start_method()
bör inte användas mer än en gång i programmet.
Alternativt kan du använda get_context()
för att hämta ett kontextobjekt. Context-objekt har samma API som multiprocessing-modulen och gör att man kan använda flera startmetoder i samma program.
import multiprocessing as mp
def foo(q):
q.put('hello')
if __name__ == '__main__':
ctx = mp.get_context('spawn')
q = ctx.Queue()
p = ctx.Process(target=foo, args=(q,))
p.start()
print(q.get())
p.join()
Observera att objekt som är relaterade till en kontext kanske inte är kompatibla med processer för en annan kontext. I synnerhet kan lås som skapats med kontexten fork inte skickas till processer som startats med startmetoderna spawn eller forkserver.
Bibliotek som använder multiprocessing
eller ProcessPoolExecutor
bör utformas så att de tillåter sina användare att tillhandahålla sin egen multiprocessing-kontext. Om du använder en egen specifik kontext i ett bibliotek kan det leda till inkompatibilitet med resten av biblioteksanvändarens applikation. Dokumentera alltid om ditt bibliotek kräver en specifik startmetod.
Varning
Startmetoderna 'spawn'
och 'forkserver'
kan i allmänhet inte användas med ”frysta” körbara filer (dvs. binära filer som produceras av paket som PyInstaller och cx_Freeze) på POSIX-system. Startmetoden 'fork'
kan fungera om koden inte använder trådar.
Utbyte av objekt mellan processer¶
multiprocessing
stöder två typer av kommunikationskanaler mellan processer:
Köer
Klassen
Queue
är en nära klon avqueue.Queue
. Till exempel:från multiprocessing import Process, Kö def f(q): q.put([42, None, 'hallå']) if __name__ == '__main__': q = kö() p = Process(mål=f, args=(q,)) p.start() print(q.get()) # skriver ut "[42, None, 'hello']" p.join()Köer är tråd- och processäkra. Alla objekt som sätts in i en
multiprocessing
-kö kommer att serialiseras.
Pipor
Funktionen
Pipe()
returnerar ett par anslutningsobjekt som är anslutna med en pipe som standard är duplex (tvåvägs). Till exempel:från multiprocessing import Process, Rör def f(conn): conn.send([42, None, 'hallå']) conn.close() if __name__ == '__main__': parent_conn, child_conn = Pipe() p = Process(mål=f, args=(barn_conn,)) p.start() print(parent_conn.recv()) # skriver ut "[42, None, 'hello']" p.join()De två anslutningsobjekt som returneras av
Pipe()
representerar rörets två ändar. Varje anslutningsobjekt har metodernasend()
ochrecv()
(bland andra). Observera att data i en pipe kan bli korrumperade om två processer (eller trådar) försöker läsa från eller skriva till samma ände av pipen samtidigt. Naturligtvis finns det ingen risk för korruption från processer som använder olika ändar av pipen samtidigt.Metoden
send()
serialiserar objektet ochrecv()
återskapar objektet.
Synkronisering mellan processer¶
multiprocessing
innehåller motsvarigheter till alla synkroniseringsprimitiver från threading
. Till exempel kan man använda ett lås för att säkerställa att endast en process skriver ut till standardutdata åt gången:
från multiprocessing import Process, Lås
def f(l, i):
l.acquire()
försök:
print('hello world', i)
slutligen:
l.release()
if __name__ == '__main__':
lock = Lock()
för num i intervall(10):
Process(mål=f, args=(lock, num)).start()
Om man inte använder låset kan utdata från de olika processerna blandas ihop.
Använda en pool av arbetstagare¶
Klassen Pool
representerar en pool av arbetsprocesser. Den har metoder som gör det möjligt att avlasta uppgifter till arbetsprocesserna på några olika sätt.
Till exempel:
from multiprocessing import Pool, TimeoutError
import time
import os
def f(x):
return x*x
if __name__ == '__main__':
# start 4 worker processes
with Pool(processes=4) as pool:
# print "[0, 1, 4,..., 81]"
print(pool.map(f, range(10)))
# print same numbers in arbitrary order
for i in pool.imap_unordered(f, range(10)):
print(i)
# evaluate "f(20)" asynchronously
res = pool.apply_async(f, (20,)) # runs in *only* one process
print(res.get(timeout=1)) # prints "400"
# evaluate "os.getpid()" asynchronously
res = pool.apply_async(os.getpid, ()) # runs in *only* one process
print(res.get(timeout=1)) # prints the PID of that process
# launching multiple evaluations asynchronously *may* use more processes
multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
print([res.get(timeout=1) for res in multiple_results])
# make a single worker sleep for 10 seconds
res = pool.apply_async(time.sleep, (10,))
try:
print(res.get(timeout=1))
except TimeoutError:
print("We lacked patience and got a multiprocessing.TimeoutError")
print("For the moment, the pool remains available for more work")
# exiting the 'with'-block has stopped the pool
print("Now the pool is closed and no longer available")
Observera att metoderna i en pool endast ska användas av den process som skapade den.
Anteckning
Funktionalitet inom detta paket kräver att modulen __main__
kan importeras av underordnade program. Detta behandlas i Riktlinjer för programmering men det är värt att påpeka här. Detta innebär att vissa exempel, såsom multiprocessing.pool.Pool
exemplen inte kommer att fungera i den interaktiva tolken. Till exempel:
>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
... return x*x
...
>>> with p:
... p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
(Om du försöker detta kommer det faktiskt att matas ut tre fullständiga spårningar som är sammanflätade på ett halvt slumpmässigt sätt, och då kanske du måste stoppa den överordnade processen på något sätt)
Referens¶
Paketet multiprocessing
replikerar till största delen API:et för modulen threading
.
Process
och undantag¶
- class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)¶
Processobjekt representerar aktivitet som körs i en separat process. Klassen
Process
har motsvarigheter till alla metoder ithreading.Thread
.Konstruktorn ska alltid anropas med nyckelordsargument. group bör alltid vara
None
; den existerar enbart för kompatibilitet medthreading.Thread
. target är det anropsbara objekt som skall anropas avrun()
-metoden. Standardvärdet ärNone
, vilket betyder att ingenting anropas. name är processnamnet (sename
för mer information). args är argumenttupeln för målinkallningen. kwargs är en ordbok med nyckelordsargument för målanropet. Om det tillhandahålls sätter det enda nyckelordsargumentet daemon processens flaggadaemon
tillTrue
ellerFalse
. OmNone
(standard), kommer denna flagga att ärvas från den skapande processen.Som standard skickas inga argument till target. Argumentet args, som som standard är
()
, kan användas för att ange en lista eller tupel av de argument som ska skickas till target.Om en subklass åsidosätter konstruktorn måste den se till att den anropar basklassens konstruktor (
super().__init__()
) innan den gör något annat med processen.Anteckning
I allmänhet måste alla argument till
Process
vara plockbara. Detta observeras ofta när man försöker skapa enProcess
eller använda enconcurrent.futures.ProcessPoolExecutor
från en REPL med en lokalt definierad target-funktion.Att skicka ett anropsbart objekt som definierats i den aktuella REPL-sessionen gör att barnprocessen dör via ett icke fångat
AttributeError
-undantag när den startas eftersom target måste ha definierats i en importerbar modul för att kunna laddas under unpickling.Exempel på detta oavhjälpliga fel från barnet:
>>> import multiprocessing as mp >>> def knigit(): ... print("Ni!") ... >>> process = mp.Process(target=knigit) >>> process.start() >>> Traceback (most recent call last): File ".../multiprocessing/spawn.py", line ..., in spawn_main File ".../multiprocessing/spawn.py", line ..., in _main AttributeError: module '__main__' has no attribute 'knigit' >>> process <SpawnProcess name='SpawnProcess-1' pid=379473 parent=378707 stopped exitcode=1>
Se Startmetoderna spawn och forkserver. Även om denna begränsning inte är sann om man använder startmetoden
"fork"
, så är den inte längre standard på någon plattform från och med Python3.14
. Se Sammanhang och startmetoder. Se även gh-132898.Ändrad i version 3.3: Parametern daemon har lagts till.
- run()¶
Metod som representerar processens aktivitet.
Du kan åsidosätta denna metod i en subklass. Standardmetoden
run()
anropar det anropsbara objekt som skickats till objektets konstruktör som målargument, om något, med sekventiella argument och nyckelordsargument som hämtas från argumenten args respektive kwargs.Samma effekt uppnås genom att använda en lista eller tupel som args-argument i
Process
.Exempel:
>>> from multiprocessing import Process >>> p = Process(target=print, args=[1]) >>> p.run() 1 >>> p = Process(target=print, args=(1,)) >>> p.run() 1
- start()¶
Starta processens aktivitet.
Detta måste anropas högst en gång per processobjekt. Det ordnar så att objektets
run()
-metod anropas i en separat process.
- join([timeout])¶
Om det valfria argumentet timeout är
None
(standard) blockeras metoden tills den process varsjoin()
-metod anropas avslutas. Om timeout är ett positivt tal blockeras den i högst timeout sekunder. Observera att metoden returnerarNone
om dess process avslutas eller om metoden tidsbegränsas. Kontrollera processensexitcode
för att avgöra om den avslutades.En process kan anslutas flera gånger.
En process kan inte ansluta sig till sig själv eftersom detta skulle orsaka ett dödläge. Det är ett fel att försöka ansluta sig till en process innan den har startats.
- name¶
Processens namn. Namnet är en sträng som endast används i identifieringssyfte. Det har ingen semantik. Flera processer kan ges samma namn.
Det initiala namnet sätts av konstruktören. Om inget explicit namn anges till konstruktören, konstrueras ett namn av formen ”Process-N1:N2:…:Nk”, där varje Nk är det N:te barnet till sin förälder.
- is_alive()¶
Returnerar om processen är vid liv.
Ett processobjekt är i princip vid liv från det ögonblick då metoden
start()
returneras till dess att barnprocessen avslutas.
- daemon¶
Processens daemonflagga, ett booleanskt värde. Detta måste anges innan
start()
anropas.Det initiala värdet ärvs från skapandeprocessen.
När en process avslutas försöker den avsluta alla sina daemoniska underordnade processer.
Observera att en daemonisk process inte får skapa underordnade processer. I annat fall skulle en daemonisk process lämna sina barn föräldralösa om den avslutas när dess föräldraprocess avslutas. Dessutom är detta inte Unix daemons eller tjänster, det är normala processer som kommer att avslutas (och inte anslutas) om icke-daemoniska processer har avslutats.
Förutom API:et
threading.Thread
stöderProcess
-objekten även följande attribut och metoder:- pid¶
Returnerar processens ID. Innan processen startas kommer detta att vara
None
.
- exitcode¶
Barnets utgångskod. Denna kommer att vara
None
om processen ännu inte har avslutats.Om barnets
run()
-metod returnerades normalt blir utgångskoden 0. Om den avslutades viasys.exit()
med ett heltalsargument N blir utgångskoden N.Om barnet avslutades på grund av ett undantag som inte fångades upp inom
run()
, kommer utgångskoden att vara 1. Om det avslutades av signalen N kommer utgångskoden att vara det negativa värdet -N.
- authkey¶
Processens autentiseringsnyckel (en byte-sträng).
När
multiprocessing
initieras tilldelas huvudprocessen en slumpmässig sträng med hjälp avos.urandom()
.När ett
Process
-objekt skapas kommer det att ärva autentiseringsnyckeln för sin överordnade process, även om detta kan ändras genom att angeauthkey
till en annan byte-sträng.
- sentinel¶
Ett numeriskt handtag för ett systemobjekt som kommer att bli ”klart” när processen avslutas.
Du kan använda det här värdet om du vill vänta på flera händelser samtidigt med hjälp av
multiprocessing.connection.wait()
. Annars är det enklare att anropajoin()
.I Windows är detta ett OS-handtag som kan användas med API-anropen
WaitForSingleObject
ochWaitForMultipleObjects
. På POSIX är detta en filbeskrivare som kan användas med primitiver från modulenselect
.Tillagd i version 3.3.
- interrupt()¶
Avslutar processen. Fungerar på POSIX med hjälp av signalen
SIGINT
. Beteendet på Windows är odefinierat.Som standard avslutas den underordnade processen genom att
KeyboardInterrupt
aktiveras. Detta beteende kan ändras genom att ställa in respektive signalhanterare i barnprocessensignal.signal()
förSIGINT
.Obs: om barnprocessen fångar upp och kastar
KeyboardInterrupt
, kommer processen inte att avslutas.Notera: standardbeteendet kommer också att sätta
exitcode
till1
som om ett undantag utan åtgärd har uppstått i barnprocessen. För att ha en annanexitcode
kan du helt enkelt fångaKeyboardInterrupt
och anropaexit(your_code)
.Tillagd i version 3.14.
- terminate()¶
Avsluta processen. På POSIX görs detta med hjälp av signalen
SIGTERM
; på Windows användsTerminateProcess()
. Observera att exit-handlers och finally-klausuler etc. inte kommer att exekveras.Observera att processer som härstammar från processen inte kommer att avslutas - de kommer helt enkelt att bli föräldralösa.
Varning
Om denna metod används när den associerade processen använder en pipe eller en kö, kan pipen eller kön bli skadad och bli oanvändbar för andra processer. På samma sätt, om processen har förvärvat ett lås eller en semafor etc., kan avslutningen av den leda till att andra processer fastnar.
- kill()¶
Samma som
terminate()
men använder signalenSIGKILL
på POSIX.Tillagd i version 3.7.
- close()¶
Stänger
Process
-objektet och frigör alla resurser som är associerade med det.ValueError
visas om den underliggande processen fortfarande körs. Närclose()
har returnerats framgångsrikt kommer de flesta andra metoder och attribut förProcess
-objektet att ge upphov tillValueError
.Tillagd i version 3.7.
Observera att metoderna
start()
,join()
,is_alive()
,terminate()
ochexitcode
endast ska anropas av den process som skapade processobjektet.Exempel på användning av några av metoderna i
Process
:>>> import multiprocessing, time, signal >>> mp_context = multiprocessing.get_context('spawn') >>> p = mp_context.Process(target=time.sleep, args=(1000,)) >>> print(p, p.is_alive()) <...Process ... initial> False >>> p.start() >>> print(p, p.is_alive()) <...Process ... started> True >>> p.terminate() >>> time.sleep(0.1) >>> print(p, p.is_alive()) <...Process ... stopped exitcode=-SIGTERM> False >>> p.exitcode == -signal.SIGTERM True
- exception multiprocessing.ProcessError¶
Basklassen för alla
multiprocessing
-undantag.
- exception multiprocessing.BufferTooShort¶
Exception orsakad av
Connection.recv_bytes_into()
när det medföljande buffertobjektet är för litet för det lästa meddelandet.Om
e
är en instans avBufferTooShort
så kommere.args[0]
att ge meddelandet som en byte-sträng.
- exception multiprocessing.AuthenticationError¶
Uppstår när det finns ett autentiseringsfel.
- exception multiprocessing.TimeoutError¶
Utlöses av metoder med en timeout när timeouten löper ut.
Pipes och köer¶
När man använder flera processer använder man i allmänhet meddelandepassning för kommunikation mellan processerna och undviker att behöva använda synkroniseringsprimitiver som lås.
För att skicka meddelanden kan man använda Pipe()
(för en anslutning mellan två processer) eller en kö (som tillåter flera producenter och konsumenter).
Typerna Queue
, SimpleQueue
och JoinableQueue
är köer med flera producenter och flera konsumenter FIFO som är modellerade efter klassen queue.Queue
i standardbiblioteket. De skiljer sig åt genom att Queue
saknar metoderna task_done()
och join()
som introducerades i Python 2.5:s klass queue.Queue
.
Om du använder JoinableQueue
så måste du anropa JoinableQueue.task_done()
för varje uppgift som tas bort från kön, annars kan semaforen som används för att räkna antalet oavslutade uppgifter så småningom svämma över och orsaka ett undantag.
En skillnad från andra köimplementationer i Python är att multiprocessing
-köer serialiserar alla objekt som läggs in i dem med pickle
. Objektet som returneras av get-metoden är ett återskapat objekt som inte delar minne med det ursprungliga objektet.
Observera att man också kan skapa en delad kö genom att använda ett manager-objekt – se Ansvariga.
Anteckning
multiprocessing
använder de vanliga undantagen queue.Empty
och queue.Full
för att signalera en timeout. De är inte tillgängliga i namnrymden multiprocessing
så du måste importera dem från queue
.
Anteckning
När ett objekt sätts i en kö picklas objektet och en bakgrundstråd spolar senare ut den picklade datan till en underliggande pipe. Detta har några konsekvenser som är lite överraskande, men bör inte orsaka några praktiska problem - om de verkligen stör dig kan du istället använda en kö som skapats med en manager.
Efter att ha lagt ett objekt på en tom kö kan det bli en oändligt liten fördröjning innan köns metod
empty()
returnerarFalse
ochget_nowait()
kan returnera utan att ge upphov tillqueue.Empty
.Om flera processer ställer objekt i kö är det möjligt att objekten tas emot i den andra änden i fel ordning. Objekt som köas av samma process kommer dock alltid att vara i den förväntade ordningen i förhållande till varandra.
Varning
Om en process dödas med Process.terminate()
eller os.kill()
medan den försöker använda en Queue
, är det troligt att data i kön blir skadade. Detta kan leda till att en annan process får ett undantag när den försöker använda kön senare.
Varning
Som nämnts ovan, om en underordnad process har lagt objekt i en kö (och inte har använt JoinableQueue.cancel_join_thread
), kommer den processen inte att avslutas förrän alla buffrade objekt har spolats till pipen.
Det innebär att om du försöker ansluta dig till den processen kan du få ett dödläge om du inte är säker på att alla objekt som har ställts i kö har förbrukats. På samma sätt, om barnprocessen är icke-daemonisk, kan föräldraprocessen hänga sig vid utgången när den försöker ansluta sig till alla sina icke-daemoniska barn.
Observera att en kö som skapats med hjälp av en manager inte har detta problem. Se Riktlinjer för programmering.
För ett exempel på användning av köer för kommunikation mellan processer, se Exempel.
- multiprocessing.Pipe([duplex])¶
Returnerar ett par
(conn1, conn2)
avConnection
-objekt som representerar ändarna av ett rör.Om duplex är
True
(standard) är röret dubbelriktat. Om duplex ärFalse
är pipen enkelriktad:conn1
kan bara användas för att ta emot meddelanden ochconn2
kan bara användas för att skicka meddelanden.Metoden
send()
serialiserar objektet med hjälp avpickle
och metodenrecv()
återskapar objektet.
- class multiprocessing.Queue([maxsize])¶
Returnerar en processdelad kö som implementerats med hjälp av en pipe och några lås/semaphores. När en process först lägger ett objekt i kön startas en matartråd som överför objekt från en buffert till pipen.
De vanliga undantagen
queue.Empty
ochqueue.Full
från standardbibliotekets modulqueue
används för att signalera tidsavbrott.Queue
implementerar alla metoder iqueue.Queue
utomtask_done()
ochjoin()
.- qsize()¶
Returnerar den ungefärliga storleken på kön. På grund av semantiken för multithreading/multiprocessing är detta tal inte tillförlitligt.
Observera att detta kan ge upphov till
NotImplementedError
på plattformar som macOS därsem_getvalue()
inte är implementerad.
- empty()¶
Returnerar
True
om kön är tom, annarsFalse
. På grund av semantiken för multithreading/multiprocessing är detta inte tillförlitligt.Kan ge upphov till ett
OSError
på stängda köer. (inte garanterat)
- full()¶
Returnerar
True
om kön är full, annarsFalse
. På grund av semantiken för multithreading/multiprocessing är detta inte tillförlitligt.
- put(obj[, block[, timeout]])¶
Placerar obj i kön. Om det valfria argumentet block är
True
(standard) och timeout ärNone
(standard), blockeras vid behov tills en ledig plats finns tillgänglig. Om timeout är ett positivt tal blockerar den i högst timeout sekunder och ger upphov tillqueue.Full
-undantaget om ingen ledig plats fanns tillgänglig inom den tiden. I annat fall (block ärFalse
), lägg till ett objekt i kön om en ledig plats omedelbart finns tillgänglig, annars utlöses undantagetqueue.Full
(timeout ignoreras i det fallet).Ändrad i version 3.8: Om kön är stängd kommer
ValueError
att uppstå istället förAssertionError
.
- put_nowait(obj)¶
Motsvarar
put(obj, False)
.
- get([block[, timeout]])¶
Tar bort och returnerar ett objekt från kön. Om de valfria argen block är
True
(standard) och timeout ärNone
(standard), blockeras vid behov tills ett objekt är tillgängligt. Om timeout är ett positivt tal blockerar den högst timeout sekunder och ger upphov tillqueue.Empty
-undantaget om inget objekt var tillgängligt inom den tiden. I annat fall (block ärFalse
) returneras ett objekt om det finns ett omedelbart tillgängligt, annars uppstår undantagetqueue.Empty
(timeout ignoreras i det fallet).Ändrad i version 3.8: Om kön är stängd, genereras
ValueError
istället förOSError
.
- get_nowait()¶
Motsvarar
get(False)
.
multiprocessing.Queue
har några ytterligare metoder som inte finns iqueue.Queue
. Dessa metoder är vanligtvis onödiga för de flesta koder:- close()¶
Stäng kön: frigör interna resurser.
En kö får inte längre användas efter att den har stängts. Exempelvis får metoderna
get()
,put()
ochempty()
inte längre anropas.Bakgrundstråden kommer att avslutas när den har spolat all buffrad data till pipen. Detta anropas automatiskt när kön samlas in (garbage collected).
- join_thread()¶
Gå med i bakgrundstråden. Detta kan endast användas efter att
close()
har anropats. Den blockerar tills bakgrundstråden avslutas och säkerställer att all data i bufferten har spolats till röret.Om en process inte är skaparen av kön kommer den som standard att försöka ansluta sig till köns bakgrundstråd när den avslutas. Processen kan anropa
cancel_join_thread()
för att fåjoin_thread()
att inte göra någonting.
- cancel_join_thread()¶
Förhindrar
join_thread()
från att blockera. I synnerhet förhindrar detta att bakgrundstråden ansluts automatiskt när processen avslutas – sejoin_thread()
.Ett bättre namn på den här metoden skulle kunna vara
allow_exit_without_flush()
. Det är troligt att köade data går förlorade och du kommer nästan aldrig att behöva använda den. Den finns egentligen bara där om du vill att den aktuella processen ska avslutas omedelbart utan att vänta på att spola köade data till den underliggande pipen, och du inte bryr dig om förlorade data.
Anteckning
Funktionaliteten i denna klass kräver en fungerande implementation av delade semaforer i värdoperativsystemet. Utan en sådan kommer funktionaliteten i denna klass att inaktiveras, och försök att instansiera en
Queue
kommer att resultera i ettImportError
. Se bpo-3770 för ytterligare information. Detsamma gäller för någon av de specialiserade kötyper som listas nedan.
- class multiprocessing.SimpleQueue¶
Det är en förenklad
Queue
-typ, mycket nära en låstPipe
.- close()¶
Stäng kön: frigör interna resurser.
En kö får inte längre användas efter att den har stängts. Till exempel får metoderna
get()
,put()
ochempty()
inte längre anropas.Tillagd i version 3.9.
- empty()¶
Returnerar
True
om kön är tom, annarsFalse
.Ger alltid upphov till ett
OSError
om SimpleQueue stängs.
- get()¶
Ta bort och returnera ett objekt från kön.
- put(item)¶
Lägg objekt i kön.
- class multiprocessing.JoinableQueue([maxsize])¶
JoinableQueue
, en underklass tillQueue
, är en kö som dessutom har metodernatask_done()
ochjoin()
.- task_done()¶
Anger att en tidigare köad uppgift är slutförd. Används av köanvändare. För varje
get()
som används för att hämta en uppgift, talar ett efterföljande anrop tilltask_done()
om för kön att bearbetningen av uppgiften är klar.Om en
join()
för närvarande blockeras kommer den att återupptas när alla objekt har bearbetats (vilket innebär att etttask_done()
-anrop har mottagits för varje objekt som harput()
i kön).Utlöser ett
ValueError
om den anropas fler gånger än det finns objekt i kön.
- join()¶
Blockera tills alla objekt i kön har hämtats och behandlats.
Antalet oavslutade uppgifter ökar varje gång ett objekt läggs till i kön. Antalet sjunker när en konsument anropar
task_done()
för att ange att objektet hämtades och att allt arbete med det är slutfört. När antalet oavslutade uppgifter sjunker till noll avblockerasjoin()
.
Diverse¶
- multiprocessing.active_children()¶
Returnerar en lista över alla levande barn till den aktuella processen.
Om du anropar detta får det bieffekten att alla processer som redan har avslutats ”ansluts”.
- multiprocessing.cpu_count()¶
Returnerar antalet CPU:er i systemet.
Detta antal motsvarar inte det antal processorer som den aktuella processen kan använda. Antalet användbara processorer kan erhållas med
os.process_cpu_count()
(ellerlen(os.sched_getaffinity(0))
).När antalet CPU:er inte kan bestämmas uppstår ett
NotImplementedError
.Se även
Ändrad i version 3.13: Returvärdet kan också åsidosättas med flaggan
-X cpu_count
ellerPYTHON_CPU_COUNT
eftersom detta bara är ett omslag runt API:erna för cpu-räkning ios
.
- multiprocessing.current_process()¶
Returnerar
Process
-objektet som motsvarar den aktuella processen.En motsvarighet till
threading.current_thread()
.
- multiprocessing.parent_process()¶
Returnerar
Process
-objektet som motsvarar den överordnade processen förcurrent_process()
. För huvudprocessen kommerparent_process
att varaNone
.Tillagd i version 3.8.
- multiprocessing.freeze_support()¶
Lägg till stöd för när ett program som använder
multiprocessing
har frysts för att producera en körbar fil. (Har testats med py2exe, PyInstaller och cx_Freeze.)Man måste anropa den här funktionen direkt efter raden
if __name__ == '__main__'
i huvudmodulen. Till exempel:från multiprocessing import Process, freeze_support def f(): print('hej världen!') if __name__ == '__main__': freeze_support() Process(mål=f).start()
Om raden
freeze_support()
utelämnas kommer försök att köra den frysta körbara filen att ge upphov tillRuntimeError
.Anrop av
freeze_support()
har ingen effekt när startmetoden inte är spawn. Dessutom, om modulen körs normalt av Python-tolken (programmet har inte frysts), harfreeze_support()
ingen effekt.
- multiprocessing.get_all_start_methods()¶
Returnerar en lista över de startmetoder som stöds, varav den första är standard. De möjliga startmetoderna är
'fork'
,'spawn'
och'forkserver'
. Alla plattformar stöder inte alla metoder. Se Sammanhang och startmetoder.Tillagd i version 3.4.
- multiprocessing.get_context(method=None)¶
Returnerar ett kontextobjekt som har samma attribut som modulen
multiprocessing
.Om method är
None
returneras standardkontexten. Observera att om den globala startmetoden inte har ställts in, kommer detta att ställa in den till standardmetoden. Annars bör method vara'fork'
,'spawn'
,'forkserver'
.ValueError
uppstår om den angivna startmetoden inte är tillgänglig. Se Sammanhang och startmetoder.Tillagd i version 3.4.
- multiprocessing.get_start_method(allow_none=False)¶
Returnerar namnet på den startmetod som används för att starta processer.
Om den globala startmetoden inte har ställts in och allow_none är
False
, ställs startmetoden in på standardvärdet och namnet returneras. Om startmetoden inte har ställts in och allow_none ärTrue
returnerasNone
.Returvärdet kan vara
'fork'
,'spawn'
,'forkserver'
ellerNone
. Se Sammanhang och startmetoder.Tillagd i version 3.4.
Ändrad i version 3.8: På macOS är startmetoden spawn nu standard. Startmetoden fork bör betraktas som osäker eftersom den kan leda till att underprocessen kraschar. Se bpo-33725.
- multiprocessing.set_executable(executable)¶
Ange sökvägen till den Python-tolk som ska användas när en underordnad process startas. (Som standard används
sys.executable
). Inbäddare kommer förmodligen att behöva göra något i stil medset_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))
innan de kan skapa underordnade processer.
Ändrad i version 3.4: Stödjs nu på POSIX när startmetoden
'spawn'
används.Ändrad i version 3.11: Accepterar en path-like object.
- multiprocessing.set_forkserver_preload(module_names)¶
Ange en lista med modulnamn som huvudprocessen för forkserver ska försöka importera så att deras redan importerade tillstånd ärvs av förgrenade processer. Alla
ImportError
när detta görs ignoreras i tysthet. Detta kan användas som en prestandaförbättring för att undvika upprepat arbete i varje process.För att detta ska fungera måste det anropas innan forkserver-processen har startats (innan du skapar en
Pool
eller startar enProcess
).Endast meningsfullt när man använder startmetoden
'forkserver'
. Se Sammanhang och startmetoder.Tillagd i version 3.4.
- multiprocessing.set_start_method(method, force=False)¶
Ange vilken metod som ska användas för att starta underordnade processer. Argumentet method kan vara
'fork'
,'spawn'
eller'forkserver'
. UtlöserRuntimeError
om startmetoden redan har ställts in och force inte ärTrue
. Om method ärNone
och force ärTrue
så sätts startmetoden tillNone
. Om method ärNone
och force ärFalse
sätts kontexten till standardkontexten.Observera att detta ska anropas högst en gång och att det ska skyddas i
if __name__ == '__main__'
i huvudmodulens klausul.Se Sammanhang och startmetoder.
Tillagd i version 3.4.
Anteckning
multiprocessing
innehåller inga motsvarigheter till threading.active_count()
, threading.enumerate()
, threading.settrace()
, threading.setprofile()
, threading.Timer
, eller threading.local
.
Anslutningsobjekt¶
Connection-objekt gör det möjligt att skicka och ta emot plockbara objekt eller strängar. De kan betraktas som meddelandeorienterade anslutna socklar.
Anslutningsobjekt skapas vanligtvis med Pipe
– se även Lyssnare och kunder.
- class multiprocessing.connection.Connection¶
- send(obj)¶
Skicka ett objekt till den andra änden av anslutningen som ska läsas med hjälp av
recv()
.Objektet måste kunna picklas. Mycket stora pickles (ungefär 32 MiB+, men det beror på operativsystemet) kan ge upphov till ett
ValueError
-undantag.
- recv()¶
Returnerar ett objekt som skickats från den andra änden av anslutningen med
send()
. Blockerar tills det finns något att ta emot. UtlöserEOFError
om det inte finns något kvar att ta emot och den andra änden var stängd.
- fileno()¶
Returnerar filbeskrivaren eller handtaget som används av anslutningen.
- close()¶
Stäng anslutningen.
Detta anropas automatiskt när anslutningen garbage collectas.
- poll([timeout])¶
Returnerar om det finns någon data tillgänglig för läsning.
Om timeout inte anges kommer den att returnera omedelbart. Om timeout är ett tal anger detta den maximala tiden i sekunder för blockering. Om timeout är
None
används en oändlig timeout.Observera att flera anslutningsobjekt kan pollas samtidigt genom att använda
multiprocessing.connection.wait()
.
- send_bytes(buffer[, offset[, size]])¶
Skicka byte-data från ett bytesliknande objekt som ett komplett meddelande.
Om offset anges läses data från den positionen i buffer. Om size anges kommer så många byte att läsas från bufferten. Mycket stora buffertar (ungefär 32 MiB+, men det beror på operativsystemet) kan ge upphov till ett
ValueError
-undantag
- recv_bytes([maxlength])¶
Returnerar ett komplett meddelande med byte-data som skickats från den andra änden av anslutningen som en sträng. Blockerar tills det finns något att ta emot. Utlöser
EOFError
om det inte finns något kvar att ta emot och den andra änden har stängt.Om maxlength anges och meddelandet är längre än maxlength så uppstår
OSError
och anslutningen kommer inte längre att vara läsbar.
- recv_bytes_into(buffer[, offset])¶
Läser in i buffer ett komplett meddelande med byte-data som skickas från den andra änden av anslutningen och returnerar antalet byte i meddelandet. Blockerar tills det finns något att ta emot. Utlöser
EOFError
om det inte finns något kvar att ta emot och den andra änden var stängd.buffer måste vara ett skrivbart bytesliknande objekt. Om offset anges kommer meddelandet att skrivas in i bufferten från den positionen. Offset måste vara ett icke-negativt heltal som är mindre än längden på buffer (i byte).
Om bufferten är för kort uppstår ett
BufferTooShort
undantag och det fullständiga meddelandet finns tillgängligt some.args[0]
däre
är undantagsinstansen.
Ändrad i version 3.3: Själva anslutningsobjekten kan nu överföras mellan processer med hjälp av
Connection.send()
ochConnection.recv()
.Anslutningsobjekt stöder nu också kontexthanteringsprotokollet – se Typer av kontexthanterare.
__enter__()
returnerar anslutningsobjektet och__exit__()
anroparclose()
.
Till exempel:
>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])
Varning
Metoden Connection.recv()
avplockar automatiskt de data den tar emot, vilket kan vara en säkerhetsrisk om du inte kan lita på den process som skickade meddelandet.
Om inte anslutningsobjektet producerades med Pipe()
bör du därför endast använda metoderna recv()
och send()
efter att ha utfört någon form av autentisering. Se Autentiseringsnycklar.
Varning
Om en process dödas medan den försöker läsa eller skriva till en pipe kommer data i pipen sannolikt att korrumperas, eftersom det kan bli omöjligt att vara säker på var meddelandegränserna ligger.
Synkroniseringsprimitiver¶
Generellt sett är synkroniseringsprimitiver inte lika nödvändiga i ett multiprocessprogram som i ett flertrådat program. Se dokumentationen för modulen threading
.
Observera att man också kan skapa synkroniseringsprimitiver genom att använda ett manager-objekt – se Ansvariga.
- class multiprocessing.Barrier(parties[, action[, timeout]])¶
Ett barriärobjekt: en klon av
threading.Barrier
.Tillagd i version 3.3.
- class multiprocessing.BoundedSemaphore([value])¶
Ett begränsat semaforobjekt: en nära analog till
threading.BoundedSemaphore
.En enda skillnad från dess nära analog finns: dess
acquire
-metods första argument heter block, vilket är i överensstämmelse medLock.acquire()
.Anteckning
På macOS kan detta inte skiljas från
Semaphore
eftersomsem_getvalue()
inte är implementerat på den plattformen.
- class multiprocessing.Condition([lock])¶
En villkorsvariabel: ett alias för
threading.Condition
.Om lock anges bör det vara ett
Lock
- ellerRLock
-objekt frånmultiprocessing
.Ändrad i version 3.3: Metoden
wait_for()
lades till.
- class multiprocessing.Event¶
En klon av
threading.Event
.
- class multiprocessing.Lock¶
Ett icke-rekursivt låsobjekt: en nära motsvarighet till
threading.Lock
. När en process eller tråd har förvärvat ett lås kommer efterföljande försök att förvärva det från någon annan process eller tråd att blockeras tills det släpps; vilken process eller tråd som helst kan släppa det. Koncepten och beteendena ithreading.Lock
som gäller för trådar är replikerade här imultiprocessing.Lock
som gäller för antingen processer eller trådar, förutom vad som anges.Observera att
Lock
faktiskt är en fabriksfunktion som returnerar en instans avmultiprocessing.synchronize.Lock
initialiserad med en standardkontext.Lock
stöder protokollet context manager och kan därför användas iwith
-satser.- acquire(block=True, timeout=None)¶
Förvärva ett lås, blockerande eller icke-blockerande.
Med block-argumentet inställt på
True
(standard) kommer metodanropet att blockera tills låset är i ett olåst tillstånd, sedan ställa in det på låst och returneraTrue
. Observera att namnet på detta första argument skiljer sig från det ithreading.Lock.acquire()
.Om argumentet block är satt till
False
blockeras inte metodanropet. Om låset för närvarande är i låst tillstånd returnerasFalse
; annars sätts låset i låst tillstånd och returnerasTrue
.När timeout anropas med ett positivt värde i flyttal, blockeras låset i högst det antal sekunder som anges av timeout så länge låset inte kan förvärvas. Anrop med ett negativt värde för timeout är likvärdigt med en timeout på noll. Anrop med ett timeout-värde på
None
(standard) sätter timeout-perioden till oändlig. Observera att behandlingen av negativa ellerNone
-värden för timeout skiljer sig från det implementerade beteendet ithreading.Lock.acquire()
. Argumentet timeout har inga praktiska konsekvenser om argumentet block är satt tillFalse
och ignoreras därför. ReturnerarTrue
om låset har förvärvats ellerFalse
om timeout-perioden har löpt ut.
- release()¶
Frigör ett lås. Detta kan anropas från vilken process eller tråd som helst, inte bara från den process eller tråd som ursprungligen förvärvade låset.
Beteendet är detsamma som i
threading.Lock.release()
förutom att ettValueError
uppstår när det anropas på ett olåst lås.
- locked()¶
Returnerar en boolean som anger om detta objekt är låst just nu.
Tillagd i version 3.14.
- class multiprocessing.RLock¶
Ett rekursivt låsobjekt: en nära motsvarighet till
threading.RLock
. Ett rekursivt lås måste frigöras av den process eller tråd som förvärvade det. När en process eller tråd har förvärvat ett rekursivt lås kan samma process eller tråd förvärva det igen utan att blockera; den processen eller tråden måste släppa det en gång för varje gång det har förvärvats.Observera att
RLock
faktiskt är en fabriksfunktion som returnerar en instans avmultiprocessing.synchronize.RLock
initialiserad med en standardkontext.RLock
stöder protokollet context manager och kan därför användas iwith
-satser.- acquire(block=True, timeout=None)¶
Förvärva ett lås, blockerande eller icke-blockerande.
När den anropas med argumentet block inställt på
True
blockeras den tills låset är olåst (inte ägs av någon process eller tråd) om inte låset redan ägs av den aktuella processen eller tråden. Den aktuella processen eller tråden tar då över ägandet av låset (om den inte redan har ägandet) och rekursionsnivån inuti låset ökar med ett, vilket resulterar i ett returvärde påTrue
. Observera att det finns flera skillnader i detta första arguments beteende jämfört med implementeringen avthreading.RLock.acquire()
, som börjar med namnet på själva argumentet.När den anropas med argumentet block satt till
False
, blockeras inte. Om låset redan har förvärvats (och därmed ägs) av en annan process eller tråd, tar den aktuella processen eller tråden inte över ägandet och rekursionsnivån i låset ändras inte, vilket resulterar i ett returvärde påFalse
. Om låset är olåst tar den aktuella processen eller tråden över ägandet och rekursionsnivån ökas, vilket resulterar i ett returvärde påTrue
.Användning och beteenden för argumentet timeout är desamma som i
Lock.acquire()
. Observera att vissa av dessa beteenden hos timeout skiljer sig från de implementerade beteendena ithreading.RLock.acquire()
.
- release()¶
Frigör ett lås och dekrementera rekursionsnivån. Om rekursionsnivån är noll efter dekrementeringen, återställ låset till olåst (ägs inte av någon process eller tråd) och om några andra processer eller trådar är blockerade i väntan på att låset ska bli olåst, låt exakt en av dem fortsätta. Om rekursionsnivån efter dekrementeringen fortfarande inte är noll, förblir låset låst och ägs av den anropande processen eller tråden.
Anropa endast denna metod när den anropande processen eller tråden äger låset. Ett
AssertionError
uppstår om denna metod anropas av en annan process eller tråd än ägaren eller om låset är i ett olåst (oägt) tillstånd. Observera att den typ av undantag som uppstår i denna situation skiljer sig från det implementerade beteendet ithreading.RLock.release()
.
- locked()¶
Returnerar en boolean som anger om detta objekt är låst just nu.
Tillagd i version 3.14.
- class multiprocessing.Semaphore([value])¶
Ett semaforobjekt: en nära motsvarighet till
threading.Semaphore
.En enda skillnad från dess nära analog finns: dess
acquire
-metods första argument heter block, vilket är i överensstämmelse medLock.acquire()
.
Anteckning
På macOS stöds inte sem_timedwait
, så om du anropar acquire()
med en timeout kommer den funktionens beteende att emuleras med en sovande loop.
Anteckning
En del av funktionaliteten i detta paket kräver en fungerande implementation av delade semaforer i värdoperativsystemet. Utan en sådan kommer modulen multiprocessing.synchronize
att inaktiveras och försök att importera den kommer att resultera i ett ImportError
. Se bpo-3770 för ytterligare information.
Ansvariga¶
Managers är ett sätt att skapa data som kan delas mellan olika processer, inklusive delning över ett nätverk mellan processer som körs på olika maskiner. Ett manager-objekt kontrollerar en serverprocess som hanterar delade objekt. Andra processer kan komma åt de delade objekten genom att använda proxies.
- multiprocessing.Manager()¶
Returnerar ett startat
SyncManager
-objekt som kan användas för att dela objekt mellan processer. Det returnerade manager-objektet motsvarar en skapad underordnad process och har metoder som skapar delade objekt och returnerar motsvarande proxyer.
Managerprocesser kommer att stängas av så snart som de är garbage collected eller deras överordnade process avslutas. Managerklasserna är definierade i modulen multiprocessing.managers
:
- class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)¶
Skapa ett BaseManager-objekt.
När den har skapats bör man anropa
start()
ellerget_server().serve_forever()
för att säkerställa att managerobjektet refererar till en startad managerprocess.address är den adress på vilken manager-processen lyssnar efter nya anslutningar. Om address är
None
så väljs en godtycklig adress.authkey är autentiseringsnyckeln som kommer att användas för att kontrollera giltigheten av inkommande anslutningar till serverprocessen. Om authkey är
None
användscurrent_process().authkey
. Annars används authkey och den måste vara en byte-sträng.serializer måste vara
'pickle'
(användpickle
serialisering) eller'xmlrpclib'
(användxmlrpc.client
serialisering).ctx är ett kontextobjekt, eller
None
(använd aktuell kontext). Se funktionenget_context()
.shutdown_timeout är en timeout i sekunder som används för att vänta tills den process som används av förvaltaren avslutas i metoden
shutdown()
. Om tiden för nedstängning går ut avslutas processen. Om det också tar tid att avsluta processen dödas processen.Ändrad i version 3.11: Parametern shutdown_timeout har lagts till.
- start([initializer[, initargs]])¶
Starta en underprocess för att starta hanteraren. Om initializer inte är
None
kommer underprocessen att anropainitializer(*initargs)
när den startar.
- get_server()¶
Returnerar ett
Server
-objekt som representerar den faktiska servern under Managerns kontroll. ObjektetServer
har stöd för metodenserve_forever()
:>>> from multiprocessing.managers import BaseManager >>> manager = BaseManager(address=('', 50000), authkey=b'abc') >>> server = manager.get_server() >>> server.serve_forever()
Server
har dessutom ett attributaddress
.
- connect()¶
Anslut ett lokalt managerobjekt till en fjärrmanagerprocess:
>>> from multiprocessing.managers import BaseManager >>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc') >>> m.connect()
- shutdown()¶
Stoppar den process som används av hanteraren. Detta är endast tillgängligt om
start()
har använts för att starta serverprocessen.Detta kan anropas flera gånger.
- register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])¶
En klassmetod som kan användas för att registrera en typ eller en anropsbar kod hos managerklassen.
typeid är en ”typidentifierare” som används för att identifiera en viss typ av delat objekt. Detta måste vara en sträng.
callable är en callable som används för att skapa objekt för denna typidentifierare. Om en manager-instans kommer att anslutas till servern med
connect()
-metoden, eller om create_method-argumentet ärFalse
, kan detta lämnas somNone
.proxytype är en underklass till
BaseProxy
som används för att skapa proxy för delade objekt med detta typeid. OmNone
så skapas en proxyklass automatiskt.exposed används för att ange en sekvens av metodnamn som proxyer för detta typeid ska få tillgång till med hjälp av
BaseProxy._callmethod()
. (Om exposed ärNone
används iställetproxytype._exposed_
om den finns) Om ingen exponerad lista anges kommer alla ”offentliga metoder” i det delade objektet att vara tillgängliga. (Här betyder en ”publik metod” alla attribut som har en__call__()
-metod och vars namn inte börjar med'_'
)method_to_typeid är en mappning som används för att ange returtypen för de exponerade metoder som ska returnera en proxy. Den mappar metodnamn till typid-strängar. (Om method_to_typeid är
None
används iställetproxytype._method_to_typeid_
om den finns) Om en metods namn inte är en nyckel i denna mappning eller om mappningen ärNone
kommer det objekt som returneras av metoden att kopieras med värde.create_method avgör om en metod ska skapas med namnet typeid som kan användas för att tala om för serverprocessen att den ska skapa ett nytt delat objekt och returnera en proxy för det. Som standard är det
True
.
BaseManager
-instanser har också en skrivskyddad egenskap:- address¶
Den adress som används av chefen.
Ändrad i version 3.3: Manager-objekt stöder kontexthanteringsprotokollet – se Typer av kontexthanterare.
__enter__()
startar serverprocessen (om den inte redan har startat) och returnerar sedan manager-objektet.__exit__()
anroparshutdown()
.I tidigare versioner
__enter__()
startade inte managerns serverprocess om den inte redan var startad.
- class multiprocessing.managers.SyncManager¶
En underklass till
BaseManager
som kan användas för synkronisering av processer. Objekt av denna typ returneras avmultiprocessing.Manager()
.Dess metoder skapar och returnerar Proxy-objekt för ett antal vanligt förekommande datatyper som ska synkroniseras mellan processer. Detta inkluderar särskilt delade listor och lexikon.
- Barrier(parties[, action[, timeout]])¶
Skapa ett delat
threading.Barrier
-objekt och returnera en proxy för det.Tillagd i version 3.3.
- BoundedSemaphore([value])¶
Skapa ett delat
threading.BoundedSemaphore
-objekt och returnera en proxy för det.
- Condition([lock])¶
Skapa ett delat
threading.Condition
-objekt och returnera en proxy för det.Om lock anges bör det vara en proxy för ett
threading.Lock
- ellerthreading.RLock
-objekt.Ändrad i version 3.3: Metoden
wait_for()
lades till.
- Event()¶
Skapa ett delat
threading.Event
-objekt och returnera en proxy för det.
- Lock()¶
Skapa ett delat
threading.Lock
-objekt och returnera en proxy för det.
- Queue([maxsize])¶
Skapa ett delat
queue.Queue
-objekt och returnera en proxy för det.
- RLock()¶
Skapa ett delat
threading.RLock
-objekt och returnera en proxy för det.
- Semaphore([value])¶
Skapa ett delat
threading.Semaphore
-objekt och returnera en proxy för det.
- Array(typecode, sequence)¶
Skapa en array och returnera en proxy för den.
- Value(typecode, value)¶
Skapa ett objekt med ett skrivbart
value
-attribut och returnera en proxy för det.
- set()¶
- set(sequence)
- set(mapping)
Skapa ett delat
set
-objekt och returnera en proxy för det.Tillagd i version 3.14: stöd för
set
har lagts till.
Ändrad i version 3.6: Delade objekt kan vara nästlade. Ett delat containerobjekt, t.ex. en delad lista, kan innehålla andra delade objekt som alla hanteras och synkroniseras av
SyncManager
.
- class multiprocessing.managers.Namespace¶
En typ som kan registreras med
SyncManager
.Ett namespace-objekt har inga publika metoder, men har skrivbara attribut. Dess representation visar värdena för dess attribut.
När man använder en proxy för ett namnrymdsobjekt kommer dock ett attribut som börjar med
'_'
att vara ett attribut för proxyn och inte ett attribut för referenten:>>> mp_context = multiprocessing.get_context('spawn') >>> manager = mp_context.Manager() >>> Global = manager.Namespace() >>> Global.x = 10 >>> Global.y = 'hello' >>> Global._z = 12.3 # this is an attribute of the proxy >>> print(Global) Namespace(x=10, y='hello')
Skräddarsydda chefer¶
För att skapa en egen manager skapar man en subklass av BaseManager
och använder klassmetoden register()
för att registrera nya typer eller anropbara objekt i managerklassen. Till exempel:
från multiprocessing.managers import BaseManager
klass MathsClass:
def add(self, x, y):
returnerar x + y
def mul(self, x, y):
returnerar x * y
klass MyManager(BaseManager):
pass
MyManager.register('Maths', MathsClass)
if __name__ == '__main__':
med MyManager() som manager:
maths = manager.Maths()
print(maths.add(4, 3)) # skriver ut 7
print(maths.mul(7, 8)) # skriver ut 56
Använda en fjärrstyrd manager¶
Det är möjligt att köra en manager-server på en maskin och låta klienter använda den från andra maskiner (förutsatt att de berörda brandväggarna tillåter det).
Om du kör följande kommandon skapas en server för en enda delad kö som fjärrklienter kan komma åt:
>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
En klient kan komma åt servern på följande sätt:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')
En annan kund kan också använda den:
>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'
Lokala processer kan också komma åt den kön genom att använda koden från ovan på klienten för att komma åt den på distans:
>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
... def __init__(self, q):
... self.q = q
... super().__init__()
... def run(self):
... self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()
Proxy-objekt¶
En proxy är ett objekt som refererar till ett delat objekt som lever (förmodligen) i en annan process. Det delade objektet sägs vara proxyns referent. Flera proxyobjekt kan ha samma referent.
Ett proxyobjekt har metoder som anropar motsvarande metoder hos sin referent (även om inte alla metoder hos referenten nödvändigtvis är tillgängliga via proxyn). På så sätt kan en proxy användas precis som dess referent kan:
>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]
Observera att om du använder str()
på en proxy kommer representationen av referenten att returneras, medan om du använder repr()
kommer representationen av proxyn att returneras.
En viktig egenskap hos proxyobjekt är att de kan plockas upp så att de kan skickas mellan processer. Som sådan kan en referent innehålla Proxy-objekt. Detta tillåter nestning av dessa hanterade listor, dicts och andra Proxy-objekt:
>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b) # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']
På samma sätt kan dict- och list-proxies vara inbäddade i varandra:
>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}
Om standard (icke-proxy) list
- eller dict
-objekt finns i en referent, kommer ändringar av dessa föränderliga värden inte att spridas genom hanteraren eftersom proxyn inte har något sätt att veta när de värden som finns i den ändras. Att lagra ett värde i en containerproxy (vilket utlöser en __setitem__
på proxyobjektet) sprids dock genom hanteraren och för att effektivt modifiera ett sådant objekt kan man därför tilldela det modifierade värdet till containerproxyn igen:
# skapa en listproxy och lägg till ett föränderligt objekt (en ordbok)
lproxy = manager.list()
lproxy.append({})
# mutera nu ordboken
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# Vid den här tidpunkten är ändringarna i d ännu inte synkroniserade, men genom att
# uppdatering av ordlistan meddelas proxyn om ändringen
lproxy[0] = d
Detta tillvägagångssätt är kanske mindre bekvämt än att använda nästlade Proxy-objekt för de flesta användningsfall, men det ger också en viss kontroll över synkroniseringen.
Anteckning
Proxytyperna i multiprocessing
har inget stöd för värdejämförelser. Så, till exempel, vi har:
>>> manager.list([1,2,3]) == [1,2,3]
False
Man bör bara använda en kopia av referenten istället när man gör jämförelser.
- class multiprocessing.managers.BaseProxy¶
Proxyobjekt är instanser av underklasser till
BaseProxy
.- _callmethod(methodname[, args[, kwds]])¶
Anropa och returnera resultatet av en metod i proxyns referent.
Om
proxy
är en proxy vars referent ärobj
så är uttrycketproxy._callmethod(metodnamn, args, kwds)
kommer att utvärdera uttrycket
getattr(obj, metodnamn)(*args, **kwds)
i chefens process.
Det returnerade värdet är en kopia av resultatet av anropet eller en proxy till ett nytt delat objekt - se dokumentation för argumentet method_to_typeid i
BaseManager.register()
.Om ett undantag uppstår vid anropet, uppstår det på nytt genom
_callmethod()
. Om något annat undantag uppstår i chefens process så konverteras detta till ettRemoteError
undantag och uppstår genom_callmethod()
.Observera särskilt att ett undantag kommer att uppstå om methodname inte har exposed.
Ett exempel på användning av
_callmethod()
:>>> l = manager.list(range(10)) >>> l._callmethod('__len__') 10 >>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7] [2, 3, 4, 5, 6] >>> l._callmethod('__getitem__', (20,)) # equivalent to l[20] Traceback (most recent call last): ... IndexError: list index out of range
- _getvalue()¶
Returnera en kopia av referenten.
Om referensen inte är betningsbar kommer detta att leda till ett undantag.
- __repr__()¶
Returnerar en representation av proxyobjektet.
- __str__()¶
Returnera representationen av referenten.
Rensa upp¶
Ett proxyobjekt använder ett weakref-callback så att det avregistrerar sig från den manager som äger dess referent när det samlas in.
Ett delat objekt tas bort från managerprocessen när det inte längre finns några proxyer som hänvisar till det.
Processpooler¶
Med klassen Pool
kan man skapa en pool av processer som utför uppgifter som skickas till den.
- class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])¶
Ett processpoolobjekt som kontrollerar en pool av arbetsprocesser till vilka jobb kan skickas. Det stöder asynkrona resultat med timeouts och callbacks och har en parallell map-implementering.
processes är antalet arbetsprocesser som ska användas. Om processes är
None
används det antal som returneras avos.process_cpu_count()
.Om initializer inte är
None
kommer varje arbetsprocess att anropainitializer(*initargs)
när den startar.maxtasksperchild är antalet uppgifter som en arbetsprocess kan slutföra innan den avslutas och ersätts med en ny arbetsprocess, så att oanvända resurser kan frigöras. Standardvärdet för maxtasksperchild är
None
, vilket innebär att arbetsprocesser kommer att leva lika länge som poolen.context kan användas för att ange den kontext som används för att starta arbetsprocesserna. Vanligtvis skapas en pool med hjälp av funktionen
multiprocessing.Pool()
eller metodenPool()
i ett context-objekt. I båda fallen anges context på lämpligt sätt.Observera att metoderna i poolobjektet endast ska anropas av den process som skapade poolen.
Varning
multiprocessing.pool
-objekt har interna resurser som måste hanteras korrekt (som alla andra resurser) genom att använda poolen som en kontexthanterare eller genom att anropaclose()
ochterminate()
manuellt. Om detta inte görs kan det leda till att processen hänger sig när den ska slutföras.Observera att det är inte korrekt att förlita sig på att skräpsamlaren förstör poolen eftersom CPython inte garanterar att poolens finalizer kommer att anropas (se
object.__del__()
för mer information).Ändrad i version 3.2: Parametern maxtasksperchild har lagts till.
Ändrad i version 3.4: Parametern context har lagts till.
Ändrad i version 3.13: processes använder
os.process_cpu_count()
som standard, istället föros.cpu_count()
.Anteckning
Arbetsprocesser inom en
Pool
lever normalt under hela den tid som poolens arbetskö varar. Ett vanligt mönster i andra system (t.ex. Apache, mod_wsgi, etc.) för att frigöra resurser som hålls av arbetare är att låta en arbetare i en pool endast slutföra en viss mängd arbete innan den avslutas, rensas upp och en ny process skapas för att ersätta den gamla. Argumentet maxtasksperchild tillPool
gör denna möjlighet tillgänglig för slutanvändaren.- apply(func[, args[, kwds]])¶
Anropa func med argumenten args och nyckelordsargumenten kwds. Den blockerar tills resultatet är klart. Med tanke på dessa blockeringar är
apply_async()
bättre lämpad för att utföra arbete parallellt. Dessutom utförs func endast i en av poolens arbetare.
- apply_async(func[, args[, kwds[, callback[, error_callback]]]])¶
En variant av metoden
apply()
som returnerar ettAsyncResult
-objekt.Om callback specificeras ska det vara en callable som accepterar ett enda argument. När resultatet är klart används callback på det, såvida inte anropet misslyckades, i vilket fall error_callback används i stället.
Om error_callback anges bör det vara en callable som accepterar ett enda argument. Om målfunktionen misslyckas anropas error_callback med undantagsinstansen.
Återkallelser bör slutföras omedelbart eftersom den tråd som hanterar resultaten annars blockeras.
- map(func, iterable[, chunksize])¶
En parallell motsvarighet till den inbyggda funktionen
map()
(den stöder dock bara ett iterable-argument, för flera iterables sestarmap()
). Den blockerar tills resultatet är klart.Den här metoden delar upp iterabeln i ett antal bitar som den skickar till processpoolen som separata uppgifter. Den (ungefärliga) storleken på dessa bitar kan anges genom att chunksize sätts till ett positivt heltal.
Observera att det kan orsaka hög minnesanvändning för mycket långa iterabler. Överväg att använda
imap()
ellerimap_unordered()
med explicit chunksize-alternativ för bättre effektivitet.
- map_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
En variant av metoden
map()
som returnerar ettAsyncResult
-objekt.Om callback specificeras ska det vara en callable som accepterar ett enda argument. När resultatet är klart används callback på det, såvida inte anropet misslyckades, i vilket fall error_callback används i stället.
Om error_callback anges bör det vara en callable som accepterar ett enda argument. Om målfunktionen misslyckas anropas error_callback med undantagsinstansen.
Återkallelser bör slutföras omedelbart eftersom den tråd som hanterar resultaten annars blockeras.
- imap(func, iterable[, chunksize])¶
En latare version av
map()
.Argumentet chunksize är detsamma som det som används av metoden
map()
. För mycket långa iterabler kan ett stort värde för chunksize göra att jobbet slutförs mycket snabbare än om standardvärdet1
används.Även om chunksize är
1
så harnext()
-metoden för iteratorn som returneras avimap()
-metoden en valfri timeout-parameter:next(timeout)
kommer att ge upphov tillmultiprocessing.TimeoutError
om resultatet inte kan returneras inom timeout sekunder.
- imap_unordered(func, iterable[, chunksize])¶
Samma som
imap()
förutom att ordningsföljden på resultaten från den returnerade iteratorn bör betraktas som godtycklig. (Endast när det bara finns en arbetsprocess garanteras att ordningen är ”korrekt”)
- starmap(func, iterable[, chunksize])¶
Som
map()
förutom att elementen i iterable förväntas vara iterables som packas upp som argument.Därför resulterar en iterabel av
[(1,2), (3,4)]
i[func(1,2), func(3,4)]
.Tillagd i version 3.3.
- starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])¶
En kombination av
starmap()
ochmap_async()
som itererar över iterable av iterabler och anropar func med iterablerna uppackade. Returnerar ett resultatobjekt.Tillagd i version 3.3.
- close()¶
Förhindrar att fler uppgifter skickas till poolen. När alla uppgifter har slutförts kommer arbetsprocesserna att avslutas.
- terminate()¶
Stoppar arbetsprocesserna omedelbart utan att slutföra utestående arbete. När poolobjektet skräpsamlas kommer
terminate()
att anropas omedelbart.
- join()¶
Vänta på att arbetsprocesserna ska avslutas. Man måste anropa
close()
ellerterminate()
innan man använderjoin()
.
Ändrad i version 3.3: Poolobjekt stöder nu kontexthanteringsprotokollet – se Typer av kontexthanterare.
__enter__()
returnerar poolobjektet, och__exit__()
anroparterminate()
.
- class multiprocessing.pool.AsyncResult¶
Klassen för det resultat som returneras av
Pool.apply_async()
ochPool.map_async()
.- get([timeout])¶
Returnerar resultatet när det anländer. Om timeout inte är
None
och resultatet inte anländer inom timeout sekunder så kommermultiprocessing.TimeoutError
att uppstå. Om fjärranropet gav upphov till ett undantag kommer detta undantag att återskapas avget()
.
- wait([timeout])¶
Vänta tills resultatet är tillgängligt eller tills timeout sekunder har passerat.
- ready()¶
Returnerar om samtalet har slutförts.
- successful()¶
Returnerar om anropet slutfördes utan att ett undantag uppstod. Kommer att ge
ValueError
om resultatet inte är klart.Ändrad i version 3.7: Om resultatet inte är klart kommer
ValueError
att visas istället förAssertionError
.
Följande exempel visar hur en pool används:
from multiprocessing import Pool
import time
def f(x):
return x*x
if __name__ == '__main__':
with Pool(processes=4) as pool: # start 4 worker processes
result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
print(result.get(timeout=1)) # prints "100" unless your computer is *very* slow
print(pool.map(f, range(10))) # prints "[0, 1, 4,..., 81]"
it = pool.imap(f, range(10))
print(next(it)) # prints "0"
print(next(it)) # prints "1"
print(it.next(timeout=1)) # prints "4" unless your computer is *very* slow
result = pool.apply_async(time.sleep, (10,))
print(result.get(timeout=1)) # raises multiprocessing.TimeoutError
Lyssnare och kunder¶
Vanligtvis sker meddelandeöverföring mellan processer med hjälp av köer eller genom att använda Connection
-objekt som returneras av Pipe()
.
Modulen multiprocessing.connection
ger dock lite extra flexibilitet. Den ger i princip ett meddelandeorienterat API på hög nivå för hantering av sockets eller Windows named pipes. Den har också stöd för digest authentication med modulen hmac
, och för polling av flera anslutningar samtidigt.
- multiprocessing.connection.deliver_challenge(connection, authkey)¶
Skicka ett slumpmässigt genererat meddelande till den andra änden av anslutningen och vänta på svar.
Om svaret matchar sammanfattningen av meddelandet med authkey som nyckel skickas ett välkomstmeddelande till den andra änden av anslutningen. I annat fall genereras
AuthenticationError
.
- multiprocessing.connection.answer_challenge(connection, authkey)¶
Ta emot ett meddelande, beräkna sammanfattningen av meddelandet med authkey som nyckel och skicka sedan tillbaka sammanfattningen.
Om ett välkomstmeddelande inte tas emot kommer
AuthenticationError
att uppstå.
- multiprocessing.connection.Client(address[, family[, authkey]])¶
Försök att upprätta en anslutning till lyssnaren som använder adressen address, returnerar en
Connection
.Typen av anslutning bestäms av family-argumentet, men detta kan i allmänhet utelämnas eftersom det oftast kan härledas från formatet på address. (Se Adressformat)
Om authkey anges och inte
None
, bör den vara en bytesträng och kommer att användas som hemlig nyckel för en HMAC-baserad autentiseringsutmaning. Ingen autentisering görs om authkey ärNone
.AuthenticationError
genereras om autentiseringen misslyckas. Se Autentiseringsnycklar.
- class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])¶
Ett omslag för en bound socket eller Windows named pipe som ”lyssnar” på anslutningar.
address är den adress som ska användas av lyssnarobjektets bundna socket eller namngivna pipe.
Anteckning
Om en adress på ”0.0.0.0” används kommer adressen inte att vara en anslutningsbar slutpunkt i Windows. Om du behöver en anslutningsbar slutpunkt bör du använda ”127.0.0.1”.
family är den typ av socket (eller named pipe) som ska användas. Detta kan vara en av strängarna
'AF_INET'
(för ett TCP-socket),'AF_UNIX'
(för ett Unix-domän-socket) eller'AF_PIPE'
(för en Windows named pipe). Av dessa är det bara den första som garanterat är tillgänglig. Om family ärNone
så härleds familjen från formatet på address. Om address också ärNone
väljs ett standardvärde. Denna standard är den familj som antas vara den snabbaste tillgängliga. Se Adressformat. Observera att om family är'AF_UNIX'
och adress ärNone
så kommer uttaget att skapas i en privat temporär katalog som skapas medtempfile.mkstemp()
.Om lyssnarobjektet använder ett uttag skickas backlog (1 som standard) till
listen()
-metoden för uttaget när det har bundits.Om authkey anges och inte
None
, bör den vara en bytesträng och kommer att användas som hemlig nyckel för en HMAC-baserad autentiseringsutmaning. Ingen autentisering görs om authkey ärNone
.AuthenticationError
genereras om autentiseringen misslyckas. Se Autentiseringsnycklar.- accept()¶
Acceptera en anslutning på lyssnarobjektets bundna socket eller namngivna pipe och returnera ett
Connection
-objekt. Om autentiseringsförsök misslyckas genererasAuthenticationError
.
- close()¶
Stäng lyssnarobjektets bundna socket eller namngivna pipe. Detta anropas automatiskt när lyssnaren garbage collectas. Det är dock tillrådligt att anropa det explicit.
Listener-objekt har följande skrivskyddade egenskaper:
- address¶
Den adress som används av Listener-objektet.
- last_accepted¶
Den adress som den senast accepterade anslutningen kom från. Om den inte är tillgänglig är den
None
.
Ändrad i version 3.3: Lyssnarobjekt stöder nu kontexthanteringsprotokollet – se Typer av kontexthanterare.
__enter__()
returnerar lyssnarobjektet, och__exit__()
anroparclose()
.
- multiprocessing.connection.wait(object_list, timeout=None)¶
Vänta tills ett objekt i object_list är klart. Returnerar listan över de objekt i object_list som är klara. Om timeout är en float blockeras anropet i högst så många sekunder. Om timeout är
None
blockeras anropet under obegränsad tid. En negativ timeout är likvärdig med en timeout på noll.För både POSIX och Windows kan ett objekt visas i object_list om det är
ett läsbart
Connection
-objekt;ett anslutet och läsbart
socket.socket
-objekt; eller
En anslutning eller ett socketobjekt är klart när det finns data att läsa från det, eller när den andra änden har stängts.
POSIX:
wait(object_list, timeout)
nästan likvärdigselect.select(object_list, [], [], timeout)
. Skillnaden är att omselect.select()
avbryts av en signal, kan den ge upphov tillOSError
med ett felnummer påEINTR
, medanwait()
inte gör det.Windows: Ett objekt i object_list måste antingen vara ett heltalshandtag som är väntbart (enligt den definition som används i dokumentationen för Win32-funktionen
WaitForMultipleObjects()
) eller så kan det vara ett objekt med enfileno()
-metod som returnerar ett socket-handtag eller pipe-handtag. (Observera att pipe- och socket-handtag inte är väntande handtag)Tillagd i version 3.3.
Exempel
Följande serverkod skapar en lyssnare som använder 'hemligt lösenord'
som autentiseringsnyckel. Den väntar sedan på en anslutning och skickar några data till klienten:
from multiprocessing.connection import Listener
from array import array
address = ('localhost', 6000) # family is deduced to be 'AF_INET'
with Listener(address, authkey=b'secret password') as listener:
with listener.accept() as conn:
print('connection accepted from', listener.last_accepted)
conn.send([2.25, None, 'junk', float])
conn.send_bytes(b'hello')
conn.send_bytes(array('i', [42, 1729]))
Följande kod ansluter till servern och tar emot vissa data från servern:
from multiprocessing.connection import Client
from array import array
address = ('localhost', 6000)
with Client(address, authkey=b'secret password') as conn:
print(conn.recv()) # => [2.25, None, 'junk', float]
print(conn.recv_bytes()) # => 'hello'
arr = array('i', [0, 0, 0, 0, 0])
print(conn.recv_bytes_into(arr)) # => 8
print(arr) # => array('i', [42, 1729, 0, 0, 0])
Följande kod använder wait()
för att vänta på meddelanden från flera processer samtidigt:
från multiprocessing import Process, Pipe, current_process
från multiprocessing.connection import wait
def foo(w):
för i i intervall(10):
w.send((i, current_process().name))
w.close()
if __name__ == '__main__':
läsare = []
för i i intervall(4):
r, w = Pipe(duplex=False)
readers.append(r)
p = Process(mål=foo, args=(w,))
p.start()
# Vi stänger den skrivbara änden av röret nu för att vara säkra på att
# p är den enda process som äger ett handtag för den. Detta
# säkerställer att när p stänger sitt handtag för den skrivbara änden
# kommer wait() omedelbart att rapportera att den läsbara änden är klar.
w.close()
medan läsare:
for r in wait(readers):
försök:
msg = r.recv()
utom EOFError:
readers.remove(r)
else:
print(msg)
Adressformat¶
En
'AF_INET'
-adress är en tupel av formen(hostname, port)
där hostname är en sträng och port är ett heltal.En
'AF_UNIX'
-adress är en sträng som representerar ett filnamn i filsystemet.En
'AF_PIPE'
-adress är en sträng av formenr'\\.\pipe\PipeName'
. Om du vill användaClient()
för att ansluta till en namngiven pipe på en fjärrdator som heter ServerName bör du använda en adress av formenr'\\ServerName\pipe\PipeName'
istället.
Observera att alla strängar som börjar med två backslash antas som standard vara en 'AF_PIPE'
-adress i stället för en 'AF_UNIX'
-adress.
Autentiseringsnycklar¶
När man använder Connection.recv
, avplockas automatiskt de data som tas emot. Tyvärr är det en säkerhetsrisk att avpickla data från en icke betrodd källa. Därför använder Listener
och Client()
modulen hmac
för att tillhandahålla digest-autentisering.
En autentiseringsnyckel är en byte-sträng som kan betraktas som ett lösenord: när en anslutning har upprättats kommer båda ändarna att kräva bevis för att den andra känner till autentiseringsnyckeln. (Att visa att båda ändarna använder samma nyckel innebär inte att nyckeln skickas över anslutningen)
Om autentisering begärs men ingen autentiseringsnyckel anges används returvärdet för current_process().authkey
(se Process
). Detta värde kommer automatiskt att ärvas av alla Process
-objekt som den aktuella processen skapar. Detta innebär att (som standard) alla processer i ett multiprocessprogram kommer att dela en enda autentiseringsnyckel som kan användas när de upprättar anslutningar sinsemellan.
Lämpliga autentiseringsnycklar kan också genereras med hjälp av os.urandom()
.
Loggning¶
Visst stöd för loggning finns tillgängligt. Observera dock att paketet logging
inte använder processdelade lås så det är möjligt (beroende på typ av hanterare) att meddelanden från olika processer blandas ihop.
- multiprocessing.get_logger()¶
Returnerar den logger som används av
multiprocessing
. Om det behövs skapas en ny.När loggern skapas första gången har den nivån
logging.NOTSET
och ingen standardhanterare. Meddelanden som skickas till denna logger kommer som standard inte att spridas till rotloggaren.Observera att i Windows ärver barnprocesser endast nivån på den överordnade processens logger - alla andra anpassningar av loggern ärvs inte.
- multiprocessing.log_to_stderr(level=None)¶
Denna funktion utför ett anrop till
get_logger()
men förutom att returnera den logger som skapats av get_logger lägger den till en hanterare som skickar utdata tillsys.stderr
med formatet'[%(levelname)s/%(processName)s] %(message)s'
. Du kan ändralevelname
för loggern genom att skicka ettlevel
-argument.
Nedan visas ett exempel på en session med loggning aktiverad:
>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0
En fullständig tabell över loggningsnivåer finns i modulen logging
.
Modulen multiprocessing.dummy
¶
multiprocessing.dummy
replikerar API:t för multiprocessing
men är inte mer än ett omslag runt modulen threading
.
I synnerhet funktionen Pool
som tillhandahålls av multiprocessing.dummy
returnerar en instans av ThreadPool
, som är en underklass till Pool
som stöder alla samma metodanrop men använder en pool av arbetartrådar i stället för arbetsprocesser.
- class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])¶
Ett trådpoolsobjekt som styr en pool av arbetstrådar som jobb kan skickas till.
ThreadPool
-instanser är helt gränssnittskompatibla medPool
-instanser och deras resurser måste också hanteras korrekt, antingen genom att använda poolen som en kontexthanterare eller genom att anropaclose()
ochterminate()
manuellt.processes är antalet arbetstrådar som ska användas. Om processes är
None
används det antal som returneras avos.process_cpu_count()
.Om initializer inte är
None
kommer varje arbetsprocess att anropainitializer(*initargs)
när den startar.Till skillnad från
Pool
kan maxtasksperchild och context inte anges.Anteckning
En
ThreadPool
delar samma gränssnitt somPool
, som är utformad kring en pool av processer och föregår introduktionen avconcurrent.futures
-modulen. Som sådan ärver den vissa operationer som inte är meningsfulla för en pool som backas upp av trådar, och den har en egen typ för att representera statusen för asynkrona jobb,AsyncResult
, som inte förstås av några andra bibliotek.Användare bör i allmänhet föredra att använda
concurrent.futures.ThreadPoolExecutor
, som har ett enklare gränssnitt som utformades kring trådar från början och som returnerarconcurrent.futures.Future
-instanser som är kompatibla med många andra bibliotek, inklusiveasyncio
.
Riktlinjer för programmering¶
Det finns vissa riktlinjer och idiom som bör följas vid användning av multiprocessing
.
Alla startmetoder¶
Följande gäller för alla startmetoder.
Undvik delat tillstånd
Så långt det är möjligt bör man försöka undvika att flytta stora mängder data mellan processer.
Det är förmodligen bäst att hålla sig till att använda köer eller pipes för kommunikation mellan processer snarare än att använda synkroniseringsprimitiven på lägre nivå.
Inläggning
Säkerställ att argumenten till metoderna för proxies är picklbara.
Trådsäkerhet för proxyer
Använd inte ett proxyobjekt från mer än en tråd om du inte skyddar det med ett lås.
(Det är aldrig något problem att olika processer använder samma proxy)
Ansluta sig till zombieprocesser
På POSIX blir en process en zombie när den är klar men inte har blivit joinad. Det bör aldrig finnas särskilt många eftersom varje gång en ny process startar (eller
active_children()
anropas) kommer alla avslutade processer som ännu inte har förenats att förenas. Även anrop av en avslutad processProcess.is_alive
kommer att ansluta processen. Trots detta är det förmodligen god praxis att uttryckligen joina alla processer som du startar.
Bättre att ärva än att plocka/ta bort
När man använder startmetoderna spawn eller forkserver måste många typer från
multiprocessing
vara picklable så att barnprocesser kan använda dem. Man bör dock i allmänhet undvika att skicka delade objekt till andra processer med hjälp av pipes eller köer. Istället bör man arrangera programmet så att en process som behöver tillgång till en delad resurs som skapats någon annanstans kan ärva den från en förfadersprocess.
Undvik att avsluta processer
Om du använder metoden
Process.terminate
för att stoppa en process kan det leda till att delade resurser (t.ex. lås, semaforer, rör och köer) som för närvarande används av processen blir trasiga eller otillgängliga för andra processer.Därför är det förmodligen bäst att bara överväga att använda
Process.terminate
på processer som aldrig använder några delade resurser.
Ansluta sig till processer som använder köer
Tänk på att en process som har lagt objekt i en kö kommer att vänta innan den avslutas tills alla buffrade objekt har matats av ”feeder”-tråden till den underliggande pipen. (Barnprocessen kan anropa
Queue.cancel_join_thread
-metoden för kön för att undvika detta beteende)Detta innebär att när du använder en kö måste du se till att alla objekt som har lagts i kön så småningom kommer att tas bort innan processen ansluts. Annars kan du inte vara säker på att processer som har lagt objekt i kön kommer att avslutas. Kom också ihåg att icke-daemoniska processer kommer att anslutas automatiskt.
Ett exempel som kommer att låsa sig är följande:
från multiprocessing import Process, Kö def f(q): q.put('X' * 1000000) if __name__ == '__main__': queue = Queue() p = Process(mål=f, args=(kö,)) p.start() p.join() # detta leder till dödläge obj = kö.get()En lösning här skulle vara att byta ut de två sista raderna (eller helt enkelt ta bort raden
p.join()
)).
Explicit skicka resurser till underordnade processer
På POSIX med startmetoden fork kan en underordnad process använda en delad resurs som skapats i en överordnad process med hjälp av en global resurs. Det är dock bättre att skicka objektet som ett argument till konstruktören för barnprocessen.
Förutom att göra koden (potentiellt) kompatibel med Windows och de andra startmetoderna säkerställer detta också att så länge barnprocessen fortfarande är vid liv kommer objektet inte att samlas in i den överordnade processen. Detta kan vara viktigt om någon resurs frigörs när objektet garbage collectas i den överordnade processen.
Så till exempel
från multiprocessing import Process, Lås def f(): ... gör något med hjälp av "lock" ... if __name__ == '__main__': lock = Lock() för i i intervall(10): Process(mål=f).start()bör skrivas om till
från multiprocessing import Process, Lås def f(l): ... gör något med hjälp av "l" ... if __name__ == '__main__': lock = Lock() för i i intervall(10): Process(mål=f, args=(lock,)).start()
Se upp så att du inte ersätter sys.stdin
med ett ”filliknande objekt”
multiprocessing
ursprungligen ovillkorligen anropad:os.close(sys.stdin.fileno())i metoden
multiprocessing.Process._bootstrap()
— detta resulterade i problem med processer-i-processer. Detta har ändrats till:sys.stdin.close() sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)Detta löser det grundläggande problemet med processer som kolliderar med varandra och resulterar i ett fel med dålig filbeskrivare, men introducerar en potentiell fara för applikationer som ersätter
sys.stdin()
med ett ”filliknande objekt” med buffring av utdata. Denna fara är att om flera processer anroparclose()
på detta filliknande objekt, kan det resultera i att samma data spolas till objektet flera gånger, vilket resulterar i korruption.Om du skriver ett filliknande objekt och implementerar din egen cachelagring kan du göra den gaffelsäker genom att lagra pid när du lägger till något i cachen och kassera cachen när pid ändras. Till exempel:
@egenskap def cache(self): pid = os.getpid() if pid != self._pid: self._pid = pid self._cache = [] returnera self._cache
Startmetoderna spawn och forkserver¶
Det finns några extra begränsningar som inte gäller för startmetoden fork.
Mer betningsbarhet
Se till att alla argument till
Process
är betningsbara. Om du subklassarProcess.__init__
måste du också se till att instanser kommer att vara betningsbara när metodenProcess.start
anropas.
Globala variabler
Tänk på att om kod som körs i en underordnad process försöker komma åt en global variabel, så kanske det värde som visas (om det finns något) inte är detsamma som värdet i den överordnade processen vid den tidpunkt då
Process.start
anropades.Globala variabler som bara är konstanter på modulnivå orsakar dock inga problem.
Säker import av huvudmodul
Se till att huvudmodulen kan importeras på ett säkert sätt av en ny Python-tolk utan att orsaka oavsiktliga bieffekter (t.ex. att starta en ny process).
Om du till exempel använder startmetoden spawn eller forkserver och kör följande modul skulle det misslyckas med
RuntimeError
:från multiprocessing import Process def foo(): print('hallå') p = Process(mål=foo) p.start()Istället bör man skydda programmets ”ingångspunkt” genom att använda
if __name__ == '__main__':
på följande sätt:from multiprocessing import Process, freeze_support, set_start_method def foo(): print('hello') if __name__ == '__main__': freeze_support() set_start_method('spawn') p = Process(mål=foo) p.start()(Raden
freeze_support()
kan utelämnas om programmet ska köras normalt istället för att frysas)Detta gör att den nystartade Python-tolken kan importera modulen på ett säkert sätt och sedan köra modulens funktion
foo()
.Liknande begränsningar gäller om en pool eller manager skapas i huvudmodulen.
Exempel¶
Demonstration av hur man skapar och använder anpassade hanterare och fullmakter:
from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator
##
class Foo:
def f(self):
print('you called Foo.f()')
def g(self):
print('you called Foo.g()')
def _h(self):
print('you called Foo._h()')
# A simple generator function
def baz():
for i in range(10):
yield i*i
# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
_exposed_ = ['__next__']
def __iter__(self):
return self
def __next__(self):
return self._callmethod('__next__')
# Function to return the operator module
def get_operator_module():
return operator
##
class MyManager(BaseManager):
pass
# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)
# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))
# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)
# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)
##
def test():
manager = MyManager()
manager.start()
print('-' * 20)
f1 = manager.Foo1()
f1.f()
f1.g()
assert not hasattr(f1, '_h')
assert sorted(f1._exposed_) == sorted(['f', 'g'])
print('-' * 20)
f2 = manager.Foo2()
f2.g()
f2._h()
assert not hasattr(f2, 'f')
assert sorted(f2._exposed_) == sorted(['g', '_h'])
print('-' * 20)
it = manager.baz()
for i in it:
print('<%d>' % i, end=' ')
print()
print('-' * 20)
op = manager.operator()
print('op.add(23, 45) =', op.add(23, 45))
print('op.pow(2, 94) =', op.pow(2, 94))
print('op._exposed_ =', op._exposed_)
##
if __name__ == '__main__':
freeze_support()
test()
Använder Pool
:
import multiprocessing
import time
import random
import sys
#
# Functions used by test code
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % (
multiprocessing.current_process().name,
func.__name__, args, result
)
def calculatestar(args):
return calculate(*args)
def mul(a, b):
time.sleep(0.5 * random.random())
return a * b
def plus(a, b):
time.sleep(0.5 * random.random())
return a + b
def f(x):
return 1.0 / (x - 5.0)
def pow3(x):
return x ** 3
def noop(x):
pass
#
# Test code
#
def test():
PROCESSES = 4
print('Creating pool with %d processes\n' % PROCESSES)
with multiprocessing.Pool(PROCESSES) as pool:
#
# Tests
#
TASKS = [(mul, (i, 7)) for i in range(10)] + \
[(plus, (i, 8)) for i in range(10)]
results = [pool.apply_async(calculate, t) for t in TASKS]
imap_it = pool.imap(calculatestar, TASKS)
imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)
print('Ordered results using pool.apply_async():')
for r in results:
print('\t', r.get())
print()
print('Ordered results using pool.imap():')
for x in imap_it:
print('\t', x)
print()
print('Unordered results using pool.imap_unordered():')
for x in imap_unordered_it:
print('\t', x)
print()
print('Ordered results using pool.map() --- will block till complete:')
for x in pool.map(calculatestar, TASKS):
print('\t', x)
print()
#
# Test error handling
#
print('Testing error handling:')
try:
print(pool.apply(f, (5,)))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.apply()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(pool.map(f, list(range(10))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from pool.map()')
else:
raise AssertionError('expected ZeroDivisionError')
try:
print(list(pool.imap(f, list(range(10)))))
except ZeroDivisionError:
print('\tGot ZeroDivisionError as expected from list(pool.imap())')
else:
raise AssertionError('expected ZeroDivisionError')
it = pool.imap(f, list(range(10)))
for i in range(10):
try:
x = next(it)
except ZeroDivisionError:
if i == 5:
pass
except StopIteration:
break
else:
if i == 5:
raise AssertionError('expected ZeroDivisionError')
assert i == 9
print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
print()
#
# Testing timeouts
#
print('Testing ApplyResult.get() with timeout:', end=' ')
res = pool.apply_async(calculate, TASKS[0])
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % res.get(0.02))
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
print('Testing IMapIterator.next() with timeout:', end=' ')
it = pool.imap(calculatestar, TASKS)
while 1:
sys.stdout.flush()
try:
sys.stdout.write('\n\t%s' % it.next(0.02))
except StopIteration:
break
except multiprocessing.TimeoutError:
sys.stdout.write('.')
print()
print()
if __name__ == '__main__':
multiprocessing.freeze_support()
test()
Ett exempel som visar hur man använder köer för att mata uppgifter till en samling arbetsprocesser och samla in resultaten:
import time
import random
from multiprocessing import Process, Queue, current_process, freeze_support
#
# Function run by worker processes
#
def worker(input, output):
for func, args in iter(input.get, 'STOP'):
result = calculate(func, args)
output.put(result)
#
# Function used to calculate result
#
def calculate(func, args):
result = func(*args)
return '%s says that %s%s = %s' % \
(current_process().name, func.__name__, args, result)
#
# Functions referenced by tasks
#
def mul(a, b):
time.sleep(0.5*random.random())
return a * b
def plus(a, b):
time.sleep(0.5*random.random())
return a + b
#
#
#
def test():
NUMBER_OF_PROCESSES = 4
TASKS1 = [(mul, (i, 7)) for i in range(20)]
TASKS2 = [(plus, (i, 8)) for i in range(10)]
# Create queues
task_queue = Queue()
done_queue = Queue()
# Submit tasks
for task in TASKS1:
task_queue.put(task)
# Start worker processes
for i in range(NUMBER_OF_PROCESSES):
Process(target=worker, args=(task_queue, done_queue)).start()
# Get and print results
print('Unordered results:')
for i in range(len(TASKS1)):
print('\t', done_queue.get())
# Add more tasks using `put()`
for task in TASKS2:
task_queue.put(task)
# Get and print some more results
for i in range(len(TASKS2)):
print('\t', done_queue.get())
# Tell child processes to stop
for i in range(NUMBER_OF_PROCESSES):
task_queue.put('STOP')
if __name__ == '__main__':
freeze_support()
test()