0
点赞
收藏
分享

微信扫一扫

【从零开始】7. RAG 应用调优(番外篇)

书接上回,本节将使用压测工具(pressure_util.py)做一次 llm 性能调优(更正: RAG 应用调优),在开始之前先将背景跟大家说一下。

关于 RAG 应用

本次压测的 RAG 应用推理主体采用基于 transformers 部署的 Qwen2.5 7B 模型,语义向量采用智源的 bge-small-zh-v1.5 模型,而知识库则采用 Elasticsearch 8.12 存储库。对话请求的数据流程图如下所示:

首先用户在客户端发送问题,之后会对问题进行关键字提取。紧接着关键字也需要做向量化处理,并采用 Script 方式对 ES 库中向量字段进行查询。返回结果根据评分进行重排,并选取关联度最高的 10 条数据组装成 llm 能够理解的 prompt 。之后 prompt 会发送给 Qwen2.5 进行推理,推理结果使用 SSE 协议推送给客户。

这里先将 Qwen2.5 定义为一个黑盒,假设他在同样的环境下执行推理的效能是固定的。那么本次压测的意义就在于对模型调用、函数调用、知识库查询进行性能优化(这也是本篇开头说基于 RAG 应用调优的原因)。

关于压测脚本

本次压测将使用 pressure_util.py 自开发压测程序,这个工具目前只适用于同步接口,对于 flask async 接口暂不支持。此外,本次压测会根据生成速率和总耗时作为性能评判依据,虽看上去不太严谨但基于同环境、同参数的情况下,通过反复压测还是能看到一些端倪的。

关于生成速率

为方便对 token 生成量进行统计,对模型调用代码进行了修改,如下图:

# 获取当前 token 数
current_tokens = len(self.tokenizer.encode(buffer, add_special_tokens=False))
# 累计 token 数
total_tokens += current_tokens
# 返回时将各个 token 相关参数带上
yield {
'text': buffer,
'token_count': current_tokens,
'total_token_count': total_tokens,
'token_rate': total_tokens / (time.time() - start_time)
}

由上图可以看出,第一步会对生成字符进行 encode(重编码)以便获取该字符的 token 数。之后通过循环将 token 数进行累加。最后将总 token 数除以总时长就能得到平均每秒 tokens 的生成数了(当然了,这种做法是不够严谨的,在代码量不多时先凑合着用吧)。

第一次压测

嗯...由于某些原因不能将代码全部贴出来,先说结果吧。

原代码在功能验证环节是没有问题的,但并发数提升后性能就会急剧下降。这里使用了 5 个队列循环 20 次来进行压测,资源使用情况如下:

通过 nvtop 监控我们可以发现,在压测工具满负荷时 GPU 使用率稳定在 30% 左右,但到压测后期 GPU 使用率波动很大,如下图:

依我推测这跟 GPU 资源争抢有关,并发数多的时候 GPU 资源被分摊从而导致推理结果变慢。最终耗时 1675.20 秒。如下图:

- Program completed!
- Queue 0 completed 20 tasks
- Queue 1 completed 20 tasks
- Queue 2 completed 20 tasks
- Queue 3 completed 20 tasks
- Queue 4 completed 20 tasks
- Total run time: 1675.20 seconds

其并发高峰时 tokens 平均生成速率为 2~3 token/s。

第一次优化

第一次优化先从简单的“调参”入手,譬如:

  1. 减少设备迁移操作,使用 model.to(device) 代替 model.cuda()
if torch.cuda.is_available():
self.device = torch.device(self.gccp)
else:
self.device = torch.device(cpu)
self.model.to(self.device)
  1. 增加显存回收管理机制,避免每次推理后都进行内存清理影响性能
    ...
def _check_gpu_memory(self,threshold_percentage):

检查 GPU 显存使用情况
threshold_percentage: 触发清理的显存使用百分比阈值

if torch.cuda.is_available():
device = torch.cuda.current_device()
memory_allocated = torch.cuda.memory_allocated(device)
memory_total = torch.cuda.get_device_properties(device).total_memory
# 计算显存使用百分比
memory_used_percent = (memory_allocated / memory_total) * 100
return memory_used_percent > threshold_percentage
return False

def pytorch_transfor_msg(self, msg):
...

finally:
if self._check_gpu_memory(self.threshold_percentage):
torch.cuda.empty_cache()
gc.collect()
  1. 添加模型编译和半精度推理
# 启用 torch.compile
self.model = torch.compile(self.model,mode=reduce-overhead)
# 启用半精度推理
self.model = self.model.half()
  1. 启用自动混合精度和 inference_mode
...
# 创建一个函数来包装模型生成过程
def generate_with_optimizations():
with torch.inference_mode(), torch.amp.autocast('cuda'):
self.model.generate(**generate_kwargs)

