引入 Gevent
gevent
是一个基于协程的 Python 网络库,它使用了 greenlet
提供的轻量级"绿色线程",配合事件循环,能够实现高效的并发处理。在开始编写异步服务之前,我们需要通过 monkey.patch_all()
来修改标准库中的一些阻塞操作,使其变为非阻塞,以便于 gevent
的使用。
from gevent import monkey; monkey.patch_all()
使用 WSGIServer
gevent.pywsgi.WSGIServer
是 gevent
提供的一个 WSGI 服务器实现,它能够处理并发的 HTTP 请求。与传统的同步服务器不同,WSGIServer
利用 gevent
的非阻塞IO和协程特性,可以在单个线程内高效地管理数千个连接。odoo也是采用了这个方式。
from gevent.pywsgi import WSGIServer
实现异步任务处理
为了实现异步处理,我们可以利用 Python 的 queue
模块创建一个任务队列。后台的工作线程从队列中获取任务,并执行。这样,主线程可以快速响应新的 HTTP 请求,而不需要等待任务的完成。
from queue import Queue
import threading
我们定义一个 WebTask
类来封装任务信息,包括任务 ID、要执行的函数、参数等。通过这种方式,我们可以灵活地添加不同类型的任务。
示例代码
结合以上元素,我们可以创建一个简单的异步 Web 服务,该服务能够接收任务请求,并立即返回。任务在后台队列中异步处理,客户端可以通过查询任务状态了解任务进度。
import time
# 初始化任务队列
task_queue = Queue()
# 定义任务模型
class WebTask:
def __init__(self, task_id, function, *args, **kwargs):
self.task_id = task_id
self.function = function
self.args = args
self.kwargs = kwargs
self.result = None
self.completed = False
def run(self):
self.result = self.function(*self.args, **self.kwargs)
self.completed = True
print(f"任务 {self.task_id} 完成.")
# 存储任务的字典,用于查询任务状态
tasks = {}
def example_task(duration):
time.sleep(duration)
print(f"执行了一个耗时 {duration} 秒的任务.")
def worker():
while True:
task = task_queue.get()
if task is None:
break
task.run()
task_queue.task_done()
# 启动工作线程
thread = threading.Thread(target=worker)
thread.start()
def application(env, start_response):
path = env['PATH_INFO']
if path == '/start_task':
task_id = str(time.time())
task = WebTask(task_id, example_task, 5)
tasks[task_id] = task
task_queue.put(task)
start_response('200 OK', [('Content-Type', 'text/plain')])
return [f"Task {task_id} started. Check status at /status/{task_id}".encode()]
elif path.startswith('/status/'):
task_id = path.split('/')[-1]
task = tasks.get(task_id)
if task and task.completed:
start_response('200 OK', [('Content-Type', 'text/plain')])
return [b"Task completed"]
else:
start_response('200 OK', [('Content-Type', 'text/plain')])
return [b"Task is still processing"]
else:
start_response('404 Not Found', [('Content-Type', 'text/plain')])
return [b"Not Found"]
if __name__ == '__main__':
print("Serving on http://0.0.0.0:8008")
WSGIServer(('0.0.0.0', 8008), application).serve_forever()