multiprocessing — Processbaserad parallellism

Källkod: Lib/multiprocessing/


Tillgänglighet: not Android, not iOS, not WASI.

Denna modul stöds inte på mobile platforms eller WebAssembly platforms.

Introduktion

multiprocessing är ett paket med stöd för att skapa processer med hjälp av ett API som liknar modulen threading. Paketet multiprocessing erbjuder både lokal och fjärransluten samtidighet, vilket effektivt kringgår Global Interpreter Lock genom att använda underprocesser istället för trådar. På grund av detta tillåter multiprocessing-modulen programmeraren att fullt ut utnyttja flera processorer på en given maskin. Den körs på både POSIX och Windows.

Modulen multiprocessing introducerar också API:er som inte har någon motsvarighet i modulen threading. Ett utmärkt exempel på detta är Pool-objektet som erbjuder ett bekvämt sätt att parallellisera exekveringen av en funktion över flera indatavärden genom att distribuera indata över processer (dataparallellism). Följande exempel visar hur det är vanligt att definiera sådana funktioner i en modul så att underordnade processer kan importera den modulen. Detta grundläggande exempel på dataparallellism använder Pool,

från multiprocessing import Pool

def f(x):
    returnerar x*x

om __name__ == '__main__':
    med Pool(5) som p:
        print(p.map(f, [1, 2, 3]))

kommer att skrivas ut till standardutmatningen

[1, 4, 9]

Se även

concurrent.futures.ProcessPoolExecutor erbjuder ett gränssnitt på högre nivå för att flytta uppgifter till en bakgrundsprocess utan att blockera körningen av den anropande processen. Jämfört med att använda gränssnittet Pool direkt, gör API:t concurrent.futures det lättare att separera inlämning av arbete till den underliggande processpoolen från väntan på resultat.

Klassen Process

I multiprocessing startas processer genom att skapa ett Process-objekt och sedan anropa dess start()-metod. Process följer API:et för threading.Thread. Ett trivialt exempel på ett multiprocessprogram är

från multiprocessing import Process

def f(namn):
    print('hallå', namn)

if __name__ == '__main__':
    p = Process(mål=f, args=('bob',))
    p.start()
    p.join()

För att visa de enskilda process-ID:n som är inblandade följer här ett utökat exempel:

from multiprocessing import Process
import os

def info(title):
    print(title)
    print('module name:', __name__)
    print('parent process:', os.getppid())
    print('process id:', os.getpid())

def f(name):
    info('function f')
    print('hello', name)

if __name__ == '__main__':
    info('main line')
    p = Process(target=f, args=('bob',))
    p.start()
    p.join()

För en förklaring till varför if __name__ == '__main__'-delen är nödvändig, se Riktlinjer för programmering.

Argumenten till Process måste vanligtvis vara omöjliga att plocka upp från barnprocessen. Om du försökte skriva in ovanstående exempel direkt i en REPL skulle det kunna leda till ett AttributeError i barnprocessen som försöker hitta funktionen f i modulen __main__.

Sammanhang och startmetoder

Beroende på plattform har multiprocessing stöd för tre sätt att starta en process. Dessa startmetoder är

spawn

Den överordnade processen startar en ny Python-tolkprocess. Barnprocessen kommer endast att ärva de resurser som krävs för att köra processobjektets run()-metod. I synnerhet kommer onödiga filbeskrivare och handtag från den överordnade processen inte att ärvas. Att starta en process med den här metoden är ganska långsamt jämfört med att använda fork eller forkserver.

Finns på POSIX- och Windows-plattformar. Standard på Windows och macOS.

gaffel

Den överordnade processen använder os.fork() för att förgrena Python-tolken. När barnprocessen startar är den i praktiken identisk med den överordnade processen. Alla resurser från den överordnade processen ärvs av barnprocessen. Observera att det är problematiskt att på ett säkert sätt forka en flertrådad process.

Tillgänglig på POSIX-system.

Ändrad i version 3.14: Detta är inte längre standardstartmetoden på någon plattform. Kod som kräver fork måste uttryckligen ange det via get_context() eller set_start_method().

Ändrad i version 3.12: Om Python kan upptäcka att din process har flera trådar, kommer funktionen os.fork() som denna startmetod anropar internt att ge upphov till en DeprecationWarning. Använd en annan startmetod. Se dokumentationen för os.fork() för ytterligare förklaringar.

forkserver

När programmet startas och startmetoden forkserver väljs, skapas en serverprocess. Från och med då, när en ny process behövs, ansluter den överordnade processen till servern och begär att den forkar en ny process. Fork-serverprocessen är enkeltrådad om inte systembibliotek eller förinstallerade importer skapar trådar som en bieffekt, så det är i allmänhet säkert för den att använda os.fork(). Inga onödiga resurser ärvs.

Finns på POSIX-plattformar som stöder överföring av filbeskrivare via Unix pipes, t.ex. Linux. Standard på dessa.

Ändrad i version 3.14: Detta blev standardstartmetoden på POSIX-plattformar.

Ändrad i version 3.4: spawn har lagts till på alla POSIX-plattformar och forkserver har lagts till på vissa POSIX-plattformar. Barnprocesser ärver inte längre alla föräldrarnas ärftliga handtag i Windows.

Ändrad i version 3.8: På macOS är startmetoden spawn nu standard. Startmetoden fork bör betraktas som osäker eftersom den kan leda till att underprocessen kraschar eftersom systembiblioteken i macOS kan starta trådar. Se bpo-33725.

Ändrad i version 3.14: På POSIX-plattformar ändrades standardstartmetoden från fork till forkserver för att bibehålla prestandan men undvika vanliga inkompatibiliteter med flertrådade processer. Se gh-84559.

På POSIX startar startmetoderna spawn eller forkserver även en resource tracker-process som spårar de olänkade namngivna systemresurser (t.ex. namngivna semaforer eller SharedMemory-objekt) som skapas av processer i programmet. När alla processer har avslutats kopplar resursspåraren bort eventuella kvarvarande spårningsobjekt. Vanligtvis bör det inte finnas några, men om en process dödades av en signal kan det finnas några ”läckta” resurser. (Varken läckta semaforer eller delade minnessegment kommer automatiskt att avlänkas förrän vid nästa omstart. Detta är problematiskt för båda objekten eftersom systemet endast tillåter ett begränsat antal namngivna semaforer, och delade minnessegment upptar en del utrymme i huvudminnet)

För att välja en startmetod använder du set_start_method() i if __name__ == '__main__' i huvudmodulens klausul. Till exempel:

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    mp.set_start_method('spawn')
    q = mp.Queue()
    p = mp.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

set_start_method() bör inte användas mer än en gång i programmet.

Alternativt kan du använda get_context() för att hämta ett kontextobjekt. Context-objekt har samma API som multiprocessing-modulen och gör att man kan använda flera startmetoder i samma program.

import multiprocessing as mp

def foo(q):
    q.put('hello')

if __name__ == '__main__':
    ctx = mp.get_context('spawn')
    q = ctx.Queue()
    p = ctx.Process(target=foo, args=(q,))
    p.start()
    print(q.get())
    p.join()

Observera att objekt som är relaterade till en kontext kanske inte är kompatibla med processer för en annan kontext. I synnerhet kan lås som skapats med kontexten fork inte skickas till processer som startats med startmetoderna spawn eller forkserver.

Bibliotek som använder multiprocessing eller ProcessPoolExecutor bör utformas så att de tillåter sina användare att tillhandahålla sin egen multiprocessing-kontext. Om du använder en egen specifik kontext i ett bibliotek kan det leda till inkompatibilitet med resten av biblioteksanvändarens applikation. Dokumentera alltid om ditt bibliotek kräver en specifik startmetod.

Varning

Startmetoderna 'spawn' och 'forkserver' kan i allmänhet inte användas med ”frysta” körbara filer (dvs. binära filer som produceras av paket som PyInstaller och cx_Freeze) på POSIX-system. Startmetoden 'fork' kan fungera om koden inte använder trådar.

Utbyte av objekt mellan processer

multiprocessing stöder två typer av kommunikationskanaler mellan processer:

Köer

Klassen Queue är en nära klon av queue.Queue. Till exempel:

från multiprocessing import Process, 

def f(q):
    q.put([42, None, 'hallå'])

if __name__ == '__main__':
    q = ()
    p = Process(mål=f, args=(q,))
    p.start()
    print(q.get()) # skriver ut "[42, None, 'hello']"
    p.join()

Köer är tråd- och processäkra. Alla objekt som sätts in i en multiprocessing-kö kommer att serialiseras.

Pipor

Funktionen Pipe() returnerar ett par anslutningsobjekt som är anslutna med en pipe som standard är duplex (tvåvägs). Till exempel:

från multiprocessing import Process, Rör

