Multi-thread vs. Multi-process
Summary
Item | Multi-threading | Multi-processing |
---|---|---|
Import | `from threading import Thread` | `from multiprocessing import Process` |
Create, start, wait for completion | `t = Thread(target=func, args=(...))`<br>`t.start()`<br>`t.join()` | `p = Process(target=func, args=(...))`<br>`p.start()`<br>`p.join()` |
Data communication | `from queue import Queue`<br>`q = Queue()`<br>`q.put(item)`<br>`item = q.get()` | `from multiprocessing import Queue`<br>`q = Queue()`<br>`q.put(item)`<br>`item = q.get()` |
Locking (thread/process safety) | `from threading import Lock`<br>`lock = Lock()`<br>Method 1: `with lock:` followed by the critical section<br>Method 2: `lock.acquire()`, critical section, `lock.release()` | `from multiprocessing import Lock`<br>`lock = Lock()`<br>Method 1: `with lock:` followed by the critical section<br>Method 2: `lock.acquire()`, critical section, `lock.release()` |
Pooling | `from concurrent.futures import ThreadPoolExecutor`<br>`with ThreadPoolExecutor() as executor:`<br>Method 1: `results = executor.map(func, [1, 2, 3])`<br>Method 2: `future = executor.submit(func, 1)`, then `result = future.result()` | `from concurrent.futures import ProcessPoolExecutor`<br>`with ProcessPoolExecutor() as executor:`<br>Method 1: `results = executor.map(func, [1, 2, 3])`<br>Method 2: `future = executor.submit(func, 1)`, then `result = future.result()` |
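The threading column of the table can be run end to end in one short script. This is a minimal sketch. The `square`, `worker` and `demo` functions and the shared counter are hypothetical names invented for illustration. Every call used (`Thread`, `queue.Queue`, `Lock`, `ThreadPoolExecutor.map`/`submit`) is the standard-library API listed in the table.

```python
from concurrent.futures import ThreadPoolExecutor
from queue import Queue
from threading import Lock, Thread


def square(x: int) -> int:
    """Hypothetical toy workload shared by every row below."""
    return x * x


def worker(x: int, q: Queue) -> None:
    """Runs in a thread and hands its result back through the queue."""
    q.put(square(x))


def demo() -> dict:
    # Rows "Create, start, wait" and "Data communication".
    q: Queue = Queue()
    t = Thread(target=worker, args=(3, q))
    t.start()
    t.join()
    from_queue = q.get()

    # Row "Locking": four threads bump one counter under a single lock.
    lock = Lock()
    counter = [0]  # a list so the nested function can mutate it

    def increment(n: int) -> None:
        for _ in range(n):
            with lock:
                counter[0] += 1

    threads = [Thread(target=increment, args=(1000,)) for _ in range(4)]
    for th in threads:
        th.start()
    for th in threads:
        th.join()

    # Row "Pooling": map() yields results in input order;
    # submit() returns a Future whose result() waits for completion.
    with ThreadPoolExecutor() as executor:
        mapped = list(executor.map(square, [1, 2, 3]))
        submitted = executor.submit(square, 1).result()

    return {"queue": from_queue, "counter": counter[0],
            "map": mapped, "submit": submitted}


if __name__ == "__main__":
    print(demo())  # prints {'queue': 9, 'counter': 4000, 'map': [1, 4, 9], 'submit': 1}
```

Moving to the multi-processing column is mostly a matter of swapping the imports. There are a few caveats. The code should sit under an `if __name__ == '__main__':` guard. The callables should generally be defined at module level so they can be pickled, which `ProcessPoolExecutor` and the spawn start method require; the nested `increment` above would not qualify. Also, a plain Python counter is not shared between processes, so shared state needs `multiprocessing.Value` or `Array`.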
threading
This module constructs higher-level threading interfaces on top of the lower-level `_thread` module. See also the `queue` module.
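`threading` and `queue` are usually used together. Below is a minimal producer/consumer sketch. The `producer`/`consumer` functions and the `None` sentinel are choices made for this example, not part of either module's API. `queue.Queue` does its own internal locking, so the two threads need no explicit `Lock`.

```python
import threading
from queue import Queue

SENTINEL = None  # hypothetical "no more work" marker chosen for this sketch


def producer(q: Queue, items: list) -> None:
    for item in items:
        q.put(item)      # blocks while the bounded queue is full
    q.put(SENTINEL)      # tell the consumer to stop


def consumer(q: Queue, out: list) -> None:
    while True:
        item = q.get()   # blocks until an item is available
        if item is SENTINEL:
            break
        out.append(item * 10)


def run(items: list) -> list:
    q: Queue = Queue(maxsize=2)
    out: list = []
    threads = [
        threading.Thread(target=producer, args=(q, items), name="producer"),
        threading.Thread(target=consumer, args=(q, out), name="consumer"),
    ]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return out


if __name__ == "__main__":
    print(run([1, 2, 3]))  # prints [10, 20, 30]
```

With one producer and one consumer, the FIFO queue preserves the input order. With several consumers, the order of `out` would no longer be guaranteed.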
Lock
A primitive lock is a synchronization primitive that is not owned by a particular thread when locked. In Python, it is currently the lowest-level synchronization primitive available, implemented directly by the `_thread` extension module.
A primitive lock is in one of two states, “locked” or “unlocked”. It is created in the unlocked state. It has two basic methods, `acquire()` and `release()`. When the state is unlocked, `acquire()` changes the state to locked and returns immediately. When the state is locked, `acquire()` blocks until a call to `release()` in another thread changes it to unlocked; then the `acquire()` call resets it to locked and returns. The `release()` method should only be called in the locked state; it changes the state to unlocked and returns immediately. If an attempt is made to release an unlocked lock, a `RuntimeError` will be raised.
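The state transitions above can be observed directly. This minimal sketch uses only `threading.Lock` methods: `acquire()` with its `blocking` and `timeout` parameters, `locked()`, and `release()`. The 0.05 s timeout is an arbitrary value for the example. The last step shows the "not owned by a particular thread" property: a different thread releases the lock.

```python
import threading

lock = threading.Lock()

# Created unlocked; acquire() locks it and returns True.
assert not lock.locked()
assert lock.acquire() is True

# While locked, a non-blocking acquire() fails at once instead of waiting,
# and a timed acquire() gives up after the timeout.
assert lock.acquire(blocking=False) is False
assert lock.acquire(timeout=0.05) is False

# The lock is not owned by the thread that locked it:
# another thread is allowed to release it.
releaser = threading.Thread(target=lock.release)
releaser.start()
releaser.join()
assert not lock.locked()

# Releasing a lock that is already unlocked raises RuntimeError.
release_error = None
try:
    lock.release()
except RuntimeError as exc:
    release_error = exc
print("second release raised:", type(release_error).__name__)  # prints second release raised: RuntimeError
```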
```python
import threading
from time import sleep

lock = threading.Lock()


class Account:
    def __init__(self, balance: int = 1000) -> None:
        self.balance = balance


def draw_1(account_: Account, amount: int) -> None:
    # Method 1: explicit acquire()/release(); try/finally guarantees
    # the lock is released even if the body raises.
    lock.acquire()
    try:
        if account_.balance >= amount:
            sleep(0.1)  # widen the window between the check and the update
            account_.balance -= amount
            print(f'Success!\tthread name = {threading.current_thread().name} balance = {account_.balance}')
        else:
            print(f'Fail!\tthread name = {threading.current_thread().name} balance = {account_.balance}')
    finally:
        lock.release()


def draw_2(account_: Account, amount: int) -> None:
    # Method 2: the with statement acquires and releases the lock automatically.
    with lock:
        if account_.balance >= amount:
            sleep(0.1)
            account_.balance -= amount
            print(f'Success!\tthread name = {threading.current_thread().name} balance = {account_.balance}')
        else:
            print(f'Fail!\tthread name = {threading.current_thread().name} balance = {account_.balance}')


if __name__ == '__main__':
    account = Account(1000)
    # Use target=draw_1 instead to try the explicit acquire()/release() version.
    t1 = threading.Thread(target=draw_2, args=(account, 800), name='t1')
    t2 = threading.Thread(target=draw_2, args=(account, 800), name='t2')
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    # One thread succeeds (balance = 200) and the other fails.
```
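Both `draw_1` and `draw_2` hold the lock across the whole check-then-withdraw sequence. The sketch below shows what happens when they don't. It redefines an `Account` like the one above so it can run on its own. The `threading.Barrier` is an addition for this example only: it forces both threads past the balance check before either subtracts, so the race appears on every run. A separate lock around the subtraction alone shows that protecting only the write is not enough.

```python
import threading


class Account:
    def __init__(self, balance: int = 1000) -> None:
        self.balance = balance


def draw_unsafe(account: Account, amount: int,
                barrier: threading.Barrier, update_lock: threading.Lock) -> None:
    # The check is NOT protected, so both threads can pass it.
    if account.balance >= amount:
        barrier.wait()  # wait until the other thread has also passed the check
        with update_lock:  # locking only the write does not help
            account.balance -= amount


def race(amount: int = 800) -> int:
    account = Account(1000)
    barrier = threading.Barrier(2)
    update_lock = threading.Lock()
    threads = [threading.Thread(target=draw_unsafe,
                                args=(account, amount, barrier, update_lock))
               for _ in range(2)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return account.balance


if __name__ == "__main__":
    print(race())  # prints -600: both withdrawals of 800 went through
```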
multiprocessing
The `multiprocessing` package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the `multiprocessing` module allows the programmer to fully leverage multiple processors on a given machine.
Lock
`multiprocessing` contains equivalents of all the synchronization primitives from `threading`. For instance, one can use a lock to ensure that only one process prints to standard output at a time:
```python
from multiprocessing import Process, Lock


def f1(l, i):
    # Explicit acquire()/release() with try/finally.
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()


def f2(l, i):
    # Equivalent with-statement form.
    with l:
        print('hello world', i)


if __name__ == '__main__':
    lock = Lock()
    for num in range(10):
        # Process(target=f1, args=(lock, num)).start()
        Process(target=f2, args=(lock, num)).start()
```
Without the lock, output from the different processes is liable to get all mixed up.
Sharing state between processes
When doing concurrent programming, it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes. However, if you really do need to use some shared data, then `multiprocessing` provides a couple of ways of doing so.
Shared memory
Data can be stored in a shared memory map using `Value` or `Array`. For example, the following code
```python
from multiprocessing import Process, Value, Array


def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]


if __name__ == '__main__':
    num = Value('d', 0.0)        # 'd': double-precision float
    arr = Array('i', range(10))  # 'i': signed int

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])
```
will print:

```
3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
```
Exchanging objects between processes
`multiprocessing` supports two types of communication channel between processes:
Queues
The `Queue` class is a near clone of `queue.Queue`. For example:
```python
from multiprocessing import Process, Queue


def f(q):
    q.put([42, None, 'hello'])


if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())  # prints "[42, None, 'hello']"
    p.join()
```
Queues are thread and process safe.
Pipes
The `Pipe()` function returns a pair of connection objects connected by a pipe, which by default is duplex (two-way). For example:
```python
from multiprocessing import Process, Pipe


def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()


if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())  # prints "[42, None, 'hello']"
    p.join()
```
The two connection objects returned by `Pipe()` represent the two ends of the pipe. Each connection object has `send()` and `recv()` methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course, there is no risk of corruption from processes using different ends of the pipe at the same time.
concurrent.futures
ThreadPoolExecutor Example
```python
import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']


# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()


# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    # as_completed() yields each future as soon as it finishes
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))
```
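The URL example needs network access. The same submit / `as_completed` / exception-handling pattern can be tried offline with a stand-in task. In this sketch, `fake_load`, `fetch_all` and the `DELAYS` table are hypothetical. `time.sleep` simulates I/O, and the `"bad"` key plays the role of the made-up domain by raising an error.

```python
import concurrent.futures
import time

# Hypothetical stand-in for the URL list: key -> simulated I/O delay (seconds).
DELAYS = {"a": 0.03, "b": 0.01, "c": 0.02, "bad": 0.0}


def fake_load(key: str, timeout: float) -> bytes:
    """Hypothetical replacement for load_url: sleeps instead of fetching."""
    if key == "bad":
        raise ValueError(f"cannot load {key!r}")
    time.sleep(min(DELAYS[key], timeout))
    return key.encode() * 10


def fetch_all(keys):
    results, errors = {}, {}
    with concurrent.futures.ThreadPoolExecutor(max_workers=4) as executor:
        future_to_key = {executor.submit(fake_load, k, 1.0): k for k in keys}
        # Futures arrive in completion order, not submission order.
        for future in concurrent.futures.as_completed(future_to_key):
            key = future_to_key[future]
            try:
                results[key] = len(future.result())  # re-raises the task's exception
            except Exception as exc:
                errors[key] = str(exc)
    return results, errors


if __name__ == "__main__":
    ok, failed = fetch_all(DELAYS)
    print(ok)      # sizes by key; insertion order follows completion order
    print(failed)  # prints {'bad': "cannot load 'bad'"}
```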
ProcessPoolExecutor Example
```python
import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]


def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True


def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        # executor.map returns results in the same order as PRIMES
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))


if __name__ == '__main__':
    main()
```