添加上述优化后依然采用 5 队列循环 20 次进行压测。如下图:

并发高峰 GPU 使用率有所提升,tokens 生成率提升到 3~4 token/s,总耗时为 1551.28 秒,性能提升了 7.3%。

- Program completed!
- Queue 0 completed 20 tasks
- Queue 1 completed 20 tasks
- Queue 2 completed 20 tasks
- Queue 3 completed 20 tasks
- Queue 4 completed 20 tasks
- Total run time: 1551.28 seconds

第二次优化

通过前两次的压测不难发现,在整个压测过程系统只使用了一个进程,这并未发挥服务器“多核处理”的优势。于是带着问题重新审查了一遍代码做了以下调整:

  1. 进程级并行处理
  • 使用了多进程(multiprocessing)而不是线程(threading)来处理 token 生成;
  • 通过 mp.Process 创建独立进程,可以更好地利用多核 CPU;
  • 避免了 Python GIL (全局解释器锁)的限制,提高了并行处理能力;
  1. 更好的流控制机制
  • 使用文本缓冲机制(buffer)来承载生成的 token 内容;
  • 当缓冲区达到一定大小(5个字符或以上)或遇到标点符号时才一并输出,这可以产生更自然的文本片段,减少频繁的小片段输出;
  1. 更健壮的错误处理
  • 使用管道(Pipe)进行进程间通信,错误处理更完整;
  • 实现了超时机制(parent_conn.poll(0.05))
  • 完善的进程清理机制,包括超时终止(process.terminate())

调整后代码如下:

...
def stream_ret(self, msg):
...
try:
model_inputs = self._pytorch_model_input(msg)
# 创建管道
parent_conn, child_conn = Pipe()
# 创建并启动进程
process = mp.Process(target=self._generate_tokens,args=(model_inputs, child_conn))
process.start()

# 接收并处理输出
while True:
try:
if not parent_conn.poll(0.05): # 50ms 超时
continue
msg_type, content = parent_conn.recv()
if msg_type == error:
yield fError: {content}
break
elif msg_type == done:
if buffer: # 输出剩余的缓冲区内容
...
break
elif msg_type == text:
buffer += content
if len(buffer) >= 5 or content.endswith(('.', '!', '?', '\n')):
...
buffer =
except Exception as e:
yield fStreaming error: {e}
break
process.join(timeout=5)
if process.is_alive():
process.terminate()
process.join()
except Exception as e:
yield fError: {str(e)}
finally:
if self._check_gpu_memory(self.threshold_percentage):
torch.cuda.empty_cache()
gc.collect()

def _generate_tokens(self, model_inputs: Dict[str, Any], pipe_conn):
try:
...
# 创建一个函数来包装模型生成过程
def generate_with_optimizations():
with torch.inference_mode(), torch.amp.autocast('cuda'):
self.model.generate(**generate_kwargs)
# 启动线程
thread = Thread(target=generate_with_optimizations)
thread.start()
# 处理流式输出
try:
for new_text in streamer:
if new_text.strip():
pipe_conn.send((text, new_text))
except Exception as e:
pipe_conn.send((error, fStreaming error: {e}))
pipe_conn.send((done, None))
except Exception as e:
pipe_conn.send((error, fGeneration error: {e}))
finally:
pipe_conn.close()

依然采用了 5 队列循环 20 次进行压测。效果如下图所示:

通过使用 multiprocessing 使 GPU 资源被完全释放(这里我是想着既然单进程的情况下只能用到 30%,那么要使 GPU 满负荷再开几个子进程是否就可以?于是就有了使用 multiprocessing 的举措)。 CPU 处理也从一个增加到四个(其实是五个,因为启动了五个队列)。总耗时如下图所示:

- Program completed!
- Queue 0 completed 20 tasks
- Queue 1 completed 20 tasks
- Queue 2 completed 20 tasks
- Queue 3 completed 20 tasks
- Queue 4 completed 20 tasks
- Total run time: 691.23 seconds

691.23 秒!!

比第一次优化足足提升了 55.44%,比原代码提升了 58.73%,并发高峰 tokens 生成速率保持在 6~7 token/s。

第三、四、五...十七次优化(一直吃瘪)

在第二次优化时发现使用“异步”和“多线程”能有助于性能提升,于是一头扎进去将几乎全部交互都改为了异步处理了,结果却一次比一次慢。总结就是一个原因:反复地创建与销毁实例造成大量资源被浪费。那么我是怎么吃瘪的呢?主要做了以下两件事:

使用 multiprocessing.Pool 线程池代替 Threading

为了让知识库反馈尽可能体现出“多样性”。我将原来“并”查询改为了“或”查询,与此同时为了保证“或”查询能有足够快的响应时间使用了并行操作(多线程处理)。本来使用 Threading 时响应速度还是挺快的,但基于 Java 老兵的直觉多线程肯定要用线程池来管理比较好吧,于是没有多想就启用了 multiprocessing.Pool。