def f(conn):
    conn.send([42, None, 'hallå'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(mål=f, args=(barn_conn,))
    p.start()
    print(parent_conn.recv()) # skriver ut "[42, None, 'hello']"
    p.join()

De två anslutningsobjekt som returneras av Pipe() representerar rörets två ändar. Varje anslutningsobjekt har metoderna send() och recv() (bland andra). Observera att data i en pipe kan bli korrumperade om två processer (eller trådar) försöker läsa från eller skriva till samma ände av pipen samtidigt. Naturligtvis finns det ingen risk för korruption från processer som använder olika ändar av pipen samtidigt.

Metoden send() serialiserar objektet och recv() återskapar objektet.

Synkronisering mellan processer

multiprocessing innehåller motsvarigheter till alla synkroniseringsprimitiver från threading. Till exempel kan man använda ett lås för att säkerställa att endast en process skriver ut till standardutdata åt gången:

från multiprocessing import Process, Lås

def f(l, i):
    l.acquire()
    försök:
        print('hello world', i)
    slutligen:
        l.release()

if __name__ == '__main__':
    lock = Lock()

    för num i intervall(10):
        Process(mål=f, args=(lock, num)).start()

Om man inte använder låset kan utdata från de olika processerna blandas ihop.

Delning av tillstånd mellan processer

Som nämnts ovan är det vid samtidig programmering oftast bäst att undvika att använda delat tillstånd så långt det är möjligt. Detta gäller särskilt när man använder flera processer.

Men om du verkligen behöver använda delade data finns det ett par sätt att göra det på multiprocessing.

Gemensamt minne

Data kan lagras i en delad minneskarta med hjälp av Value eller Array. Till exempel följande kod

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

kommer att skriva ut

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Argumenten 'd' och 'i' som används när num och arr skapas är typkoder av den typ som används av modulen array: 'd' anger en flottör med dubbel precision och 'i' anger ett signerat heltal. Dessa delade objekt kommer att vara process- och trådsäkra.

För mer flexibilitet vid användning av delat minne kan man använda modulen multiprocessing.sharedctypes som stöder skapandet av godtyckliga ctypes-objekt som allokeras från delat minne.

Serverprocess

Ett manager-objekt som returneras av Manager() styr en serverprocess som håller Python-objekt och tillåter andra processer att manipulera dem med hjälp av proxies.

En manager som returneras av Manager() kommer att stödja typerna list, dict, set, Namespace, Lock, RLock, Semaphore, BoundedSemaphore, Condition, Event, Barrier, Queue, Value och Array. Till exempel

från multiprocessing import Process, Manager

def f(d, l, s):
    d[1] = '1'
    d['2'] = 2
    d[0,25] = Ingen
    l.reverse()
    s.add('a')
    s.add('b')

if __name__ == '__main__':
    med Manager() som manager:
        d = manager.dict()
        l = manager.list(intervall(10))
        s = manager.set()

        p = Process(mål=f, args=(d, l, s))
        p.start()
        p.join()

        print(d)
        print(l)
        skriv ut(s)

kommer att skriva ut

{0,25: Ingen, 1: '1', '2': 2}
[9, 8, 7, 6, 5, 4, 3, 2, 1, 0]
{'a', 'b'}

Serverprocesshanterare är mer flexibla än delade minnesobjekt eftersom de kan göras så att de stöder godtyckliga objekttyper. Dessutom kan en enda hanterare delas av processer på olika datorer över ett nätverk. De är dock långsammare än om man använder delat minne.

Använda en pool av arbetstagare

Klassen Pool representerar en pool av arbetsprocesser. Den har metoder som gör det möjligt att avlasta uppgifter till arbetsprocesserna på några olika sätt.

Till exempel:

from multiprocessing import Pool, TimeoutError
import time
import os

def f(x):
    return x*x

if __name__ == '__main__':
    # start 4 worker processes
    with Pool(processes=4) as pool:

        # print "[0, 1, 4,..., 81]"
        print(pool.map(f, range(10)))

        # print same numbers in arbitrary order
        for i in pool.imap_unordered(f, range(10)):
            print(i)

        # evaluate "f(20)" asynchronously
        res = pool.apply_async(f, (20,))      # runs in *only* one process
        print(res.get(timeout=1))             # prints "400"

        # evaluate "os.getpid()" asynchronously
        res = pool.apply_async(os.getpid, ()) # runs in *only* one process
        print(res.get(timeout=1))             # prints the PID of that process

        # launching multiple evaluations asynchronously *may* use more processes
        multiple_results = [pool.apply_async(os.getpid, ()) for i in range(4)]
        print([res.get(timeout=1) for res in multiple_results])

        # make a single worker sleep for 10 seconds
        res = pool.apply_async(time.sleep, (10,))
        try:
            print(res.get(timeout=1))
        except TimeoutError:
            print("We lacked patience and got a multiprocessing.TimeoutError")

        print("For the moment, the pool remains available for more work")

    # exiting the 'with'-block has stopped the pool
    print("Now the pool is closed and no longer available")

Observera att metoderna i en pool endast ska användas av den process som skapade den.

Anteckning

Funktionalitet inom detta paket kräver att modulen __main__ kan importeras av underordnade program. Detta behandlas i Riktlinjer för programmering men det är värt att påpeka här. Detta innebär att vissa exempel, såsom multiprocessing.pool.Pool exemplen inte kommer att fungera i den interaktiva tolken. Till exempel:

>>> from multiprocessing import Pool
>>> p = Pool(5)
>>> def f(x):
...     return x*x
...
>>> with p:
...     p.map(f, [1,2,3])
Process PoolWorker-1:
Process PoolWorker-2:
Process PoolWorker-3:
Traceback (most recent call last):
Traceback (most recent call last):
Traceback (most recent call last):
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>
AttributeError: Can't get attribute 'f' on <module '__main__' (<class '_frozen_importlib.BuiltinImporter'>)>

(Om du försöker detta kommer det faktiskt att matas ut tre fullständiga spårningar som är sammanflätade på ett halvt slumpmässigt sätt, och då kanske du måste stoppa den överordnade processen på något sätt)

Referens

Paketet multiprocessing replikerar till största delen API:et för modulen threading.

Process och undantag

class multiprocessing.Process(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)

Processobjekt representerar aktivitet som körs i en separat process. Klassen Process har motsvarigheter till alla metoder i threading.Thread.

Konstruktorn ska alltid anropas med nyckelordsargument. group bör alltid vara None; den existerar enbart för kompatibilitet med threading.Thread. target är det anropsbara objekt som skall anropas av run()-metoden. Standardvärdet är None, vilket betyder att ingenting anropas. name är processnamnet (se name för mer information). args är argumenttupeln för målinkallningen. kwargs är en ordbok med nyckelordsargument för målanropet. Om det tillhandahålls sätter det enda nyckelordsargumentet daemon processens flagga daemon till True eller False. Om None (standard), kommer denna flagga att ärvas från den skapande processen.

Som standard skickas inga argument till target. Argumentet args, som som standard är (), kan användas för att ange en lista eller tupel av de argument som ska skickas till target.

Om en subklass åsidosätter konstruktorn måste den se till att den anropar basklassens konstruktor (super().__init__()) innan den gör något annat med processen.

Anteckning

I allmänhet måste alla argument till Process vara plockbara. Detta observeras ofta när man försöker skapa en Process eller använda en concurrent.futures.ProcessPoolExecutor från en REPL med en lokalt definierad target-funktion.

Att skicka ett anropsbart objekt som definierats i den aktuella REPL-sessionen gör att barnprocessen dör via ett icke fångat AttributeError-undantag när den startas eftersom target måste ha definierats i en importerbar modul för att kunna laddas under unpickling.

Exempel på detta oavhjälpliga fel från barnet:

>>> import multiprocessing as mp
>>> def knigit():
...     print("Ni!")
...
>>> process = mp.Process(target=knigit)
>>> process.start()
>>> Traceback (most recent call last):
  File ".../multiprocessing/spawn.py", line ..., in spawn_main
  File ".../multiprocessing/spawn.py", line ..., in _main
AttributeError: module '__main__' has no attribute 'knigit'
>>> process
<SpawnProcess name='SpawnProcess-1' pid=379473 parent=378707 stopped exitcode=1>

Se Startmetoderna spawn och forkserver. Även om denna begränsning inte är sann om man använder startmetoden "fork", så är den inte längre standard på någon plattform från och med Python 3.14. Se Sammanhang och startmetoder. Se även gh-132898.

Ändrad i version 3.3: Parametern daemon har lagts till.

run()

Metod som representerar processens aktivitet.

Du kan åsidosätta denna metod i en subklass. Standardmetoden run() anropar det anropsbara objekt som skickats till objektets konstruktör som målargument, om något, med sekventiella argument och nyckelordsargument som hämtas från argumenten args respektive kwargs.

Samma effekt uppnås genom att använda en lista eller tupel som args-argument i Process.

Exempel:

>>> from multiprocessing import Process
>>> p = Process(target=print, args=[1])
>>> p.run()
1
>>> p = Process(target=print, args=(1,))
>>> p.run()
1
start()

Starta processens aktivitet.

Detta måste anropas högst en gång per processobjekt. Det ordnar så att objektets run()-metod anropas i en separat process.

join([timeout])

Om det valfria argumentet timeout är None (standard) blockeras metoden tills den process vars join()-metod anropas avslutas. Om timeout är ett positivt tal blockeras den i högst timeout sekunder. Observera att metoden returnerar None om dess process avslutas eller om metoden tidsbegränsas. Kontrollera processens exitcode för att avgöra om den avslutades.

En process kan anslutas flera gånger.

En process kan inte ansluta sig till sig själv eftersom detta skulle orsaka ett dödläge. Det är ett fel att försöka ansluta sig till en process innan den har startats.

name

Processens namn. Namnet är en sträng som endast används i identifieringssyfte. Det har ingen semantik. Flera processer kan ges samma namn.

Det initiala namnet sätts av konstruktören. Om inget explicit namn anges till konstruktören, konstrueras ett namn av formen ”Process-N1:N2:…:Nk”, där varje Nk är det N:te barnet till sin förälder.

is_alive()

Returnerar om processen är vid liv.

Ett processobjekt är i princip vid liv från det ögonblick då metoden start() returneras till dess att barnprocessen avslutas.

daemon

Processens daemonflagga, ett booleanskt värde. Detta måste anges innan start() anropas.

Det initiala värdet ärvs från skapandeprocessen.

När en process avslutas försöker den avsluta alla sina daemoniska underordnade processer.

Observera att en daemonisk process inte får skapa underordnade processer. I annat fall skulle en daemonisk process lämna sina barn föräldralösa om den avslutas när dess föräldraprocess avslutas. Dessutom är detta inte Unix daemons eller tjänster, det är normala processer som kommer att avslutas (och inte anslutas) om icke-daemoniska processer har avslutats.

Förutom API:et threading.Thread stöder Process-objekten även följande attribut och metoder:

pid

Returnerar processens ID. Innan processen startas kommer detta att vara None.

exitcode

Barnets utgångskod. Denna kommer att vara None om processen ännu inte har avslutats.

Om barnets run()-metod returnerades normalt blir utgångskoden 0. Om den avslutades via sys.exit() med ett heltalsargument N blir utgångskoden N.

Om barnet avslutades på grund av ett undantag som inte fångades upp inom run(), kommer utgångskoden att vara 1. Om det avslutades av signalen N kommer utgångskoden att vara det negativa värdet -N.

authkey

Processens autentiseringsnyckel (en byte-sträng).

När multiprocessing initieras tilldelas huvudprocessen en slumpmässig sträng med hjälp av os.urandom().

När ett Process-objekt skapas kommer det att ärva autentiseringsnyckeln för sin överordnade process, även om detta kan ändras genom att ange authkey till en annan byte-sträng.

Se Autentiseringsnycklar.

sentinel

Ett numeriskt handtag för ett systemobjekt som kommer att bli ”klart” när processen avslutas.

Du kan använda det här värdet om du vill vänta på flera händelser samtidigt med hjälp av multiprocessing.connection.wait(). Annars är det enklare att anropa join().

I Windows är detta ett OS-handtag som kan användas med API-anropen WaitForSingleObject och WaitForMultipleObjects. På POSIX är detta en filbeskrivare som kan användas med primitiver från modulen select.

Tillagd i version 3.3.

interrupt()

Avslutar processen. Fungerar på POSIX med hjälp av signalen SIGINT. Beteendet på Windows är odefinierat.

Som standard avslutas den underordnade processen genom att KeyboardInterrupt aktiveras. Detta beteende kan ändras genom att ställa in respektive signalhanterare i barnprocessen signal.signal() för SIGINT.

Obs: om barnprocessen fångar upp och kastar KeyboardInterrupt, kommer processen inte att avslutas.

Notera: standardbeteendet kommer också att sätta exitcode till 1 som om ett undantag utan åtgärd har uppstått i barnprocessen. För att ha en annan exitcode kan du helt enkelt fånga KeyboardInterrupt och anropa exit(your_code).

Tillagd i version 3.14.

terminate()

Avsluta processen. På POSIX görs detta med hjälp av signalen SIGTERM; på Windows används TerminateProcess(). Observera att exit-handlers och finally-klausuler etc. inte kommer att exekveras.

Observera att processer som härstammar från processen inte kommer att avslutas - de kommer helt enkelt att bli föräldralösa.

Varning

Om denna metod används när den associerade processen använder en pipe eller en kö, kan pipen eller kön bli skadad och bli oanvändbar för andra processer. På samma sätt, om processen har förvärvat ett lås eller en semafor etc., kan avslutningen av den leda till att andra processer fastnar.

kill()

Samma som terminate() men använder signalen SIGKILL på POSIX.

Tillagd i version 3.7.

close()

Stänger Process-objektet och frigör alla resurser som är associerade med det. ValueError visas om den underliggande processen fortfarande körs. När close() har returnerats framgångsrikt kommer de flesta andra metoder och attribut för Process-objektet att ge upphov till ValueError.

Tillagd i version 3.7.

Observera att metoderna start(), join(), is_alive(), terminate() och exitcode endast ska anropas av den process som skapade processobjektet.

Exempel på användning av några av metoderna i Process:

>>> import multiprocessing, time, signal
>>> mp_context = multiprocessing.get_context('spawn')
>>> p = mp_context.Process(target=time.sleep, args=(1000,))
>>> print(p, p.is_alive())
<...Process ... initial> False
>>> p.start()
>>> print(p, p.is_alive())
<...Process ... started> True
>>> p.terminate()
>>> time.sleep(0.1)
>>> print(p, p.is_alive())
<...Process ... stopped exitcode=-SIGTERM> False
>>> p.exitcode == -signal.SIGTERM
True
exception multiprocessing.ProcessError

Basklassen för alla multiprocessing-undantag.

exception multiprocessing.BufferTooShort

Exception orsakad av Connection.recv_bytes_into() när det medföljande buffertobjektet är för litet för det lästa meddelandet.

Om e är en instans av BufferTooShort så kommer e.args[0] att ge meddelandet som en byte-sträng.

exception multiprocessing.AuthenticationError

Uppstår när det finns ett autentiseringsfel.

exception multiprocessing.TimeoutError

Utlöses av metoder med en timeout när timeouten löper ut.

Pipes och köer

När man använder flera processer använder man i allmänhet meddelandepassning för kommunikation mellan processerna och undviker att behöva använda synkroniseringsprimitiver som lås.

För att skicka meddelanden kan man använda Pipe() (för en anslutning mellan två processer) eller en kö (som tillåter flera producenter och konsumenter).

Typerna Queue, SimpleQueue och JoinableQueue är köer med flera producenter och flera konsumenter FIFO som är modellerade efter klassen queue.Queue i standardbiblioteket. De skiljer sig åt genom att Queue saknar metoderna task_done() och join() som introducerades i Python 2.5:s klass queue.Queue.

Om du använder JoinableQueuemåste du anropa JoinableQueue.task_done() för varje uppgift som tas bort från kön, annars kan semaforen som används för att räkna antalet oavslutade uppgifter så småningom svämma över och orsaka ett undantag.

En skillnad från andra köimplementationer i Python är att multiprocessing-köer serialiserar alla objekt som läggs in i dem med pickle. Objektet som returneras av get-metoden är ett återskapat objekt som inte delar minne med det ursprungliga objektet.

Observera att man också kan skapa en delad kö genom att använda ett manager-objekt – se Ansvariga.

Anteckning

multiprocessing använder de vanliga undantagen queue.Empty och queue.Full för att signalera en timeout. De är inte tillgängliga i namnrymden multiprocessing så du måste importera dem från queue.

Anteckning

När ett objekt sätts i en kö picklas objektet och en bakgrundstråd spolar senare ut den picklade datan till en underliggande pipe. Detta har några konsekvenser som är lite överraskande, men bör inte orsaka några praktiska problem - om de verkligen stör dig kan du istället använda en kö som skapats med en manager.

  1. Efter att ha lagt ett objekt på en tom kö kan det bli en oändligt liten fördröjning innan köns metod empty() returnerar False och get_nowait() kan returnera utan att ge upphov till queue.Empty.

  2. Om flera processer ställer objekt i kö är det möjligt att objekten tas emot i den andra änden i fel ordning. Objekt som köas av samma process kommer dock alltid att vara i den förväntade ordningen i förhållande till varandra.

Varning

Om en process dödas med Process.terminate() eller os.kill() medan den försöker använda en Queue, är det troligt att data i kön blir skadade. Detta kan leda till att en annan process får ett undantag när den försöker använda kön senare.

Varning

Som nämnts ovan, om en underordnad process har lagt objekt i en kö (och inte har använt JoinableQueue.cancel_join_thread), kommer den processen inte att avslutas förrän alla buffrade objekt har spolats till pipen.

Det innebär att om du försöker ansluta dig till den processen kan du få ett dödläge om du inte är säker på att alla objekt som har ställts i kö har förbrukats. På samma sätt, om barnprocessen är icke-daemonisk, kan föräldraprocessen hänga sig vid utgången när den försöker ansluta sig till alla sina icke-daemoniska barn.

Observera att en kö som skapats med hjälp av en manager inte har detta problem. Se Riktlinjer för programmering.

För ett exempel på användning av köer för kommunikation mellan processer, se Exempel.

multiprocessing.Pipe([duplex])

Returnerar ett par (conn1, conn2) av Connection-objekt som representerar ändarna av ett rör.

Om duplex är True (standard) är röret dubbelriktat. Om duplex är False är pipen enkelriktad: conn1 kan bara användas för att ta emot meddelanden och conn2 kan bara användas för att skicka meddelanden.

Metoden send() serialiserar objektet med hjälp av pickle och metoden recv() återskapar objektet.

class multiprocessing.Queue([maxsize])

Returnerar en processdelad kö som implementerats med hjälp av en pipe och några lås/semaphores. När en process först lägger ett objekt i kön startas en matartråd som överför objekt från en buffert till pipen.

De vanliga undantagen queue.Empty och queue.Full från standardbibliotekets modul queue används för att signalera tidsavbrott.

Queue implementerar alla metoder i queue.Queue utom task_done() och join().

qsize()

Returnerar den ungefärliga storleken på kön. På grund av semantiken för multithreading/multiprocessing är detta tal inte tillförlitligt.

Observera att detta kan ge upphov till NotImplementedError på plattformar som macOS där sem_getvalue() inte är implementerad.

empty()

Returnerar True om kön är tom, annars False. På grund av semantiken för multithreading/multiprocessing är detta inte tillförlitligt.

Kan ge upphov till ett OSError på stängda köer. (inte garanterat)

full()

Returnerar True om kön är full, annars False. På grund av semantiken för multithreading/multiprocessing är detta inte tillförlitligt.

put(obj[, block[, timeout]])

Placerar obj i kön. Om det valfria argumentet block är True (standard) och timeout är None (standard), blockeras vid behov tills en ledig plats finns tillgänglig. Om timeout är ett positivt tal blockerar den i högst timeout sekunder och ger upphov till queue.Full-undantaget om ingen ledig plats fanns tillgänglig inom den tiden. I annat fall (block är False), lägg till ett objekt i kön om en ledig plats omedelbart finns tillgänglig, annars utlöses undantaget queue.Full (timeout ignoreras i det fallet).

Ändrad i version 3.8: Om kön är stängd kommer ValueError att uppstå istället för AssertionError.

put_nowait(obj)

Motsvarar put(obj, False).

get([block[, timeout]])

Tar bort och returnerar ett objekt från kön. Om de valfria argen block är True (standard) och timeout är None (standard), blockeras vid behov tills ett objekt är tillgängligt. Om timeout är ett positivt tal blockerar den högst timeout sekunder och ger upphov till queue.Empty-undantaget om inget objekt var tillgängligt inom den tiden. I annat fall (block är False) returneras ett objekt om det finns ett omedelbart tillgängligt, annars uppstår undantaget queue.Empty (timeout ignoreras i det fallet).

Ändrad i version 3.8: Om kön är stängd, genereras ValueError istället för OSError.

get_nowait()

Motsvarar get(False).

multiprocessing.Queue har några ytterligare metoder som inte finns i queue.Queue. Dessa metoder är vanligtvis onödiga för de flesta koder:

close()

Stäng kön: frigör interna resurser.

En kö får inte längre användas efter att den har stängts. Exempelvis får metoderna get(), put() och empty() inte längre anropas.

Bakgrundstråden kommer att avslutas när den har spolat all buffrad data till pipen. Detta anropas automatiskt när kön samlas in (garbage collected).

join_thread()

Gå med i bakgrundstråden. Detta kan endast användas efter att close() har anropats. Den blockerar tills bakgrundstråden avslutas och säkerställer att all data i bufferten har spolats till röret.

Om en process inte är skaparen av kön kommer den som standard att försöka ansluta sig till köns bakgrundstråd när den avslutas. Processen kan anropa cancel_join_thread() för att få join_thread() att inte göra någonting.

cancel_join_thread()

Förhindrar join_thread() från att blockera. I synnerhet förhindrar detta att bakgrundstråden ansluts automatiskt när processen avslutas – se join_thread().

Ett bättre namn på den här metoden skulle kunna vara allow_exit_without_flush(). Det är troligt att köade data går förlorade och du kommer nästan aldrig att behöva använda den. Den finns egentligen bara där om du vill att den aktuella processen ska avslutas omedelbart utan att vänta på att spola köade data till den underliggande pipen, och du inte bryr dig om förlorade data.

Anteckning

Funktionaliteten i denna klass kräver en fungerande implementation av delade semaforer i värdoperativsystemet. Utan en sådan kommer funktionaliteten i denna klass att inaktiveras, och försök att instansiera en Queue kommer att resultera i ett ImportError. Se bpo-3770 för ytterligare information. Detsamma gäller för någon av de specialiserade kötyper som listas nedan.

class multiprocessing.SimpleQueue

Det är en förenklad Queue-typ, mycket nära en låst Pipe.

close()

Stäng kön: frigör interna resurser.

En kö får inte längre användas efter att den har stängts. Till exempel får metoderna get(), put() och empty() inte längre anropas.

Tillagd i version 3.9.

empty()

Returnerar True om kön är tom, annars False.

Ger alltid upphov till ett OSError om SimpleQueue stängs.

get()

Ta bort och returnera ett objekt från kön.

put(item)

Lägg objekt i kön.

class multiprocessing.JoinableQueue([maxsize])

JoinableQueue, en underklass till Queue, är en kö som dessutom har metoderna task_done() och join().

task_done()

Anger att en tidigare köad uppgift är slutförd. Används av köanvändare. För varje get() som används för att hämta en uppgift, talar ett efterföljande anrop till task_done() om för kön att bearbetningen av uppgiften är klar.

Om en join() för närvarande blockeras kommer den att återupptas när alla objekt har bearbetats (vilket innebär att ett task_done()-anrop har mottagits för varje objekt som har put() i kön).

Utlöser ett ValueError om den anropas fler gånger än det finns objekt i kön.

join()

Blockera tills alla objekt i kön har hämtats och behandlats.

Antalet oavslutade uppgifter ökar varje gång ett objekt läggs till i kön. Antalet sjunker när en konsument anropar task_done() för att ange att objektet hämtades och att allt arbete med det är slutfört. När antalet oavslutade uppgifter sjunker till noll avblockeras join().

Diverse

multiprocessing.active_children()

Returnerar en lista över alla levande barn till den aktuella processen.

Om du anropar detta får det bieffekten att alla processer som redan har avslutats ”ansluts”.

multiprocessing.cpu_count()

Returnerar antalet CPU:er i systemet.

Detta antal motsvarar inte det antal processorer som den aktuella processen kan använda. Antalet användbara processorer kan erhållas med os.process_cpu_count() (eller len(os.sched_getaffinity(0))).

När antalet CPU:er inte kan bestämmas uppstår ett NotImplementedError.

Ändrad i version 3.13: Returvärdet kan också åsidosättas med flaggan -X cpu_count eller PYTHON_CPU_COUNT eftersom detta bara är ett omslag runt API:erna för cpu-räkning i os.

multiprocessing.current_process()

Returnerar Process-objektet som motsvarar den aktuella processen.

En motsvarighet till threading.current_thread().

multiprocessing.parent_process()

Returnerar Process-objektet som motsvarar den överordnade processen för current_process(). För huvudprocessen kommer parent_process att vara None.

Tillagd i version 3.8.

multiprocessing.freeze_support()

Lägg till stöd för när ett program som använder multiprocessing har frysts för att producera en körbar fil. (Har testats med py2exe, PyInstaller och cx_Freeze.)

Man måste anropa den här funktionen direkt efter raden if __name__ == '__main__' i huvudmodulen. Till exempel:

från multiprocessing import Process, freeze_support

def f():
    print('hej världen!')

if __name__ == '__main__':
    freeze_support()
    Process(mål=f).start()

Om raden freeze_support() utelämnas kommer försök att köra den frysta körbara filen att ge upphov till RuntimeError.

Anrop av freeze_support() har ingen effekt när startmetoden inte är spawn. Dessutom, om modulen körs normalt av Python-tolken (programmet har inte frysts), har freeze_support() ingen effekt.

multiprocessing.get_all_start_methods()

Returnerar en lista över de startmetoder som stöds, varav den första är standard. De möjliga startmetoderna är 'fork', 'spawn' och 'forkserver'. Alla plattformar stöder inte alla metoder. Se Sammanhang och startmetoder.

Tillagd i version 3.4.

multiprocessing.get_context(method=None)

Returnerar ett kontextobjekt som har samma attribut som modulen multiprocessing.

Om method är None returneras standardkontexten. Observera att om den globala startmetoden inte har ställts in, kommer detta att ställa in den till standardmetoden. Annars bör method vara 'fork', 'spawn', 'forkserver'. ValueError uppstår om den angivna startmetoden inte är tillgänglig. Se Sammanhang och startmetoder.

Tillagd i version 3.4.

multiprocessing.get_start_method(allow_none=False)

Returnerar namnet på den startmetod som används för att starta processer.

Om den globala startmetoden inte har ställts in och allow_none är False, ställs startmetoden in på standardvärdet och namnet returneras. Om startmetoden inte har ställts in och allow_none är True returneras None.

Returvärdet kan vara 'fork', 'spawn', 'forkserver' eller None. Se Sammanhang och startmetoder.

Tillagd i version 3.4.

Ändrad i version 3.8: På macOS är startmetoden spawn nu standard. Startmetoden fork bör betraktas som osäker eftersom den kan leda till att underprocessen kraschar. Se bpo-33725.

multiprocessing.set_executable(executable)

Ange sökvägen till den Python-tolk som ska användas när en underordnad process startas. (Som standard används sys.executable). Inbäddare kommer förmodligen att behöva göra något i stil med

set_executable(os.path.join(sys.exec_prefix, 'pythonw.exe'))

innan de kan skapa underordnade processer.

Ändrad i version 3.4: Stödjs nu på POSIX när startmetoden 'spawn' används.

Ändrad i version 3.11: Accepterar en path-like object.

multiprocessing.set_forkserver_preload(module_names)

Ange en lista med modulnamn som huvudprocessen för forkserver ska försöka importera så att deras redan importerade tillstånd ärvs av förgrenade processer. Alla ImportError när detta görs ignoreras i tysthet. Detta kan användas som en prestandaförbättring för att undvika upprepat arbete i varje process.

För att detta ska fungera måste det anropas innan forkserver-processen har startats (innan du skapar en Pool eller startar en Process).

Endast meningsfullt när man använder startmetoden 'forkserver'. Se Sammanhang och startmetoder.

Tillagd i version 3.4.

multiprocessing.set_start_method(method, force=False)

Ange vilken metod som ska användas för att starta underordnade processer. Argumentet method kan vara 'fork', 'spawn' eller 'forkserver'. Utlöser RuntimeError om startmetoden redan har ställts in och force inte är True. Om method är None och force är True så sätts startmetoden till None. Om method är None och force är False sätts kontexten till standardkontexten.

Observera att detta ska anropas högst en gång och att det ska skyddas i if __name__ == '__main__' i huvudmodulens klausul.

Se Sammanhang och startmetoder.

Tillagd i version 3.4.

Anslutningsobjekt

Connection-objekt gör det möjligt att skicka och ta emot plockbara objekt eller strängar. De kan betraktas som meddelandeorienterade anslutna socklar.

Anslutningsobjekt skapas vanligtvis med Pipe – se även Lyssnare och kunder.

class multiprocessing.connection.Connection
send(obj)

Skicka ett objekt till den andra änden av anslutningen som ska läsas med hjälp av recv().

Objektet måste kunna picklas. Mycket stora pickles (ungefär 32 MiB+, men det beror på operativsystemet) kan ge upphov till ett ValueError-undantag.

recv()

Returnerar ett objekt som skickats från den andra änden av anslutningen med send(). Blockerar tills det finns något att ta emot. Utlöser EOFError om det inte finns något kvar att ta emot och den andra änden var stängd.

fileno()

Returnerar filbeskrivaren eller handtaget som används av anslutningen.

close()

Stäng anslutningen.

Detta anropas automatiskt när anslutningen garbage collectas.

poll([timeout])

Returnerar om det finns någon data tillgänglig för läsning.

Om timeout inte anges kommer den att returnera omedelbart. Om timeout är ett tal anger detta den maximala tiden i sekunder för blockering. Om timeout är None används en oändlig timeout.

Observera att flera anslutningsobjekt kan pollas samtidigt genom att använda multiprocessing.connection.wait().

send_bytes(buffer[, offset[, size]])

Skicka byte-data från ett bytesliknande objekt som ett komplett meddelande.

Om offset anges läses data från den positionen i buffer. Om size anges kommer så många byte att läsas från bufferten. Mycket stora buffertar (ungefär 32 MiB+, men det beror på operativsystemet) kan ge upphov till ett ValueError-undantag

recv_bytes([maxlength])

Returnerar ett komplett meddelande med byte-data som skickats från den andra änden av anslutningen som en sträng. Blockerar tills det finns något att ta emot. Utlöser EOFError om det inte finns något kvar att ta emot och den andra änden har stängt.

Om maxlength anges och meddelandet är längre än maxlength så uppstår OSError och anslutningen kommer inte längre att vara läsbar.

Ändrad i version 3.3: Denna funktion gav tidigare upphov till IOError, som nu är ett alias för OSError.

recv_bytes_into(buffer[, offset])

Läser in i buffer ett komplett meddelande med byte-data som skickas från den andra änden av anslutningen och returnerar antalet byte i meddelandet. Blockerar tills det finns något att ta emot. Utlöser EOFError om det inte finns något kvar att ta emot och den andra änden var stängd.

buffer måste vara ett skrivbart bytesliknande objekt. Om offset anges kommer meddelandet att skrivas in i bufferten från den positionen. Offset måste vara ett icke-negativt heltal som är mindre än längden på buffer (i byte).

Om bufferten är för kort uppstår ett BufferTooShort undantag och det fullständiga meddelandet finns tillgängligt som e.args[0] där e är undantagsinstansen.

Ändrad i version 3.3: Själva anslutningsobjekten kan nu överföras mellan processer med hjälp av Connection.send() och Connection.recv().

Anslutningsobjekt stöder nu också kontexthanteringsprotokollet – se Typer av kontexthanterare. __enter__() returnerar anslutningsobjektet och __exit__() anropar close().

Till exempel:

>>> from multiprocessing import Pipe
>>> a, b = Pipe()
>>> a.send([1, 'hello', None])
>>> b.recv()
[1, 'hello', None]
>>> b.send_bytes(b'thank you')
>>> a.recv_bytes()
b'thank you'
>>> import array
>>> arr1 = array.array('i', range(5))
>>> arr2 = array.array('i', [0] * 10)
>>> a.send_bytes(arr1)
>>> count = b.recv_bytes_into(arr2)
>>> assert count == len(arr1) * arr1.itemsize
>>> arr2
array('i', [0, 1, 2, 3, 4, 0, 0, 0, 0, 0])

Varning

Metoden Connection.recv() avplockar automatiskt de data den tar emot, vilket kan vara en säkerhetsrisk om du inte kan lita på den process som skickade meddelandet.

Om inte anslutningsobjektet producerades med Pipe() bör du därför endast använda metoderna recv() och send() efter att ha utfört någon form av autentisering. Se Autentiseringsnycklar.

Varning

Om en process dödas medan den försöker läsa eller skriva till en pipe kommer data i pipen sannolikt att korrumperas, eftersom det kan bli omöjligt att vara säker på var meddelandegränserna ligger.

Synkroniseringsprimitiver

Generellt sett är synkroniseringsprimitiver inte lika nödvändiga i ett multiprocessprogram som i ett flertrådat program. Se dokumentationen för modulen threading.

Observera att man också kan skapa synkroniseringsprimitiver genom att använda ett manager-objekt – se Ansvariga.

class multiprocessing.Barrier(parties[, action[, timeout]])

Ett barriärobjekt: en klon av threading.Barrier.

Tillagd i version 3.3.

class multiprocessing.BoundedSemaphore([value])

Ett begränsat semaforobjekt: en nära analog till threading.BoundedSemaphore.

En enda skillnad från dess nära analog finns: dess acquire-metods första argument heter block, vilket är i överensstämmelse med Lock.acquire().

Anteckning

På macOS kan detta inte skiljas från Semaphore eftersom sem_getvalue() inte är implementerat på den plattformen.

class multiprocessing.Condition([lock])

En villkorsvariabel: ett alias för threading.Condition.

Om lock anges bör det vara ett Lock- eller RLock-objekt från multiprocessing.

Ändrad i version 3.3: Metoden wait_for() lades till.

class multiprocessing.Event

En klon av threading.Event.

class multiprocessing.Lock

Ett icke-rekursivt låsobjekt: en nära motsvarighet till threading.Lock. När en process eller tråd har förvärvat ett lås kommer efterföljande försök att förvärva det från någon annan process eller tråd att blockeras tills det släpps; vilken process eller tråd som helst kan släppa det. Koncepten och beteendena i threading.Lock som gäller för trådar är replikerade här i multiprocessing.Lock som gäller för antingen processer eller trådar, förutom vad som anges.

Observera att Lock faktiskt är en fabriksfunktion som returnerar en instans av multiprocessing.synchronize.Lock initialiserad med en standardkontext.

Lock stöder protokollet context manager och kan därför användas i with-satser.

acquire(block=True, timeout=None)

Förvärva ett lås, blockerande eller icke-blockerande.

Med block-argumentet inställt på True (standard) kommer metodanropet att blockera tills låset är i ett olåst tillstånd, sedan ställa in det på låst och returnera True. Observera att namnet på detta första argument skiljer sig från det i threading.Lock.acquire().

Om argumentet block är satt till False blockeras inte metodanropet. Om låset för närvarande är i låst tillstånd returneras False; annars sätts låset i låst tillstånd och returneras True.

När timeout anropas med ett positivt värde i flyttal, blockeras låset i högst det antal sekunder som anges av timeout så länge låset inte kan förvärvas. Anrop med ett negativt värde för timeout är likvärdigt med en timeout på noll. Anrop med ett timeout-värde på None (standard) sätter timeout-perioden till oändlig. Observera att behandlingen av negativa eller None-värden för timeout skiljer sig från det implementerade beteendet i threading.Lock.acquire(). Argumentet timeout har inga praktiska konsekvenser om argumentet block är satt till False och ignoreras därför. Returnerar True om låset har förvärvats eller False om timeout-perioden har löpt ut.

release()

Frigör ett lås. Detta kan anropas från vilken process eller tråd som helst, inte bara från den process eller tråd som ursprungligen förvärvade låset.

Beteendet är detsamma som i threading.Lock.release() förutom att ett ValueError uppstår när det anropas på ett olåst lås.

locked()

Returnerar en boolean som anger om detta objekt är låst just nu.

Tillagd i version 3.14.

class multiprocessing.RLock

Ett rekursivt låsobjekt: en nära motsvarighet till threading.RLock. Ett rekursivt lås måste frigöras av den process eller tråd som förvärvade det. När en process eller tråd har förvärvat ett rekursivt lås kan samma process eller tråd förvärva det igen utan att blockera; den processen eller tråden måste släppa det en gång för varje gång det har förvärvats.

Observera att RLock faktiskt är en fabriksfunktion som returnerar en instans av multiprocessing.synchronize.RLock initialiserad med en standardkontext.

RLock stöder protokollet context manager och kan därför användas i with-satser.

acquire(block=True, timeout=None)

Förvärva ett lås, blockerande eller icke-blockerande.

När den anropas med argumentet block inställt på True blockeras den tills låset är olåst (inte ägs av någon process eller tråd) om inte låset redan ägs av den aktuella processen eller tråden. Den aktuella processen eller tråden tar då över ägandet av låset (om den inte redan har ägandet) och rekursionsnivån inuti låset ökar med ett, vilket resulterar i ett returvärde på True. Observera att det finns flera skillnader i detta första arguments beteende jämfört med implementeringen av threading.RLock.acquire(), som börjar med namnet på själva argumentet.

När den anropas med argumentet block satt till False, blockeras inte. Om låset redan har förvärvats (och därmed ägs) av en annan process eller tråd, tar den aktuella processen eller tråden inte över ägandet och rekursionsnivån i låset ändras inte, vilket resulterar i ett returvärde på False. Om låset är olåst tar den aktuella processen eller tråden över ägandet och rekursionsnivån ökas, vilket resulterar i ett returvärde på True.

Användning och beteenden för argumentet timeout är desamma som i Lock.acquire(). Observera att vissa av dessa beteenden hos timeout skiljer sig från de implementerade beteendena i threading.RLock.acquire().

release()

Frigör ett lås och dekrementera rekursionsnivån. Om rekursionsnivån är noll efter dekrementeringen, återställ låset till olåst (ägs inte av någon process eller tråd) och om några andra processer eller trådar är blockerade i väntan på att låset ska bli olåst, låt exakt en av dem fortsätta. Om rekursionsnivån efter dekrementeringen fortfarande inte är noll, förblir låset låst och ägs av den anropande processen eller tråden.

Anropa endast denna metod när den anropande processen eller tråden äger låset. Ett AssertionError uppstår om denna metod anropas av en annan process eller tråd än ägaren eller om låset är i ett olåst (oägt) tillstånd. Observera att den typ av undantag som uppstår i denna situation skiljer sig från det implementerade beteendet i threading.RLock.release().

locked()

Returnerar en boolean som anger om detta objekt är låst just nu.

Tillagd i version 3.14.

class multiprocessing.Semaphore([value])

Ett semaforobjekt: en nära motsvarighet till threading.Semaphore.

En enda skillnad från dess nära analog finns: dess acquire-metods första argument heter block, vilket är i överensstämmelse med Lock.acquire().

Anteckning

På macOS stöds inte sem_timedwait, så om du anropar acquire() med en timeout kommer den funktionens beteende att emuleras med en sovande loop.

Anteckning

En del av funktionaliteten i detta paket kräver en fungerande implementation av delade semaforer i värdoperativsystemet. Utan en sådan kommer modulen multiprocessing.synchronize att inaktiveras och försök att importera den kommer att resultera i ett ImportError. Se bpo-3770 för ytterligare information.

Delade ctypes-objekt

Det är möjligt att skapa delade objekt med hjälp av delat minne som kan ärvas av underordnade processer.

multiprocessing.Value(typecode_or_type, *args, lock=True)

Returnerar ett ctypes-objekt som allokerats från ett delat minne. Som standard är returvärdet faktiskt ett synkroniserat omslag för objektet. Själva objektet kan nås via value-attributet i en Value.

typecode_or_type bestämmer typen av det returnerade objektet: det är antingen en ctypes-typ eller en typkod med ett tecken av den typ som används av modulen array. *args skickas vidare till konstruktören för typen.

Om lock är True (standard) skapas ett nytt rekursivt låsobjekt för att synkronisera åtkomsten till värdet. Om lock är ett Lock- eller RLock-objekt kommer det att användas för att synkronisera åtkomst till värdet. Om lock är False kommer åtkomst till det returnerade objektet inte automatiskt att skyddas av ett lås, så det kommer inte nödvändigtvis att vara ”processäkert”.

Operationer som += som involverar en läsning och skrivning är inte atomiska. Så om du till exempel vill öka ett delat värde atomiskt räcker det inte med att bara göra

räknare.värde += 1

Förutsatt att det associerade låset är rekursivt (vilket det är som standard) kan du istället göra

med counter.get_lock():
    räknare.värde += 1

Observera att lock är ett argument som endast gäller nyckelord.

multiprocessing.Array(typecode_or_type, size_or_initializer, *, lock=True)

Returnerar en ctypes-array som allokerats från delat minne. Som standard är returvärdet faktiskt ett synkroniserat omslag för arrayen.

typecode_or_type bestämmer typen av element i den returnerade matrisen: det är antingen en ctypes-typ eller en typkod med ett tecken av den typ som används av modulen array. Om size_or_initializer är ett heltal bestämmer det längden på arrayen, och arrayen nollställs initialt. I annat fall är size_or_initializer en sekvens som används för att initiera arrayen och vars längd bestämmer arrayens längd.

Om lock är True (standard) skapas ett nytt låsobjekt för att synkronisera åtkomsten till värdet. Om lock är ett Lock- eller RLock-objekt kommer det att användas för att synkronisera åtkomst till värdet. Om lock är False kommer åtkomsten till det returnerade objektet inte automatiskt att skyddas av ett lås, så det är inte nödvändigtvis ”processäkert”.

Observera att lock endast är ett nyckelordsargument.

Observera att en array av ctypes.c_char har attributen value och raw som gör att man kan använda den för att lagra och hämta strängar.

Modulen multiprocessing.sharedctypes (multiprocessing.sharedctypes)

Modulen multiprocessing.sharedctypes tillhandahåller funktioner för allokering av ctypes-objekt från delat minne som kan ärvas av underordnade processer.

Anteckning

Även om det är möjligt att lagra en pekare i ett delat minne bör man komma ihåg att den kommer att referera till en plats i adressrymden för en specifik process. Det är dock mycket troligt att pekaren är ogiltig i en andra process och om man försöker dereferentiera pekaren från den andra processen kan det leda till en krasch.

multiprocessing.sharedctypes.RawArray(typecode_or_type, size_or_initializer)

Returnerar en ctypes-array som allokerats från delat minne.

typecode_or_type bestämmer typen av element i den returnerade matrisen: det är antingen en ctypes-typ eller en typkod med ett tecken av den typ som används av modulen array. Om size_or_initializer är ett heltal bestämmer det längden på arrayen, och arrayen nollställs initialt. Annars är size_or_initializer en sekvens som används för att initiera arrayen och vars längd bestämmer arrayens längd.

Observera att det kan vara icke-atomiskt att sätta och hämta ett element - använd Array() istället för att se till att åtkomsten automatiskt synkroniseras med hjälp av ett lås.

multiprocessing.sharedctypes.RawValue(typecode_or_type, *args)

Returnerar ett ctypes-objekt som allokerats från delat minne.

typecode_or_type bestämmer typen av det returnerade objektet: det är antingen en ctypes-typ eller en typkod med ett tecken av den typ som används av modulen array. *args skickas vidare till konstruktören för typen.

Observera att inställning och hämtning av värdet potentiellt är icke-atomiskt – använd Value() istället för att se till att åtkomsten automatiskt synkroniseras med hjälp av ett lås.

Observera att en array av ctypes.c_char har attributen value och raw som gör att man kan använda den för att lagra och hämta strängar – se dokumentation för ctypes.

multiprocessing.sharedctypes.Array(typecode_or_type, size_or_initializer, *, lock=True)

Samma sak som RawArray() förutom att beroende på värdet av lock kan en processäker synkroniseringsomslag returneras istället för en rå ctypes-array.

Om lock är True (standard) skapas ett nytt låsobjekt för att synkronisera åtkomst till värdet. Om lock är ett Lock- eller RLock-objekt kommer det att användas för att synkronisera åtkomst till värdet. Om lock är False kommer åtkomst till det returnerade objektet inte automatiskt att skyddas av ett lås, så det kommer inte nödvändigtvis att vara ”processäkert”.

Observera att lock är ett argument som endast gäller nyckelord.

multiprocessing.sharedctypes.Value(typecode_or_type, *args, lock=True)

Samma sak som RawValue() förutom att beroende på värdet av lock kan en processäker synkroniseringsomslag returneras istället för ett rått ctypes-objekt.

Om lock är True (standard) skapas ett nytt låsobjekt för att synkronisera åtkomst till värdet. Om lock är ett Lock- eller RLock-objekt kommer det att användas för att synkronisera åtkomst till värdet. Om lock är False kommer åtkomst till det returnerade objektet inte automatiskt att skyddas av ett lås, så det kommer inte nödvändigtvis att vara ”processäkert”.

Observera att lock är ett argument som endast gäller nyckelord.

multiprocessing.sharedctypes.copy(obj)

Returnerar ett ctypes-objekt som allokerats från delat minne och som är en kopia av ctypes-objektet obj.

multiprocessing.sharedctypes.synchronized(obj[, lock])

Returnerar ett processäkert omslagsobjekt för ett ctypes-objekt som använder lock för att synkronisera åtkomst. Om lock är None (standard) så skapas ett multiprocessing.RLock-objekt automatiskt.

En synkroniserad wrapper har två metoder utöver metoderna för det objekt som den omsluter: get_obj() returnerar det omslutna objektet och get_lock() returnerar det låsobjekt som används för synkronisering.

Observera att det kan vara mycket långsammare att komma åt ctypes-objektet via omslaget än att komma åt det obearbetade ctypes-objektet.

Ändrad i version 3.5: Synkroniserade objekt stöder protokollet context manager.

I tabellen nedan jämförs syntaxen för att skapa delade ctypes-objekt från delat minne med den normala ctypes-syntaxen. (I tabellen är MyStruct en underklass till ctypes.Structure.)

ctyper

sharedctypes använder typ

delade typer med hjälp av typkod

c_double(2,4)

RawValue(c_double, 2,4)

RawValue(’d’, 2.4)

MyStruct(4, 6)

RawValue(MyStruct, 4, 6)

(c_short * 7)()

RawArray(c_short, 7)

RawArray(’h’, 7)

(c_int * 3)(9, 2, 8)

RawArray(c_int, (9, 2, 8))

RawArray(”i”, (9, 2, 8))

Nedan följer ett exempel där ett antal ctypes-objekt ändras av en underordnad process:

from multiprocessing import Process, Lock
from multiprocessing.sharedctypes import Value, Array
from ctypes import Structure, c_double

class Point(Structure):
    _fields_ = [('x', c_double), ('y', c_double)]

def modify(n, x, s, A):
    n.value **= 2
    x.value **= 2
    s.value = s.value.upper()
    for a in A:
        a.x **= 2
        a.y **= 2

if __name__ == '__main__':
    lock = Lock()

    n = Value('i', 7)
    x = Value(c_double, 1.0/3.0, lock=False)
    s = Array('c', b'hello world', lock=lock)
    A = Array(Point, [(1.875,-6.25), (-5.75,2.0), (2.375,9.5)], lock=lock)

    p = Process(target=modify, args=(n, x, s, A))
    p.start()
    p.join()

    print(n.value)
    print(x.value)
    print(s.value)
    print([(a.x, a.y) for a in A])

De resultat som skrivs ut är

49
0.1111111111111111
HALLÅ VÄRLDEN
[(3.515625, 39.0625), (33.0625, 4.0), (5.640625, 90.25)]

Ansvariga

Managers är ett sätt att skapa data som kan delas mellan olika processer, inklusive delning över ett nätverk mellan processer som körs på olika maskiner. Ett manager-objekt kontrollerar en serverprocess som hanterar delade objekt. Andra processer kan komma åt de delade objekten genom att använda proxies.

multiprocessing.Manager()

Returnerar ett startat SyncManager-objekt som kan användas för att dela objekt mellan processer. Det returnerade manager-objektet motsvarar en skapad underordnad process och har metoder som skapar delade objekt och returnerar motsvarande proxyer.

Managerprocesser kommer att stängas av så snart som de är garbage collected eller deras överordnade process avslutas. Managerklasserna är definierade i modulen multiprocessing.managers:

class multiprocessing.managers.BaseManager(address=None, authkey=None, serializer='pickle', ctx=None, *, shutdown_timeout=1.0)

Skapa ett BaseManager-objekt.

När den har skapats bör man anropa start() eller get_server().serve_forever() för att säkerställa att managerobjektet refererar till en startad managerprocess.

address är den adress på vilken manager-processen lyssnar efter nya anslutningar. Om address är None så väljs en godtycklig adress.

authkey är autentiseringsnyckeln som kommer att användas för att kontrollera giltigheten av inkommande anslutningar till serverprocessen. Om authkey är None används current_process().authkey. Annars används authkey och den måste vara en byte-sträng.

serializer måste vara 'pickle' (använd pickle serialisering) eller 'xmlrpclib' (använd xmlrpc.client serialisering).

ctx är ett kontextobjekt, eller None (använd aktuell kontext). Se funktionen get_context().

shutdown_timeout är en timeout i sekunder som används för att vänta tills den process som används av förvaltaren avslutas i metoden shutdown(). Om tiden för nedstängning går ut avslutas processen. Om det också tar tid att avsluta processen dödas processen.

Ändrad i version 3.11: Parametern shutdown_timeout har lagts till.

start([initializer[, initargs]])

Starta en underprocess för att starta hanteraren. Om initializer inte är None kommer underprocessen att anropa initializer(*initargs) när den startar.

get_server()

Returnerar ett Server-objekt som representerar den faktiska servern under Managerns kontroll. Objektet Server har stöd för metoden serve_forever():

>>> from multiprocessing.managers import BaseManager
>>> manager = BaseManager(address=('', 50000), authkey=b'abc')
>>> server = manager.get_server()
>>> server.serve_forever()

Server har dessutom ett attribut address.

connect()

Anslut ett lokalt managerobjekt till en fjärrmanagerprocess:

>>> from multiprocessing.managers import BaseManager
>>> m = BaseManager(address=('127.0.0.1', 50000), authkey=b'abc')
>>> m.connect()
shutdown()

Stoppar den process som används av hanteraren. Detta är endast tillgängligt om start() har använts för att starta serverprocessen.

Detta kan anropas flera gånger.

register(typeid[, callable[, proxytype[, exposed[, method_to_typeid[, create_method]]]]])

En klassmetod som kan användas för att registrera en typ eller en anropsbar kod hos managerklassen.

typeid är en ”typidentifierare” som används för att identifiera en viss typ av delat objekt. Detta måste vara en sträng.

callable är en callable som används för att skapa objekt för denna typidentifierare. Om en manager-instans kommer att anslutas till servern med connect()-metoden, eller om create_method-argumentet är False, kan detta lämnas som None.

proxytype är en underklass till BaseProxy som används för att skapa proxy för delade objekt med detta typeid. Om None så skapas en proxyklass automatiskt.

exposed används för att ange en sekvens av metodnamn som proxyer för detta typeid ska få tillgång till med hjälp av BaseProxy._callmethod(). (Om exposed är None används istället proxytype._exposed_ om den finns) Om ingen exponerad lista anges kommer alla ”offentliga metoder” i det delade objektet att vara tillgängliga. (Här betyder en ”publik metod” alla attribut som har en __call__()-metod och vars namn inte börjar med '_')

method_to_typeid är en mappning som används för att ange returtypen för de exponerade metoder som ska returnera en proxy. Den mappar metodnamn till typid-strängar. (Om method_to_typeid är None används istället proxytype._method_to_typeid_ om den finns) Om en metods namn inte är en nyckel i denna mappning eller om mappningen är None kommer det objekt som returneras av metoden att kopieras med värde.

create_method avgör om en metod ska skapas med namnet typeid som kan användas för att tala om för serverprocessen att den ska skapa ett nytt delat objekt och returnera en proxy för det. Som standard är det True.

BaseManager-instanser har också en skrivskyddad egenskap:

address

Den adress som används av chefen.

Ändrad i version 3.3: Manager-objekt stöder kontexthanteringsprotokollet – se Typer av kontexthanterare. __enter__() startar serverprocessen (om den inte redan har startat) och returnerar sedan manager-objektet. __exit__() anropar shutdown().

I tidigare versioner __enter__() startade inte managerns serverprocess om den inte redan var startad.

class multiprocessing.managers.SyncManager

En underklass till BaseManager som kan användas för synkronisering av processer. Objekt av denna typ returneras av multiprocessing.Manager().

Dess metoder skapar och returnerar Proxy-objekt för ett antal vanligt förekommande datatyper som ska synkroniseras mellan processer. Detta inkluderar särskilt delade listor och lexikon.

Barrier(parties[, action[, timeout]])

Skapa ett delat threading.Barrier -objekt och returnera en proxy för det.

Tillagd i version 3.3.

BoundedSemaphore([value])

Skapa ett delat threading.BoundedSemaphore -objekt och returnera en proxy för det.

Condition([lock])

Skapa ett delat threading.Condition-objekt och returnera en proxy för det.

Om lock anges bör det vara en proxy för ett threading.Lock- eller threading.RLock-objekt.

Ändrad i version 3.3: Metoden wait_for() lades till.

Event()

Skapa ett delat threading.Event -objekt och returnera en proxy för det.

Lock()

Skapa ett delat threading.Lock-objekt och returnera en proxy för det.

Namespace()

Skapa ett delat Namespace-objekt och returnera en proxy för det.

Queue([maxsize])

Skapa ett delat queue.Queue -objekt och returnera en proxy för det.

RLock()

Skapa ett delat threading.RLock-objekt och returnera en proxy för det.

Semaphore([value])

Skapa ett delat threading.Semaphore -objekt och returnera en proxy för det.

Array(typecode, sequence)

Skapa en array och returnera en proxy för den.

Value(typecode, value)

Skapa ett objekt med ett skrivbart value-attribut och returnera en proxy för det.

dict()
dict(mapping)
dict(sequence)

Skapa ett delat dict-objekt och returnera en proxy för det.

list()
list(sequence)

Skapa ett delat list-objekt och returnera en proxy för det.

set()
set(sequence)
set(mapping)

Skapa ett delat set-objekt och returnera en proxy för det.

Tillagd i version 3.14: stöd för set har lagts till.

Ändrad i version 3.6: Delade objekt kan vara nästlade. Ett delat containerobjekt, t.ex. en delad lista, kan innehålla andra delade objekt som alla hanteras och synkroniseras av SyncManager.

class multiprocessing.managers.Namespace

En typ som kan registreras med SyncManager.

Ett namespace-objekt har inga publika metoder, men har skrivbara attribut. Dess representation visar värdena för dess attribut.

När man använder en proxy för ett namnrymdsobjekt kommer dock ett attribut som börjar med '_' att vara ett attribut för proxyn och inte ett attribut för referenten:

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> Global = manager.Namespace()
>>> Global.x = 10
>>> Global.y = 'hello'
>>> Global._z = 12.3    # this is an attribute of the proxy
>>> print(Global)
Namespace(x=10, y='hello')

Skräddarsydda chefer

För att skapa en egen manager skapar man en subklass av BaseManager och använder klassmetoden register() för att registrera nya typer eller anropbara objekt i managerklassen. Till exempel:

från multiprocessing.managers import BaseManager

klass MathsClass:
    def add(self, x, y):
        returnerar x + y
    def mul(self, x, y):
        returnerar x * y

klass MyManager(BaseManager):
    pass

MyManager.register('Maths', MathsClass)

if __name__ == '__main__':
    med MyManager() som manager:
        maths = manager.Maths()
        print(maths.add(4, 3))         # skriver ut 7
        print(maths.mul(7, 8))         # skriver ut 56

Använda en fjärrstyrd manager

Det är möjligt att köra en manager-server på en maskin och låta klienter använda den från andra maskiner (förutsatt att de berörda brandväggarna tillåter det).

Om du kör följande kommandon skapas en server för en enda delad kö som fjärrklienter kan komma åt:

>>> from multiprocessing.managers import BaseManager
>>> from queue import Queue
>>> queue = Queue()
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue', callable=lambda:queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

En klient kan komma åt servern på följande sätt:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.put('hello')

En annan kund kan också använda den:

>>> from multiprocessing.managers import BaseManager
>>> class QueueManager(BaseManager): pass
>>> QueueManager.register('get_queue')
>>> m = QueueManager(address=('foo.bar.org', 50000), authkey=b'abracadabra')
>>> m.connect()
>>> queue = m.get_queue()
>>> queue.get()
'hello'

Lokala processer kan också komma åt den kön genom att använda koden från ovan på klienten för att komma åt den på distans:

>>> from multiprocessing import Process, Queue
>>> from multiprocessing.managers import BaseManager
>>> class Worker(Process):
...     def __init__(self, q):
...         self.q = q
...         super().__init__()
...     def run(self):
...         self.q.put('local hello')
...
>>> queue = Queue()
>>> w = Worker(queue)
>>> w.start()
>>> class QueueManager(BaseManager): pass
...
>>> QueueManager.register('get_queue', callable=lambda: queue)
>>> m = QueueManager(address=('', 50000), authkey=b'abracadabra')
>>> s = m.get_server()
>>> s.serve_forever()

Proxy-objekt

En proxy är ett objekt som refererar till ett delat objekt som lever (förmodligen) i en annan process. Det delade objektet sägs vara proxyns referent. Flera proxyobjekt kan ha samma referent.

Ett proxyobjekt har metoder som anropar motsvarande metoder hos sin referent (även om inte alla metoder hos referenten nödvändigtvis är tillgängliga via proxyn). På så sätt kan en proxy användas precis som dess referent kan:

>>> mp_context = multiprocessing.get_context('spawn')
>>> manager = mp_context.Manager()
>>> l = manager.list([i*i for i in range(10)])
>>> print(l)
[0, 1, 4, 9, 16, 25, 36, 49, 64, 81]
>>> print(repr(l))
<ListProxy object, typeid 'list' at 0x...>
>>> l[4]
16
>>> l[2:5]
[4, 9, 16]

Observera att om du använder str() på en proxy kommer representationen av referenten att returneras, medan om du använder repr() kommer representationen av proxyn att returneras.

En viktig egenskap hos proxyobjekt är att de kan plockas upp så att de kan skickas mellan processer. Som sådan kan en referent innehålla Proxy-objekt. Detta tillåter nestning av dessa hanterade listor, dicts och andra Proxy-objekt:

>>> a = manager.list()
>>> b = manager.list()
>>> a.append(b)         # referent of a now contains referent of b
>>> print(a, b)
[<ListProxy object, typeid 'list' at ...>] []
>>> b.append('hello')
>>> print(a[0], b)
['hello'] ['hello']

På samma sätt kan dict- och list-proxies vara inbäddade i varandra:

>>> l_outer = manager.list([ manager.dict() for i in range(2) ])
>>> d_first_inner = l_outer[0]
>>> d_first_inner['a'] = 1
>>> d_first_inner['b'] = 2
>>> l_outer[1]['c'] = 3
>>> l_outer[1]['z'] = 26
>>> print(l_outer[0])
{'a': 1, 'b': 2}
>>> print(l_outer[1])
{'c': 3, 'z': 26}

Om standard (icke-proxy) list- eller dict-objekt finns i en referent, kommer ändringar av dessa föränderliga värden inte att spridas genom hanteraren eftersom proxyn inte har något sätt att veta när de värden som finns i den ändras. Att lagra ett värde i en containerproxy (vilket utlöser en __setitem__ på proxyobjektet) sprids dock genom hanteraren och för att effektivt modifiera ett sådant objekt kan man därför tilldela det modifierade värdet till containerproxyn igen:

# skapa en listproxy och lägg till ett föränderligt objekt (en ordbok)
lproxy = manager.list()
lproxy.append({})
# mutera nu ordboken
d = lproxy[0]
d['a'] = 1
d['b'] = 2
# Vid den här tidpunkten är ändringarna i d ännu inte synkroniserade, men genom att
# uppdatering av ordlistan meddelas proxyn om ändringen
lproxy[0] = d

Detta tillvägagångssätt är kanske mindre bekvämt än att använda nästlade Proxy-objekt för de flesta användningsfall, men det ger också en viss kontroll över synkroniseringen.

Anteckning

Proxytyperna i multiprocessing har inget stöd för värdejämförelser. Så, till exempel, vi har:

>>> manager.list([1,2,3]) == [1,2,3]
False

Man bör bara använda en kopia av referenten istället när man gör jämförelser.

class multiprocessing.managers.BaseProxy

Proxyobjekt är instanser av underklasser till BaseProxy.

_callmethod(methodname[, args[, kwds]])

Anropa och returnera resultatet av en metod i proxyns referent.

Om proxy är en proxy vars referent är obj så är uttrycket

proxy._callmethod(metodnamn, args, kwds)

kommer att utvärdera uttrycket

getattr(obj, metodnamn)(*args, **kwds)

i chefens process.

Det returnerade värdet är en kopia av resultatet av anropet eller en proxy till ett nytt delat objekt - se dokumentation för argumentet method_to_typeid i BaseManager.register().

Om ett undantag uppstår vid anropet, uppstår det på nytt genom _callmethod(). Om något annat undantag uppstår i chefens process så konverteras detta till ett RemoteError undantag och uppstår genom _callmethod().

Observera särskilt att ett undantag kommer att uppstå om methodname inte har exposed.

Ett exempel på användning av _callmethod():

>>> l = manager.list(range(10))
>>> l._callmethod('__len__')
10
>>> l._callmethod('__getitem__', (slice(2, 7),)) # equivalent to l[2:7]
[2, 3, 4, 5, 6]
>>> l._callmethod('__getitem__', (20,))          # equivalent to l[20]
Traceback (most recent call last):
...
IndexError: list index out of range
_getvalue()

Returnera en kopia av referenten.

Om referensen inte är betningsbar kommer detta att leda till ett undantag.

__repr__()

Returnerar en representation av proxyobjektet.

__str__()

Returnera representationen av referenten.

Rensa upp

Ett proxyobjekt använder ett weakref-callback så att det avregistrerar sig från den manager som äger dess referent när det samlas in.

Ett delat objekt tas bort från managerprocessen när det inte längre finns några proxyer som hänvisar till det.

Processpooler

Med klassen Pool kan man skapa en pool av processer som utför uppgifter som skickas till den.

class multiprocessing.pool.Pool([processes[, initializer[, initargs[, maxtasksperchild[, context]]]]])

Ett processpoolobjekt som kontrollerar en pool av arbetsprocesser till vilka jobb kan skickas. Det stöder asynkrona resultat med timeouts och callbacks och har en parallell map-implementering.

processes är antalet arbetsprocesser som ska användas. Om processes är None används det antal som returneras av os.process_cpu_count().

Om initializer inte är None kommer varje arbetsprocess att anropa initializer(*initargs) när den startar.

maxtasksperchild är antalet uppgifter som en arbetsprocess kan slutföra innan den avslutas och ersätts med en ny arbetsprocess, så att oanvända resurser kan frigöras. Standardvärdet för maxtasksperchild är None, vilket innebär att arbetsprocesser kommer att leva lika länge som poolen.

context kan användas för att ange den kontext som används för att starta arbetsprocesserna. Vanligtvis skapas en pool med hjälp av funktionen multiprocessing.Pool() eller metoden Pool() i ett context-objekt. I båda fallen anges context på lämpligt sätt.

Observera att metoderna i poolobjektet endast ska anropas av den process som skapade poolen.

Varning

multiprocessing.pool-objekt har interna resurser som måste hanteras korrekt (som alla andra resurser) genom att använda poolen som en kontexthanterare eller genom att anropa close() och terminate() manuellt. Om detta inte görs kan det leda till att processen hänger sig när den ska slutföras.

Observera att det är inte korrekt att förlita sig på att skräpsamlaren förstör poolen eftersom CPython inte garanterar att poolens finalizer kommer att anropas (se object.__del__() för mer information).

Ändrad i version 3.2: Parametern maxtasksperchild har lagts till.

Ändrad i version 3.4: Parametern context har lagts till.

Ändrad i version 3.13: processes använder os.process_cpu_count() som standard, istället för os.cpu_count().

Anteckning

Arbetsprocesser inom en Pool lever normalt under hela den tid som poolens arbetskö varar. Ett vanligt mönster i andra system (t.ex. Apache, mod_wsgi, etc.) för att frigöra resurser som hålls av arbetare är att låta en arbetare i en pool endast slutföra en viss mängd arbete innan den avslutas, rensas upp och en ny process skapas för att ersätta den gamla. Argumentet maxtasksperchild till Pool gör denna möjlighet tillgänglig för slutanvändaren.

apply(func[, args[, kwds]])

Anropa func med argumenten args och nyckelordsargumenten kwds. Den blockerar tills resultatet är klart. Med tanke på dessa blockeringar är apply_async() bättre lämpad för att utföra arbete parallellt. Dessutom utförs func endast i en av poolens arbetare.

apply_async(func[, args[, kwds[, callback[, error_callback]]]])

En variant av metoden apply() som returnerar ett AsyncResult-objekt.

Om callback specificeras ska det vara en callable som accepterar ett enda argument. När resultatet är klart används callback på det, såvida inte anropet misslyckades, i vilket fall error_callback används i stället.

Om error_callback anges bör det vara en callable som accepterar ett enda argument. Om målfunktionen misslyckas anropas error_callback med undantagsinstansen.

Återkallelser bör slutföras omedelbart eftersom den tråd som hanterar resultaten annars blockeras.

map(func, iterable[, chunksize])

En parallell motsvarighet till den inbyggda funktionen map() (den stöder dock bara ett iterable-argument, för flera iterables se starmap()). Den blockerar tills resultatet är klart.

Den här metoden delar upp iterabeln i ett antal bitar som den skickar till processpoolen som separata uppgifter. Den (ungefärliga) storleken på dessa bitar kan anges genom att chunksize sätts till ett positivt heltal.

Observera att det kan orsaka hög minnesanvändning för mycket långa iterabler. Överväg att använda imap() eller imap_unordered() med explicit chunksize-alternativ för bättre effektivitet.

map_async(func, iterable[, chunksize[, callback[, error_callback]]])

En variant av metoden map() som returnerar ett AsyncResult-objekt.

Om callback specificeras ska det vara en callable som accepterar ett enda argument. När resultatet är klart används callback på det, såvida inte anropet misslyckades, i vilket fall error_callback används i stället.

Om error_callback anges bör det vara en callable som accepterar ett enda argument. Om målfunktionen misslyckas anropas error_callback med undantagsinstansen.

Återkallelser bör slutföras omedelbart eftersom den tråd som hanterar resultaten annars blockeras.

imap(func, iterable[, chunksize])

En latare version av map().

Argumentet chunksize är detsamma som det som används av metoden map(). För mycket långa iterabler kan ett stort värde för chunksize göra att jobbet slutförs mycket snabbare än om standardvärdet 1 används.

Även om chunksize är 1 så har next()-metoden för iteratorn som returneras av imap()-metoden en valfri timeout-parameter: next(timeout) kommer att ge upphov till multiprocessing.TimeoutError om resultatet inte kan returneras inom timeout sekunder.

imap_unordered(func, iterable[, chunksize])

Samma som imap() förutom att ordningsföljden på resultaten från den returnerade iteratorn bör betraktas som godtycklig. (Endast när det bara finns en arbetsprocess garanteras att ordningen är ”korrekt”)

starmap(func, iterable[, chunksize])

Som map() förutom att elementen i iterable förväntas vara iterables som packas upp som argument.

Därför resulterar en iterabel av [(1,2), (3,4)] i [func(1,2), func(3,4)].

Tillagd i version 3.3.

starmap_async(func, iterable[, chunksize[, callback[, error_callback]]])

En kombination av starmap() och map_async() som itererar över iterable av iterabler och anropar func med iterablerna uppackade. Returnerar ett resultatobjekt.

Tillagd i version 3.3.

close()

Förhindrar att fler uppgifter skickas till poolen. När alla uppgifter har slutförts kommer arbetsprocesserna att avslutas.

terminate()

Stoppar arbetsprocesserna omedelbart utan att slutföra utestående arbete. När poolobjektet skräpsamlas kommer terminate() att anropas omedelbart.

join()

Vänta på att arbetsprocesserna ska avslutas. Man måste anropa close() eller terminate() innan man använder join().

Ändrad i version 3.3: Poolobjekt stöder nu kontexthanteringsprotokollet – se Typer av kontexthanterare. __enter__() returnerar poolobjektet, och __exit__() anropar terminate().

class multiprocessing.pool.AsyncResult

Klassen för det resultat som returneras av Pool.apply_async() och Pool.map_async().

get([timeout])

Returnerar resultatet när det anländer. Om timeout inte är None och resultatet inte anländer inom timeout sekunder så kommer multiprocessing.TimeoutError att uppstå. Om fjärranropet gav upphov till ett undantag kommer detta undantag att återskapas av get().

wait([timeout])

Vänta tills resultatet är tillgängligt eller tills timeout sekunder har passerat.

ready()

Returnerar om samtalet har slutförts.

successful()

Returnerar om anropet slutfördes utan att ett undantag uppstod. Kommer att ge ValueError om resultatet inte är klart.

Ändrad i version 3.7: Om resultatet inte är klart kommer ValueError att visas istället för AssertionError.

Följande exempel visar hur en pool används:

from multiprocessing import Pool
import time

def f(x):
    return x*x

if __name__ == '__main__':
    with Pool(processes=4) as pool:         # start 4 worker processes
        result = pool.apply_async(f, (10,)) # evaluate "f(10)" asynchronously in a single process
        print(result.get(timeout=1))        # prints "100" unless your computer is *very* slow

        print(pool.map(f, range(10)))       # prints "[0, 1, 4,..., 81]"

        it = pool.imap(f, range(10))
        print(next(it))                     # prints "0"
        print(next(it))                     # prints "1"
        print(it.next(timeout=1))           # prints "4" unless your computer is *very* slow

        result = pool.apply_async(time.sleep, (10,))
        print(result.get(timeout=1))        # raises multiprocessing.TimeoutError

Lyssnare och kunder

Vanligtvis sker meddelandeöverföring mellan processer med hjälp av köer eller genom att använda Connection-objekt som returneras av Pipe().

Modulen multiprocessing.connection ger dock lite extra flexibilitet. Den ger i princip ett meddelandeorienterat API på hög nivå för hantering av sockets eller Windows named pipes. Den har också stöd för digest authentication med modulen hmac, och för polling av flera anslutningar samtidigt.

multiprocessing.connection.deliver_challenge(connection, authkey)

Skicka ett slumpmässigt genererat meddelande till den andra änden av anslutningen och vänta på svar.

Om svaret matchar sammanfattningen av meddelandet med authkey som nyckel skickas ett välkomstmeddelande till den andra änden av anslutningen. I annat fall genereras AuthenticationError.

multiprocessing.connection.answer_challenge(connection, authkey)

Ta emot ett meddelande, beräkna sammanfattningen av meddelandet med authkey som nyckel och skicka sedan tillbaka sammanfattningen.

Om ett välkomstmeddelande inte tas emot kommer AuthenticationError att uppstå.

multiprocessing.connection.Client(address[, family[, authkey]])

Försök att upprätta en anslutning till lyssnaren som använder adressen address, returnerar en Connection.

Typen av anslutning bestäms av family-argumentet, men detta kan i allmänhet utelämnas eftersom det oftast kan härledas från formatet på address. (Se Adressformat)

Om authkey anges och inte None, bör den vara en bytesträng och kommer att användas som hemlig nyckel för en HMAC-baserad autentiseringsutmaning. Ingen autentisering görs om authkey är None. AuthenticationError genereras om autentiseringen misslyckas. Se Autentiseringsnycklar.

class multiprocessing.connection.Listener([address[, family[, backlog[, authkey]]]])

Ett omslag för en bound socket eller Windows named pipe som ”lyssnar” på anslutningar.

address är den adress som ska användas av lyssnarobjektets bundna socket eller namngivna pipe.

Anteckning

Om en adress på ”0.0.0.0” används kommer adressen inte att vara en anslutningsbar slutpunkt i Windows. Om du behöver en anslutningsbar slutpunkt bör du använda ”127.0.0.1”.

family är den typ av socket (eller named pipe) som ska användas. Detta kan vara en av strängarna 'AF_INET' (för ett TCP-socket), 'AF_UNIX' (för ett Unix-domän-socket) eller 'AF_PIPE' (för en Windows named pipe). Av dessa är det bara den första som garanterat är tillgänglig. Om family är None så härleds familjen från formatet på address. Om address också är None väljs ett standardvärde. Denna standard är den familj som antas vara den snabbaste tillgängliga. Se Adressformat. Observera att om family är 'AF_UNIX' och adress är None så kommer uttaget att skapas i en privat temporär katalog som skapas med tempfile.mkstemp().

Om lyssnarobjektet använder ett uttag skickas backlog (1 som standard) till listen()-metoden för uttaget när det har bundits.

Om authkey anges och inte None, bör den vara en bytesträng och kommer att användas som hemlig nyckel för en HMAC-baserad autentiseringsutmaning. Ingen autentisering görs om authkey är None. AuthenticationError genereras om autentiseringen misslyckas. Se Autentiseringsnycklar.

accept()

Acceptera en anslutning på lyssnarobjektets bundna socket eller namngivna pipe och returnera ett Connection-objekt. Om autentiseringsförsök misslyckas genereras AuthenticationError.

close()

Stäng lyssnarobjektets bundna socket eller namngivna pipe. Detta anropas automatiskt när lyssnaren garbage collectas. Det är dock tillrådligt att anropa det explicit.

Listener-objekt har följande skrivskyddade egenskaper:

address

Den adress som används av Listener-objektet.

last_accepted

Den adress som den senast accepterade anslutningen kom från. Om den inte är tillgänglig är den None.

Ändrad i version 3.3: Lyssnarobjekt stöder nu kontexthanteringsprotokollet – se Typer av kontexthanterare. __enter__() returnerar lyssnarobjektet, och __exit__() anropar close().

multiprocessing.connection.wait(object_list, timeout=None)

Vänta tills ett objekt i object_list är klart. Returnerar listan över de objekt i object_list som är klara. Om timeout är en float blockeras anropet i högst så många sekunder. Om timeout är None blockeras anropet under obegränsad tid. En negativ timeout är likvärdig med en timeout på noll.

För både POSIX och Windows kan ett objekt visas i object_list om det är

En anslutning eller ett socketobjekt är klart när det finns data att läsa från det, eller när den andra änden har stängts.

POSIX: wait(object_list, timeout) nästan likvärdig select.select(object_list, [], [], timeout). Skillnaden är att om select.select() avbryts av en signal, kan den ge upphov till OSError med ett felnummer på EINTR, medan wait() inte gör det.

Windows: Ett objekt i object_list måste antingen vara ett heltalshandtag som är väntbart (enligt den definition som används i dokumentationen för Win32-funktionen WaitForMultipleObjects()) eller så kan det vara ett objekt med en fileno()-metod som returnerar ett socket-handtag eller pipe-handtag. (Observera att pipe- och socket-handtag inte är väntande handtag)

Tillagd i version 3.3.

Exempel

Följande serverkod skapar en lyssnare som använder 'hemligt lösenord' som autentiseringsnyckel. Den väntar sedan på en anslutning och skickar några data till klienten:

from multiprocessing.connection import Listener
from array import array

address = ('localhost', 6000)     # family is deduced to be 'AF_INET'

with Listener(address, authkey=b'secret password') as listener:
    with listener.accept() as conn:
        print('connection accepted from', listener.last_accepted)

        conn.send([2.25, None, 'junk', float])

        conn.send_bytes(b'hello')

        conn.send_bytes(array('i', [42, 1729]))

Följande kod ansluter till servern och tar emot vissa data från servern:

from multiprocessing.connection import Client
from array import array

address = ('localhost', 6000)

with Client(address, authkey=b'secret password') as conn:
    print(conn.recv())                  # => [2.25, None, 'junk', float]

    print(conn.recv_bytes())            # => 'hello'

    arr = array('i', [0, 0, 0, 0, 0])
    print(conn.recv_bytes_into(arr))    # => 8
    print(arr)                          # => array('i', [42, 1729, 0, 0, 0])

Följande kod använder wait() för att vänta på meddelanden från flera processer samtidigt:

från multiprocessing import Process, Pipe, current_process
från multiprocessing.connection import wait

def foo(w):
    för i i intervall(10):
        w.send((i, current_process().name))
    w.close()

if __name__ == '__main__':
    läsare = []

    för i i intervall(4):
        r, w = Pipe(duplex=False)
        readers.append(r)
        p = Process(mål=foo, args=(w,))
        p.start()
        # Vi stänger den skrivbara änden av röret nu för att vara säkra på att
        # p är den enda process som äger ett handtag för den.  Detta
        # säkerställer att när p stänger sitt handtag för den skrivbara änden
        # kommer wait() omedelbart att rapportera att den läsbara änden är klar.
        w.close()

    medan läsare:
        for r in wait(readers):
            försök:
                msg = r.recv()
            utom EOFError:
                readers.remove(r)
            else:
                print(msg)

Adressformat

  • En 'AF_INET'-adress är en tupel av formen (hostname, port) där hostname är en sträng och port är ett heltal.

  • En 'AF_UNIX'-adress är en sträng som representerar ett filnamn i filsystemet.

  • En 'AF_PIPE'-adress är en sträng av formen r'\\.\pipe\PipeName'. Om du vill använda Client() för att ansluta till en namngiven pipe på en fjärrdator som heter ServerName bör du använda en adress av formen r'\\ServerName\pipe\PipeName' istället.

Observera att alla strängar som börjar med två backslash antas som standard vara en 'AF_PIPE'-adress i stället för en 'AF_UNIX'-adress.

Autentiseringsnycklar

När man använder Connection.recv, avplockas automatiskt de data som tas emot. Tyvärr är det en säkerhetsrisk att avpickla data från en icke betrodd källa. Därför använder Listener och Client() modulen hmac för att tillhandahålla digest-autentisering.

En autentiseringsnyckel är en byte-sträng som kan betraktas som ett lösenord: när en anslutning har upprättats kommer båda ändarna att kräva bevis för att den andra känner till autentiseringsnyckeln. (Att visa att båda ändarna använder samma nyckel innebär inte att nyckeln skickas över anslutningen)

Om autentisering begärs men ingen autentiseringsnyckel anges används returvärdet för current_process().authkey (se Process). Detta värde kommer automatiskt att ärvas av alla Process-objekt som den aktuella processen skapar. Detta innebär att (som standard) alla processer i ett multiprocessprogram kommer att dela en enda autentiseringsnyckel som kan användas när de upprättar anslutningar sinsemellan.

Lämpliga autentiseringsnycklar kan också genereras med hjälp av os.urandom().

Loggning

Visst stöd för loggning finns tillgängligt. Observera dock att paketet logging inte använder processdelade lås så det är möjligt (beroende på typ av hanterare) att meddelanden från olika processer blandas ihop.

multiprocessing.get_logger()

Returnerar den logger som används av multiprocessing. Om det behövs skapas en ny.

När loggern skapas första gången har den nivån logging.NOTSET och ingen standardhanterare. Meddelanden som skickas till denna logger kommer som standard inte att spridas till rotloggaren.

Observera att i Windows ärver barnprocesser endast nivån på den överordnade processens logger - alla andra anpassningar av loggern ärvs inte.

multiprocessing.log_to_stderr(level=None)

Denna funktion utför ett anrop till get_logger() men förutom att returnera den logger som skapats av get_logger lägger den till en hanterare som skickar utdata till sys.stderr med formatet '[%(levelname)s/%(processName)s] %(message)s'. Du kan ändra levelname för loggern genom att skicka ett level-argument.

Nedan visas ett exempel på en session med loggning aktiverad:

>>> import multiprocessing, logging
>>> logger = multiprocessing.log_to_stderr()
>>> logger.setLevel(logging.INFO)
>>> logger.warning('doomed')
[WARNING/MainProcess] doomed
>>> m = multiprocessing.Manager()
[INFO/SyncManager-...] child process calling self.run()
[INFO/SyncManager-...] created temp directory /.../pymp-...
[INFO/SyncManager-...] manager serving at '/.../listener-...'
>>> del m
[INFO/MainProcess] sending shutdown message to manager
[INFO/SyncManager-...] manager exiting with exitcode 0

En fullständig tabell över loggningsnivåer finns i modulen logging.

Modulen multiprocessing.dummy

multiprocessing.dummy replikerar API:t för multiprocessing men är inte mer än ett omslag runt modulen threading.

I synnerhet funktionen Pool som tillhandahålls av multiprocessing.dummy returnerar en instans av ThreadPool, som är en underklass till Pool som stöder alla samma metodanrop men använder en pool av arbetartrådar i stället för arbetsprocesser.

class multiprocessing.pool.ThreadPool([processes[, initializer[, initargs]]])

Ett trådpoolsobjekt som styr en pool av arbetstrådar som jobb kan skickas till. ThreadPool-instanser är helt gränssnittskompatibla med Pool-instanser och deras resurser måste också hanteras korrekt, antingen genom att använda poolen som en kontexthanterare eller genom att anropa close() och terminate() manuellt.

processes är antalet arbetstrådar som ska användas. Om processes är None används det antal som returneras av os.process_cpu_count().

Om initializer inte är None kommer varje arbetsprocess att anropa initializer(*initargs) när den startar.

Till skillnad från Pool kan maxtasksperchild och context inte anges.

Anteckning

En ThreadPool delar samma gränssnitt som Pool, som är utformad kring en pool av processer och föregår introduktionen av concurrent.futures-modulen. Som sådan ärver den vissa operationer som inte är meningsfulla för en pool som backas upp av trådar, och den har en egen typ för att representera statusen för asynkrona jobb, AsyncResult, som inte förstås av några andra bibliotek.

Användare bör i allmänhet föredra att använda concurrent.futures.ThreadPoolExecutor, som har ett enklare gränssnitt som utformades kring trådar från början och som returnerar concurrent.futures.Future-instanser som är kompatibla med många andra bibliotek, inklusive asyncio.

Riktlinjer för programmering

Det finns vissa riktlinjer och idiom som bör följas vid användning av multiprocessing.

Alla startmetoder

Följande gäller för alla startmetoder.

Undvik delat tillstånd

Så långt det är möjligt bör man försöka undvika att flytta stora mängder data mellan processer.

Det är förmodligen bäst att hålla sig till att använda köer eller pipes för kommunikation mellan processer snarare än att använda synkroniseringsprimitiven på lägre nivå.

Inläggning

Säkerställ att argumenten till metoderna för proxies är picklbara.

Trådsäkerhet för proxyer

Använd inte ett proxyobjekt från mer än en tråd om du inte skyddar det med ett lås.

(Det är aldrig något problem att olika processer använder samma proxy)

Ansluta sig till zombieprocesser

På POSIX blir en process en zombie när den är klar men inte har blivit joinad. Det bör aldrig finnas särskilt många eftersom varje gång en ny process startar (eller active_children() anropas) kommer alla avslutade processer som ännu inte har förenats att förenas. Även anrop av en avslutad process Process.is_alive kommer att ansluta processen. Trots detta är det förmodligen god praxis att uttryckligen joina alla processer som du startar.

Bättre att ärva än att plocka/ta bort

När man använder startmetoderna spawn eller forkserver måste många typer från multiprocessing vara picklable så att barnprocesser kan använda dem. Man bör dock i allmänhet undvika att skicka delade objekt till andra processer med hjälp av pipes eller köer. Istället bör man arrangera programmet så att en process som behöver tillgång till en delad resurs som skapats någon annanstans kan ärva den från en förfadersprocess.

Undvik att avsluta processer

Om du använder metoden Process.terminate för att stoppa en process kan det leda till att delade resurser (t.ex. lås, semaforer, rör och köer) som för närvarande används av processen blir trasiga eller otillgängliga för andra processer.

Därför är det förmodligen bäst att bara överväga att använda Process.terminate på processer som aldrig använder några delade resurser.

Ansluta sig till processer som använder köer

Tänk på att en process som har lagt objekt i en kö kommer att vänta innan den avslutas tills alla buffrade objekt har matats av ”feeder”-tråden till den underliggande pipen. (Barnprocessen kan anropa Queue.cancel_join_thread-metoden för kön för att undvika detta beteende)

Detta innebär att när du använder en kö måste du se till att alla objekt som har lagts i kön så småningom kommer att tas bort innan processen ansluts. Annars kan du inte vara säker på att processer som har lagt objekt i kön kommer att avslutas. Kom också ihåg att icke-daemoniska processer kommer att anslutas automatiskt.

Ett exempel som kommer att låsa sig är följande:

från multiprocessing import Process, 

def f(q):
    q.put('X' * 1000000)

if __name__ == '__main__':
    queue = Queue()
    p = Process(mål=f, args=(,))
    p.start()
    p.join() # detta leder till dödläge
    obj = .get()

En lösning här skulle vara att byta ut de två sista raderna (eller helt enkelt ta bort raden p.join())).

Explicit skicka resurser till underordnade processer

På POSIX med startmetoden fork kan en underordnad process använda en delad resurs som skapats i en överordnad process med hjälp av en global resurs. Det är dock bättre att skicka objektet som ett argument till konstruktören för barnprocessen.

Förutom att göra koden (potentiellt) kompatibel med Windows och de andra startmetoderna säkerställer detta också att så länge barnprocessen fortfarande är vid liv kommer objektet inte att samlas in i den överordnade processen. Detta kan vara viktigt om någon resurs frigörs när objektet garbage collectas i den överordnade processen.

Så till exempel

från multiprocessing import Process, Lås

def f():
    ... gör något med hjälp av "lock" ...

if __name__ == '__main__':
    lock = Lock()
    för i i intervall(10):
        Process(mål=f).start()

bör skrivas om till

från multiprocessing import Process, Lås

def f(l):
    ... gör något med hjälp av "l" ...

if __name__ == '__main__':
    lock = Lock()
    för i i intervall(10):
        Process(mål=f, args=(lock,)).start()

Se upp så att du inte ersätter sys.stdin med ett ”filliknande objekt”

multiprocessing ursprungligen ovillkorligen anropad:

os.close(sys.stdin.fileno())

i metoden multiprocessing.Process._bootstrap() — detta resulterade i problem med processer-i-processer. Detta har ändrats till:

sys.stdin.close()
sys.stdin = open(os.open(os.devnull, os.O_RDONLY), closefd=False)

Detta löser det grundläggande problemet med processer som kolliderar med varandra och resulterar i ett fel med dålig filbeskrivare, men introducerar en potentiell fara för applikationer som ersätter sys.stdin() med ett ”filliknande objekt” med buffring av utdata. Denna fara är att om flera processer anropar close() på detta filliknande objekt, kan det resultera i att samma data spolas till objektet flera gånger, vilket resulterar i korruption.

Om du skriver ett filliknande objekt och implementerar din egen cachelagring kan du göra den gaffelsäker genom att lagra pid när du lägger till något i cachen och kassera cachen när pid ändras. Till exempel:

@egenskap
def cache(self):
    pid = os.getpid()
    if pid != self._pid:
        self._pid = pid
        self._cache = []
    returnera self._cache

För mer information, se bpo-5155, bpo-5313 och bpo-5331

Startmetoderna spawn och forkserver

Det finns några extra begränsningar som inte gäller för startmetoden fork.

Mer betningsbarhet

Se till att alla argument till Process är betningsbara. Om du subklassar Process.__init__ måste du också se till att instanser kommer att vara betningsbara när metoden Process.start anropas.

Globala variabler

Tänk på att om kod som körs i en underordnad process försöker komma åt en global variabel, så kanske det värde som visas (om det finns något) inte är detsamma som värdet i den överordnade processen vid den tidpunkt då Process.start anropades.

Globala variabler som bara är konstanter på modulnivå orsakar dock inga problem.

Säker import av huvudmodul

Se till att huvudmodulen kan importeras på ett säkert sätt av en ny Python-tolk utan att orsaka oavsiktliga bieffekter (t.ex. att starta en ny process).

Om du till exempel använder startmetoden spawn eller forkserver och kör följande modul skulle det misslyckas med RuntimeError:

från multiprocessing import Process

def foo():
    print('hallå')

p = Process(mål=foo)
p.start()

Istället bör man skydda programmets ”ingångspunkt” genom att använda if __name__ == '__main__': på följande sätt:

from multiprocessing import Process, freeze_support, set_start_method

def foo():
    print('hello')

if __name__ == '__main__':
    freeze_support()
    set_start_method('spawn')
    p = Process(mål=foo)
    p.start()

(Raden freeze_support() kan utelämnas om programmet ska köras normalt istället för att frysas)

Detta gör att den nystartade Python-tolken kan importera modulen på ett säkert sätt och sedan köra modulens funktion foo().

Liknande begränsningar gäller om en pool eller manager skapas i huvudmodulen.

Exempel

Demonstration av hur man skapar och använder anpassade hanterare och fullmakter:

from multiprocessing import freeze_support
from multiprocessing.managers import BaseManager, BaseProxy
import operator

##

class Foo:
    def f(self):
        print('you called Foo.f()')
    def g(self):
        print('you called Foo.g()')
    def _h(self):
        print('you called Foo._h()')

# A simple generator function
def baz():
    for i in range(10):
        yield i*i

# Proxy type for generator objects
class GeneratorProxy(BaseProxy):
    _exposed_ = ['__next__']
    def __iter__(self):
        return self
    def __next__(self):
        return self._callmethod('__next__')

# Function to return the operator module
def get_operator_module():
    return operator

##

class MyManager(BaseManager):
    pass

# register the Foo class; make `f()` and `g()` accessible via proxy
MyManager.register('Foo1', Foo)

# register the Foo class; make `g()` and `_h()` accessible via proxy
MyManager.register('Foo2', Foo, exposed=('g', '_h'))

# register the generator function baz; use `GeneratorProxy` to make proxies
MyManager.register('baz', baz, proxytype=GeneratorProxy)

# register get_operator_module(); make public functions accessible via proxy
MyManager.register('operator', get_operator_module)

##

def test():
    manager = MyManager()
    manager.start()

    print('-' * 20)

    f1 = manager.Foo1()
    f1.f()
    f1.g()
    assert not hasattr(f1, '_h')
    assert sorted(f1._exposed_) == sorted(['f', 'g'])

    print('-' * 20)

    f2 = manager.Foo2()
    f2.g()
    f2._h()
    assert not hasattr(f2, 'f')
    assert sorted(f2._exposed_) == sorted(['g', '_h'])

    print('-' * 20)

    it = manager.baz()
    for i in it:
        print('<%d>' % i, end=' ')
    print()

    print('-' * 20)

    op = manager.operator()
    print('op.add(23, 45) =', op.add(23, 45))
    print('op.pow(2, 94) =', op.pow(2, 94))
    print('op._exposed_ =', op._exposed_)

##

if __name__ == '__main__':
    freeze_support()
    test()

Använder Pool:

import multiprocessing
import time
import random
import sys

#
# Functions used by test code
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % (
        multiprocessing.current_process().name,
        func.__name__, args, result
        )

def calculatestar(args):
    return calculate(*args)

def mul(a, b):
    time.sleep(0.5 * random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5 * random.random())
    return a + b

def f(x):
    return 1.0 / (x - 5.0)

def pow3(x):
    return x ** 3

def noop(x):
    pass

#
# Test code
#

def test():
    PROCESSES = 4
    print('Creating pool with %d processes\n' % PROCESSES)

    with multiprocessing.Pool(PROCESSES) as pool:
        #
        # Tests
        #

        TASKS = [(mul, (i, 7)) for i in range(10)] + \
                [(plus, (i, 8)) for i in range(10)]

        results = [pool.apply_async(calculate, t) for t in TASKS]
        imap_it = pool.imap(calculatestar, TASKS)
        imap_unordered_it = pool.imap_unordered(calculatestar, TASKS)

        print('Ordered results using pool.apply_async():')
        for r in results:
            print('\t', r.get())
        print()

        print('Ordered results using pool.imap():')
        for x in imap_it:
            print('\t', x)
        print()

        print('Unordered results using pool.imap_unordered():')
        for x in imap_unordered_it:
            print('\t', x)
        print()

        print('Ordered results using pool.map() --- will block till complete:')
        for x in pool.map(calculatestar, TASKS):
            print('\t', x)
        print()

        #
        # Test error handling
        #

        print('Testing error handling:')

        try:
            print(pool.apply(f, (5,)))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.apply()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(pool.map(f, list(range(10))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from pool.map()')
        else:
            raise AssertionError('expected ZeroDivisionError')

        try:
            print(list(pool.imap(f, list(range(10)))))
        except ZeroDivisionError:
            print('\tGot ZeroDivisionError as expected from list(pool.imap())')
        else:
            raise AssertionError('expected ZeroDivisionError')

        it = pool.imap(f, list(range(10)))
        for i in range(10):
            try:
                x = next(it)
            except ZeroDivisionError:
                if i == 5:
                    pass
            except StopIteration:
                break
            else:
                if i == 5:
                    raise AssertionError('expected ZeroDivisionError')

        assert i == 9
        print('\tGot ZeroDivisionError as expected from IMapIterator.next()')
        print()

        #
        # Testing timeouts
        #

        print('Testing ApplyResult.get() with timeout:', end=' ')
        res = pool.apply_async(calculate, TASKS[0])
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % res.get(0.02))
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()

        print('Testing IMapIterator.next() with timeout:', end=' ')
        it = pool.imap(calculatestar, TASKS)
        while 1:
            sys.stdout.flush()
            try:
                sys.stdout.write('\n\t%s' % it.next(0.02))
            except StopIteration:
                break
            except multiprocessing.TimeoutError:
                sys.stdout.write('.')
        print()
        print()


if __name__ == '__main__':
    multiprocessing.freeze_support()
    test()

Ett exempel som visar hur man använder köer för att mata uppgifter till en samling arbetsprocesser och samla in resultaten:

import time
import random

from multiprocessing import Process, Queue, current_process, freeze_support

#
# Function run by worker processes
#

def worker(input, output):
    for func, args in iter(input.get, 'STOP'):
        result = calculate(func, args)
        output.put(result)

#
# Function used to calculate result
#

def calculate(func, args):
    result = func(*args)
    return '%s says that %s%s = %s' % \
        (current_process().name, func.__name__, args, result)

#
# Functions referenced by tasks
#

def mul(a, b):
    time.sleep(0.5*random.random())
    return a * b

def plus(a, b):
    time.sleep(0.5*random.random())
    return a + b

#
#
#

def test():
    NUMBER_OF_PROCESSES = 4
    TASKS1 = [(mul, (i, 7)) for i in range(20)]
    TASKS2 = [(plus, (i, 8)) for i in range(10)]

    # Create queues
    task_queue = Queue()
    done_queue = Queue()

    # Submit tasks
    for task in TASKS1:
        task_queue.put(task)

    # Start worker processes
    for i in range(NUMBER_OF_PROCESSES):
        Process(target=worker, args=(task_queue, done_queue)).start()

    # Get and print results
    print('Unordered results:')
    for i in range(len(TASKS1)):
        print('\t', done_queue.get())

    # Add more tasks using `put()`
    for task in TASKS2:
        task_queue.put(task)

    # Get and print some more results
    for i in range(len(TASKS2)):
        print('\t', done_queue.get())

    # Tell child processes to stop
    for i in range(NUMBER_OF_PROCESSES):
        task_queue.put('STOP')


if __name__ == '__main__':
    freeze_support()
    test()