0
点赞
收藏
分享

微信扫一扫

Python中的多线程与多进程

Multi-thread vs. Multi-process

Sumarize

语法条目Multi-threadingMulti-processing
引入模块from threading import Threadfrom multiprocessing import Process
新建
启动
等待结束
t=Thread(target=func,args=tuple)
t.start()
t.join()
p=Process(target=func,args=tuple)
p.start()
p.join()
数据通信from queue import Queue
q=Queue()
q.put(item)
item=q.get()
from multiprocessing import Queue
q=Queue()
q.put(item)
item=q.get()
线程安全加锁from threading import Lock
lock=Lock()
method 1
with lock:
# do something
method 2
lock.acquire()
# do something
lock.release()
from multiprocessing import Lock
lock=Lock()
method 1
with lock:
# do something
method 2
lock.acquire()
# do something
lock.release()
池化技术from concurrent.futures import ThreadPoolExecutor
with ThreadPoolExecutor() as executor:
method 1
results = executor.map(func, [1,2,3])
method 2
future = executor.submit(func,1)
result = future.result()
from concurrent.futures import ProcessPoolExecutor
with ProcessPoolExecutor() as executor:
method 1
results = executor.map(func, [1,2,3])
method 2
future = executor.submit(func,1)
result = future.result()

threading

This module constructs higher-level threading interfaces on top of the lower level _thread module. See also the queue module.

Lock

A primitive lock is a synchronization primitive that is not owned by a particular thread when locked. In Python, it is currently the lowest level synchronization primitive available, implemented directly by the _thread extension module.

A primitive lock is in one of two states, “locked” or “unlocked”. It is created in the unlocked state. It has two basic methods, acquire() and release(). When the state is unlocked, acquire() changes the state to locked and returns immediately. When the state is locked, acquire() blocks until a call to release() in another thread changes it to unlocked, then the acquire() call resets it to locked and returns. The release() method should only be called in the locked state; it changes the state to unlocked and returns immediately. If an attempt is made to release an unlocked lock, a RuntimeError will be raised.

import threading
from time import sleep

lock = threading.Lock()

class Account:
    def __init__(self, balance: int = 1000) -> None:
        self.balance = balance

def draw_1(account_: Account, amount: int):
    lock.acquire()
    if account_.balance >= amount:
        sleep(0.1)
        account_.balance -= amount
        print(f'Success!\tthread name = {threading.current_thread().name} balance = {account_.balance}')
    else:
        print(f'Fail!\tthread name = {threading.current_thread().name} balance = {account_.balance}')
    lock.release()

def draw_2(account_: Account, amount: int):
    with lock:
        if account_.balance >= amount:
            sleep(0.1)
            account_.balance -= amount
            print(f'Success!\tthread name = {threading.current_thread().name} balance = {account_.balance}')
        else:
            print(f'Fail!\tthread name = {threading.current_thread().name} balance = {account_.balance}')

if __name__ == '__main__':
    account = Account(1000)
    t1 = threading.Thread(target=draw_2, args=(account, 800), name='t1')
    t2 = threading.Thread(target=draw_2, args=(account, 800), name='t2')
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    

multiprocessing

The multiprocessing package offers both local and remote concurrency, effectively side-stepping the Global Interpreter Lock by using subprocesses instead of threads. Due to this, the multiprocessing module allows the programmer to fully leverage multiple processors on a given machine.

Lock

multiprocessing contains equivalents of all the synchronization primitives from threading. For instance one can use a lock to ensure that only one process prints to standard output at a time:

from multiprocessing import Process, Lock

def f1(l, i):
    l.acquire()
    try:
        print('hello world', i)
    finally:
        l.release()

def f2(l, i):
    with l:
        print('hello world', i)

if __name__ == '__main__':
    lock = Lock()

    for num in range(10):
        # Process(target=f1, args=(lock, num)).start()
        Process(target=f2, args=(lock, num)).start()

Without using the lock output from the different processes is liable to get all mixed up.

Sharing state between processes

As mentioned above, when doing concurrent programming it is usually best to avoid using shared state as far as possible. This is particularly true when using multiple processes.

However, if you really do need to use some shared data then multiprocessing provides a couple of ways of doing so.

Shared memory

Data can be stored in a shared memory map using Value or Array. For example, the following code

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = 3.1415927
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0)
    arr = Array('i', range(10))

    p = Process(target=f, args=(num, arr))
    p.start()
    p.join()

    print(num.value)
    print(arr[:])

will print

3.1415927
[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]

Exchanging objects between processes

multiprocessing supports two types of communication channel between processes:

Queues

The Queue class is a near clone of queue.Queue. For example:

from multiprocessing import Process, Queue

def f(q):
    q.put([42, None, 'hello'])

if __name__ == '__main__':
    q = Queue()
    p = Process(target=f, args=(q,))
    p.start()
    print(q.get())    # prints "[42, None, 'hello']"
    p.join()

Queues are thread and process safe.

Pipes

The Pipe() function returns a pair of connection objects connected by a pipe which by default is duplex (two-way). For example:

from multiprocessing import Process, Pipe

def f(conn):
    conn.send([42, None, 'hello'])
    conn.close()

if __name__ == '__main__':
    parent_conn, child_conn = Pipe()
    p = Process(target=f, args=(child_conn,))
    p.start()
    print(parent_conn.recv())   # prints "[42, None, 'hello']"
    p.join()

The two connection objects returned by Pipe() represent the two ends of the pipe. Each connection object has send() and recv() methods (among others). Note that data in a pipe may become corrupted if two processes (or threads) try to read from or write to the same end of the pipe at the same time. Of course there is no risk of corruption from processes using different ends of the pipe at the same time.

concurrent.futures

ThreadPoolExecutor Example

import concurrent.futures
import urllib.request

URLS = ['http://www.foxnews.com/',
        'http://www.cnn.com/',
        'http://europe.wsj.com/',
        'http://www.bbc.co.uk/',
        'http://some-made-up-domain.com/']

# Retrieve a single page and report the URL and contents
def load_url(url, timeout):
    with urllib.request.urlopen(url, timeout=timeout) as conn:
        return conn.read()

# We can use a with statement to ensure threads are cleaned up promptly
with concurrent.futures.ThreadPoolExecutor(max_workers=5) as executor:
    # Start the load operations and mark each future with its URL
    future_to_url = {executor.submit(load_url, url, 60): url for url in URLS}
    for future in concurrent.futures.as_completed(future_to_url):
        url = future_to_url[future]
        try:
            data = future.result()
        except Exception as exc:
            print('%r generated an exception: %s' % (url, exc))
        else:
            print('%r page is %d bytes' % (url, len(data)))

ProcessPoolExecutor Example

import concurrent.futures
import math

PRIMES = [
    112272535095293,
    112582705942171,
    112272535095293,
    115280095190773,
    115797848077099,
    1099726899285419]

def is_prime(n):
    if n < 2:
        return False
    if n == 2:
        return True
    if n % 2 == 0:
        return False

    sqrt_n = int(math.floor(math.sqrt(n)))
    for i in range(3, sqrt_n + 1, 2):
        if n % i == 0:
            return False
    return True

def main():
    with concurrent.futures.ProcessPoolExecutor() as executor:
        for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):
            print('%d is prime: %s' % (number, prime))

if __name__ == '__main__':
    main()
举报

相关推荐

0 条评论