但是由于缺少对于 multiprocessing.Pool 的认知,一开始将 multiprocessing.Pool 作为统一线程池来编码,但由于 multiprocessing.Pool 是不可序列化的对象因此这种方式无法实现。这样就导致 multiprocessing.Pool 实例要放在函数体里面了,每次调用都需要创建和销毁 multiprocessing.Pool ,这种做法性能开销过大得不偿失。

使用统一线程池(ThreadPoolExecutor) + multiprocessing.shared_memory 来实现

既然无法使用 multiprocessing.Pool 那么用 ThreadPoolExecutor 再加上共享内存来实现多线程总可以了吧。呵呵...想法是好的,但结果还是一堆问题。由于这个问题比较典型,想后面再另开篇章说一下这个问题,这里就不再叙述了。

既然线程池的方法都无法实现的情况下,先还原回 Threading 的模式并在每次知识库查询后加上耗时时长,发现压力其实并不在查询,如下图:

...
[2024-11-16 04:11:24,774] [1592340] [INFO] - guard_rag.find_summary_search (guard_rag.py:110) - outside query totally use 0.23211336135864258 secords
[2024-11-16 04:11:57,303] [1592340] [INFO] - guard_rag._find_knowledge_info (guard_rag.py:176) - es_vct_ancient_books_512 query use 0.11098599433898926 secords
[2024-11-16 04:11:57,305] [1592340] [INFO] - guard_rag._find_knowledge_info (guard_rag.py:176) - es_vct_herbal_medicines_512 query use 0.08808517456054688 secords
[2024-11-16 04:11:57,334] [1592340] [INFO] - guard_rag._find_knowledge_info (guard_rag.py:176) - es_vct_pharmacopeia_512 query use 0.06516599655151367 secords
[2024-11-16 04:11:57,417] [1592340] [INFO] - guard_rag._find_knowledge_info (guard_rag.py:176) - es_vct_consultation_record_512 query use 0.11030220985412598 secords
[2024-11-16 04:11:57,427] [1592340] [INFO] - guard_rag._find_knowledge_info (guard_rag.py:176) - es_vct_medicine_formulas_512 query use 0.06111502647399902 secords
[2024-11-16 04:11:57,428] [1592340] [INFO] - guard_rag.find_summary_search (guard_rag.py:110) - outside query totally use 0.25348877906799316 secords
[2024-11-16 04:13:15,205] [1593911] [INFO] - guard_rag._find_knowledge_info (guard_rag.py:176) - es_vct_herbal_medicines_512 query use 0.05159330368041992 secords
[2024-11-16 04:13:15,210] [1593911] [INFO] - guard_rag._find_knowledge_info (guard_rag.py:176) - es_vct_pharmacopeia_512 query use 0.04917025566101074 secords
[2024-11-16 04:13:15,242] [1593911] [INFO] - guard_rag._find_knowledge_info (guard_rag.py:176) - es_vct_ancient_books_512 query use 0.094268798828125 secords
[2024-11-16 04:13:15,259] [1593911] [INFO] - guard_rag._find_knowledge_info (guard_rag.py:176) - es_vct_medicine_formulas_512 query use 0.05863833427429199 secords
[2024-11-16 04:13:15,268] [1593911] [INFO] - guard_rag._find_knowledge_info (guard_rag.py:176) - es_vct_consultation_record_512 query use 0.0825951099395752 secords
[2024-11-16 04:13:15,269] [1593911] [INFO] - guard_rag.find_summary_search (guard_rag.py:110) - outside query totally use 0.12564992904663086 secords
...

在并发 6 次查询的前提下(因为需要提取 6 个类型的知识内容),所有返回总耗时都保持在 0.1 ~ 0.2 秒内完成,再加上 SSE 执行过程中是不会对知识库进行查询的,那也就是说目前的并发量根本不可能产生压力。

第十八次优化

在意识到性能瓶颈不在知识库后,继续审查 llm 推理部分代码实现。通过对关键环节输出耗时时间得知,在第一次生成推理结果时最慢,如下图:

