Python多线程与多进程中join()方法的效果是相同的。
下面仅以多线程为例:
明确几个概念:
知识点一:
当一个进程启动之后，会默认产生一个主线程，因为线程是程序执行流的最小单元。当设置多线程时，主线程会创建多个子线程。在Python中，默认情况下（即 setDaemon(False)，子线程为非守护线程），主线程执行完自己的任务以后就退出了，此时子线程会继续执行自己的任务，直到自己的任务结束，例子见下面一。
知识点二:
当我们使用setDaemon(True)方法（新版本中等价于设置 daemon=True 属性），将子线程设置为守护线程时，主线程一旦执行结束，则全部子线程被强制终止。可能出现的情况就是：子线程的任务还没有完全执行结束，就被迫停止，例子见下面二。
知识点三:
此时join的作用就凸显出来了。join所完成的工作就是线程同步：主线程任务结束之后进入阻塞状态，一直等待其他的子线程执行结束之后，主线程再终止，例子见下面三。
知识点四:
join有一个timeout
参数:
当设置守护线程时，join(timeout) 的含义是：主线程对该子线程最多等待 timeout 的时间。注意 join 本身并不会"杀死"子线程——超时后主线程只是不再等待，继续往下执行直至退出；守护子线程是随主线程退出而被终止的。所以说，如果对 10 个子线程依次 join(timeout)，最长的总等待时间是各 timeout 的累加和（若子线程提前结束则实际更短）。
没有设置守护线程时,主线程将会等待timeout的累加和这样的一段时间,时间一到,主线程结束,但是并没有杀死子线程,子线程依然可以继续执行,直到子线程全部结束,程序退出。
一、没有守护线程
target方式
import threading
import time


def run():
    """Worker: sleep 2s, report this thread's name, sleep 2s (~4s total)."""
    time.sleep(2)
    print('当前线程的名字是: ', threading.current_thread().name)
    time.sleep(2)


if __name__ == '__main__':
    start_time = time.time()
    print('这是主线程:', threading.current_thread().name)
    thread_list = []
    for i in range(5):
        t = threading.Thread(target=run)
        thread_list.append(t)
    for t in thread_list:
        t.start()
    # Main thread falls through immediately; non-daemon workers keep
    # running after it finishes, so the measured time is ~0s.
    print('主线程结束!', threading.current_thread().name)
    print('一共用时:', time.time()-start_time)
继承Thread方式
import time
import threading


class TestThread(threading.Thread):
    """Thread-subclass variant of the same ~4s worker."""

    def run(self):
        time.sleep(2)
        print('当前线程的名字是: ', threading.current_thread().name)
        time.sleep(2)


start_time = time.time()
print('这是主线程:', threading.current_thread().name)
for i in range(5):
    t1 = TestThread()
    t1.start()
# Non-daemon threads outlive the main thread; elapsed time printed is ~0s.
print('主线程结束了!', threading.current_thread().name)
print('一共用时:', time.time()-start_time)
关键点:
我们的计时是对主线程计时,主线程结束,计时随之结束,打印出主线程的用时。
主线程的任务完成之后,主线程随之结束,子线程继续执行自己的任务,直到全部的子线程的任务全部结束,程序结束。
二、设置守护线程
target方式
import threading
import time


def run():
    """Worker: sleep 2s, report this thread's name, sleep 2s (~4s total)."""
    time.sleep(2)
    print('当前线程的名字是: ', threading.current_thread().name)
    time.sleep(2)


if __name__ == '__main__':
    start_time = time.time()
    print('这是主线程:', threading.current_thread().name)
    thread_list = []
    for i in range(5):
        t = threading.Thread(target=run)
        thread_list.append(t)
    for t in thread_list:
        # daemon attribute replaces setDaemon(), deprecated since Python 3.10.
        t.daemon = True
        t.start()
    # Daemon workers are killed as soon as the main thread exits, so the
    # worker prints never appear.
    print('主线程结束了!', threading.current_thread().name)
    print('一共用时:', time.time()-start_time)
继承Thread方式
import time
import threading


class TestThread(threading.Thread):
    """Thread-subclass variant of the same ~4s worker."""

    def run(self):
        time.sleep(2)
        print('当前线程的名字是: ', threading.current_thread().name)
        time.sleep(2)


start_time = time.time()
print('这是主线程:', threading.current_thread().name)
for i in range(5):
    t = TestThread()
    # daemon attribute replaces setDaemon(), deprecated since Python 3.10.
    t.daemon = True
    t.start()
# Daemon workers die with the main thread; the worker prints never appear.
print('主线程结束了!', threading.current_thread().name)
print('一共用时:', time.time()-start_time)
关键点:
非常明显的看到,主线程结束以后,子线程还没有来得及执行,整个程序就退出了。
三、join的作用
target方式
import threading
import time