...
[2024-11-16 03:14:01,310] [1584665] [INFO] - pytorch_llm.pytorch_transfor_stream_msg (pytorch_llm.py:284) - the first rev use 15.954158306121826 seconds
[2024-11-16 03:14:10,572] [1584665] [INFO] - pytorch_llm.pytorch_transfor_msg (pytorch_llm.py:228) - totally use 1.9229881763458252 seconds
[2024-11-16 03:14:32,039] [1584665] [INFO] - pytorch_llm.pytorch_transfor_stream_msg (pytorch_llm.py:284) - the first rev use 9.45533013343811 seconds
[2024-11-16 03:14:51,553] [1584665] [INFO] - pytorch_llm.pytorch_transfor_msg (pytorch_llm.py:228) - totally use 2.0619521141052246 seconds
[2024-11-16 03:31:58,578] [1587316] [INFO] - pytorch_llm.pytorch_transfor_stream_msg (pytorch_llm.py:287) - the first rev use 9.751284837722778 seconds
[2024-11-16 03:32:12,363] [1587316] [INFO] - pytorch_llm.pytorch_transfor_msg (pytorch_llm.py:231) - totally use 2.1306257247924805 seconds
[2024-11-16 03:32:47,323] [1587316] [INFO] - pytorch_llm.pytorch_transfor_stream_msg (pytorch_llm.py:287) - the first rev use 9.490077257156372 seconds
[2024-11-16 03:32:59,150] [1587316] [INFO] - pytorch_llm.pytorch_transfor_msg (pytorch_llm.py:231) - totally use 1.0900585651397705 seconds
[2024-11-16 03:33:10,532] [1587316] [INFO] - pytorch_llm.pytorch_transfor_stream_msg (pytorch_llm.py:287) - the first rev use 9.31861424446106 seconds
...

在模型未预热的情况下,第一个字符返回居然需要 15 秒。而之后在模型预热完毕后也基本需要 9 秒才能开始输出第一个字符,于是考虑加入 Flash Attention 2 来优化生成速率。

根据 Qwen 的官方文档中所述,最新的 Flash Attention 2 只需将 transformers、torch 升级到最新版本, Flash Attention 2 将在适用时自动应用而无需修改代码参数。如下图:

话虽如此,在安装 Flash Attention 2 的过程还遇到了一些问题。最后是通过添加“FLASH_ATTENTION_FORCE_BUILD”参数解决的,如下图:

FLASH_ATTENTION_FORCE_BUILD=TRUE pip install -i https://mirrors.aliyun.com/pypi/simple flash-attn --no-build-isolation --verbose

有了 Flash Attention 2 的加持生成速度果然快了很多。与此同时,我将一些函数提取并作为公共函数使用,这样就不用每次调用都实例化一次。虽然只是小改动,但效果还是比较明显的,如下图:

...
[2024-11-18 02:03:25,685] [8456] [INFO] - pytorch_llm.pytorch_transfor_stream_msg (pytorch_llm.py:290) - the first rev use 6.534850358963013 seconds
[2024-11-18 02:03:27,146] [8456] [INFO] - pytorch_llm.pytorch_transfor_stream_msg (pytorch_llm.py:290) - the first rev use 6.747442722320557 seconds
[2024-11-18 02:03:30,344] [8456] [INFO] - pytorch_llm.pytorch_transfor_stream_msg (pytorch_llm.py:290) - the first rev use 6.885641813278198 seconds

...

- Program completed!
- Queue 0 completed 20 tasks
- Queue 1 completed 20 tasks
- Queue 2 completed 20 tasks
- Queue 3 completed 20 tasks
- Queue 4 completed 20 tasks
- Total run time: 718.13 seconds

第一个推理出来的字符耗时降低到 7 秒内,而整个压测结果耗时 718.13 秒。虽然相比第二次优化总耗时长了,但是每个对话的 token 生成数却多了一半,在内容没有重复的前提下,回答得比之前更详细了(可能是做了 RAG 知识库的查询调整引起的吧)。

至此,基于 transformer 部署的大模型 RAG “基础”优化已结束。

这时可能会有小伙伴会有以下问题:

  1. 不是还能通过批处理加速吗?

是的,批处理的处理方式在某些时候的确能够加速推理,但在本例中并不适用(别问我为什么知道)。

  1. 你为什么不使用像 vllm 又或者 llama.cpp 的方式来部署模型这样不更能优化速度吗?

是的,vllm 、 llama.cpp 又或者 ollama 他们都有非常优异的 llm 推理优化,性能也是杠杠的。我们可以通过 Docker 又或者本地部署的方式先将服务启动起来,然后通过 API 进行对接即可使用。但因为某些原因,这种未能完全整合到项目的实现方式不被允许使用...

PS:说到 vllm 和 llama.cpp(llama-cpp-python) 其实也有整合到项目的实现(我也将其实现了)。但在并发的情况下性能提升有限,因此后续我将其放到自己 brain-mix 项目中并分享给大家,公司项目就不采用这种方式了。

  1. 关于 Transformers 的性能优化不是还有 BetterTransformer 这种实现吗

是的,BetterTransformer 可以用最少的改动实现离线推理的性能飞跃,这跟它的实现原理有关。但无奈的是目前 BetterTransformer 并不支持 Qwen 架构。因此如果想用 BetterTransformer 的话可以换 bigscience/bloom-7b1 模型试试。

...... 更多 RAG 应用优化会放入正篇中进行叙述(未完待续...)

举报

相关推荐

0 条评论