def run():
    """Worker: sleep 2s, report this thread's name, sleep 2s (~4s total)."""
    time.sleep(2)
    print('当前线程的名字是: ', threading.current_thread().name)
    time.sleep(2)


if __name__ == '__main__':
    start_time = time.time()
    print('这是主线程:', threading.current_thread().name)
    thread_list = []
    for i in range(5):
        t = threading.Thread(target=run)
        thread_list.append(t)
    for t in thread_list:
        # daemon attribute replaces setDaemon(), deprecated since Python 3.10.
        t.daemon = True
        t.start()
    # join() blocks the main thread until every worker finishes (~4s, since
    # all five run concurrently).
    for t in thread_list:
        t.join()
    print('主线程结束了!', threading.current_thread().name)
    print('一共用时:', time.time()-start_time)
继承Thread方式
import time
import threading


class TestThread(threading.Thread):
    """Thread-subclass variant of the same ~4s worker."""

    def run(self):
        time.sleep(2)
        print('当前线程的名字是: ', threading.current_thread().name)
        time.sleep(2)


start_time = time.time()
print('这是主线程:', threading.current_thread().name)
threads = []
for i in range(5):
    t = TestThread()
    # daemon attribute replaces setDaemon(), deprecated since Python 3.10.
    t.daemon = True
    t.start()
    threads.append(t)
# Fix: the original joined inside the start loop, which serialized the
# threads (each started only after the previous one finished, ~20s total).
# Start all first, then join all, matching the target-function example (~4s).
for t in threads:
    t.join()
print('主线程结束了!', threading.current_thread().name)
print('一共用时:', time.time()-start_time)
关键点:
可以看到,主线程一直等待全部的子线程结束之后,主线程自身才结束,程序退出。
实际例子,下载抖音视频:
import os
import pymysql.cursors
import urllib.request
from bs4 import BeautifulSoup
import socket
import requests
import copy
from threading import Thread
import time
import queue
# Local folder where downloaded media are stored, and the public URL prefix
# that replaces it when paths are written back to the database.
ROOT_PATH = "/img/"
ROOT_URL = "http://img.xxx.com/img"
# Setting timeout (seconds, per HTTP request)
TIMEOUT = 10
# Retry times
RETRY = 5
# Numbers of downloading threads concurrently
THREADS = 5
# Default request headers; iPhone Safari user-agent so the site serves the
# mobile endpoints.
HEADERS = {
'accept-encoding': 'gzip, deflate, br',
'accept-language': 'zh-CN,zh;q=0.9',
'pragma': 'no-cache',
'cache-control': 'no-cache',
'upgrade-insecure-requests': '1',
'user-agent': "Mozilla/5.0 (iPhone; CPU iPhone OS 11_0 like Mac OS X) AppleWebKit/604.1.38 (KHTML, like Gecko) Version/11.0 Mobile/15A372 Safari/604.1",
}
class DouyinDb(object):
    """
    Thin wrapper around the MySQL table ``tb_douyin``.

    NOTE(review): each method closes the connection in ``finally``, so one
    instance supports exactly one call; call sites create a fresh
    ``DouyinDb()`` per operation.
    """

    def __init__(self):
        # NOTE(review): credentials are hard-coded — move to config/env.
        self.conn = pymysql.connect(host="192.168.0.1",
                                    user="username",
                                    password="pwd",
                                    db="database",
                                    charset="utf8",
                                    cursorclass=pymysql.cursors.DictCursor)

    def get_date(self):
        """Return up to 50 rows still missing a mirrored image or video."""
        try:
            with self.conn.cursor() as cursor:
                sql = "SELECT user_id, image, org_image, video, org_video, video_id FROM tb_douyin WHERE image = '' OR video = '' ORDER BY id DESC LIMIT 50;"
                cursor.execute(sql)
                result = cursor.fetchall()
                return result
        finally:
            self.conn.close()

    def update(self, medium_type, medium_url, file_name):
        """Record the local mirror URL for a downloaded medium.

        Fixes vs. the original:
        * ``medium_type is "image"`` compared identity, not equality;
        * values were spliced into SQL with ``%`` (injection-prone) — now
          passed as query parameters;
        * the UPDATE was never committed (pymysql autocommit is off by
          default), so the change never persisted.
        """
        file_name = file_name.replace(ROOT_PATH, ROOT_URL)
        try:
            with self.conn.cursor() as cursor:
                if medium_type == "image":
                    sql = "UPDATE tb_douyin SET image=%s WHERE org_image=%s"
                elif medium_type == 'video':
                    sql = "UPDATE tb_douyin SET video=%s WHERE org_video=%s"
                else:
                    return
                cursor.execute(sql, (file_name, medium_url))
                self.conn.commit()
        finally:
            self.conn.close()
def getRemoteFileSize(url, proxy=None):
    '''
    Return the remote file's size in bytes, read from the Content-Length
    header of a HEAD request; 0 if the file does not exist or the header
    is missing.

    ``proxy`` is accepted for backward compatibility but is unused.
    '''
    request = urllib.request.Request(url)
    # Force a HEAD request so no body is transferred.
    request.get_method = lambda: 'HEAD'
    try:
        # Context manager closes the response (the original leaked the socket).
        with urllib.request.urlopen(request) as response:
            response.read()
            fileSize = dict(response.headers).get('Content-Length', 0)
    except urllib.error.HTTPError as e:
        # Remote file does not exist
        print(e.code)
        print(e.read().decode("utf8"))
        return 0
    return int(fileSize)
def download(medium_type, uri, medium_url, target_folder):
    """Download one medium ('video' or 'image') into ``target_folder`` and
    record the mirrored path in the database.

    Skips the transfer when a local file of the same size already exists.
    Retries up to ``RETRY`` times; a 403 response aborts immediately.
    Sleeps 1s at the end to throttle the worker.
    """
    headers = copy.copy(HEADERS)
    file_name = uri
    if medium_type == 'video':
        file_name += '.mp4'
        # The video CDN requires the mobile-client user-agent.
        headers['user-agent'] = 'Aweme/27014 CFNetwork/974.2.1 Darwin/18.0.0'
    elif medium_type == 'image':
        # file_name += '.webp'
        file_name = file_name.replace("/", "-")
    else:
        return
    file_path = os.path.join(target_folder, file_name)
    if os.path.isfile(file_path):
        remoteSize = getRemoteFileSize(medium_url)
        localSize = os.path.getsize(file_path)
        if remoteSize == localSize:
            # Already fully downloaded — just make sure the DB row is set.
            DouyinDb().update(medium_type, medium_url, file_path)
            return
    print("Downloading %s from %s\n" % (file_name, medium_url))
    retry_times = 0
    while retry_times < RETRY:
        try:
            # Context manager closes the response (the original leaked it).
            with requests.get(medium_url, headers=headers, stream=True,
                              timeout=TIMEOUT) as resp:
                if resp.status_code == 403:
                    retry_times = RETRY
                    print("Access Denied when retrieve %s.\n" % medium_url)
                    raise Exception("Access Denied")
                with open(file_path, 'wb') as fh:
                    for chunk in resp.iter_content(chunk_size=1024):
                        fh.write(chunk)
            DouyinDb().update(medium_type, medium_url, file_path)
            break
        except Exception:
            # Was a bare ``except:`` — narrowed so KeyboardInterrupt /
            # SystemExit are no longer swallowed.
            pass
        retry_times += 1
    else:
        # Retries exhausted: remove any partial file before giving up.
        try:
            os.remove(file_path)
        except OSError:
            pass
        print("Failed to retrieve %s from %s.\n" % (uri, medium_url))
    time.sleep(1)
class DownloadWorker(Thread):
    """Queue consumer thread: pulls download jobs and runs them forever."""

    def __init__(self, queue):
        Thread.__init__(self)
        # Shared job queue; items are (medium_type, uri, url, folder) tuples.
        self.queue = queue

    def run(self):
        # Loop forever; task_done() pairs with the producer's queue.join()
        # so the main thread knows when every submitted job has finished.
        while True:
            medium_type, uri, download_url, target_folder = self.queue.get()
            download(medium_type, uri, download_url, target_folder)
            self.queue.task_done()
class Check(object):
    """Orchestrator: fetch pending DB rows and fan downloads out to workers."""

    def __init__(self):
        # Global safety net for sockets that ignore per-request timeouts.
        socket.setdefaulttimeout(60)
        self.queue = queue.Queue()

    def main(self):
        """Start THREADS daemon workers, enqueue one video + one image job
        per pending row, and block until the queue is drained."""
        ls = DouyinDb().get_date()
        # Daemon workers die with the main thread once queue.join() returns.
        for x in range(THREADS):
            worker = DownloadWorker(self.queue)
            worker.daemon = True
            worker.start()
        for d in ls:
            target_folder = os.path.join(ROOT_PATH, '%s' % d['user_id'])
            video_folder = os.path.join(target_folder, "video")
            img_folder = os.path.join(target_folder, "img")
            # makedirs(exist_ok=True) creates missing parents and is
            # race-free, unlike the original isdir()+mkdir() pairs.
            os.makedirs(video_folder, exist_ok=True)
            os.makedirs(img_folder, exist_ok=True)
            self.queue.put(('video', d['video_id'], d['org_video'], video_folder))
            uri = d['org_image'].split("/")[-1]
            self.queue.put(('image', uri, d['org_image'], img_folder))
        # Block until every queued job has reported task_done().
        self.queue.join()
Check().main()