大模型并发的问题
【代码】大模型并发的问题。
·
import aiohttp
import asyncio
import time
from tqdm import tqdm
import json
#vllm部署qwen如何统计token数量呢?
async def fetch(session, url):
"""
参数:
session (aiohttp.ClientSession): 用于请求的会话。
url (str): 要发送请求的 URL。
返回:
tuple: 包含完成 token 数量和请求时间。
"""
start_time = time.time()
completion_tokens = 0
json_payload = {
"model": "Qwen25:14B-Instruct-GPTQ-Int8",
"messages": [{"role": "user", "content": "Why is the sky blue?"}],
"stream": True,
"temperature": 0.7
}
async with session.post(url, json=json_payload) as response:
async for line in response.content:
print(line)
line_str = line.decode('utf-8').strip()
if line_str.startswith('data:'):
data_str = line_str[len('data:'):].strip()
if data_str and data_str!= '[DONE]':
try:
data = json.loads(data_str)
if 'usage' in data:
completion_tokens += data['usage']['completion_tokens']
except json.JSONDecodeError as e:
print(f"JSON decoding error: {e}")
except Exception as e:
print(f"Error processing data: {e}")
end_time = time.time()
request_time = end_time - start_time
return completion_tokens, request_time
async def bound_fetch(sem, session, url, pbar):
# 使用信号量 sem 来限制并发请求的数量,确保不会超过最大并发请求数
async with sem:
result = await fetch(session, url)
pbar.update(1)
return result
async def run(load_url, max_concurrent_requests, total_requests):
"""
通过发送多个并发请求来运行基准测试。
参数:
load_url (str): 要发送请求的URL。
max_concurrent_requests (int): 最大并发请求数。
total_requests (int): 要发送的总请求数。
返回:
tuple: 包含完成 token 总数列表和响应时间列表。
"""
# 创建 Semaphore 来限制并发请求的数量
sem = asyncio.Semaphore(max_concurrent_requests)
# 创建一个异步的HTTP会话
async with aiohttp.ClientSession() as session:
tasks = []
# 创建一个进度条来可视化请求的进度
with tqdm(total=total_requests) as pbar:
# 循环创建任务,直到达到总请求数
for _ in range(total_requests):
# 为每个请求创建一个任务,确保它遵守信号量的限制
task = asyncio.ensure_future(bound_fetch(sem, session, load_url, pbar))
tasks.append(task) # 将任务添加到任务列表中
# 等待所有任务完成并收集它们的结果
results = await asyncio.gather(*tasks)
# 计算所有结果中的完成token总数
completion_tokens = sum(result[0] for result in results)
# 从所有结果中提取响应时间
response_times = [result[1] for result in results]
# 返回完成token的总数和响应时间的列表
return completion_tokens, response_times
if __name__ == '__main__':
import sys
if len(sys.argv) != 3:
print("Usage: python bench.py <C> <N>")
sys.exit(1)
C = int(sys.argv[1]) # 最大并发数
N = int(sys.argv[2]) # 请求总数
# vllm 和 ollama 都兼容了 openai 的 api 让测试变得更简单了
url = 'http://localhost:11435/v1/chat/completions'
start_time = time.time()
completion_tokens, response_times = asyncio.run(run(url, C, N))
end_time = time.time()
# 计算总时间
total_time = end_time - start_time
# 计算每个请求的平均时间
avg_time_per_request = sum(response_times) / len(response_times)
# 计算每秒生成的 token 数量
tokens_per_second = completion_tokens / total_time
print(f'Performance Results:')
print(f' Total requests : {N}')
print(f' Max concurrent requests : {C}')
print(f' Total time : {total_time:.2f} seconds')
print(f' Average time per request : {avg_time_per_request:.2f} seconds')
print(f' Tokens per second : {tokens_per_second:.2f}')
"""
python -m vllm.entrypoints.openai.api_server \
--model /home/huidao/.cache/modelscope/hub/Qwen/Qwen2___5-14B-Instruct-GPTQ-Int8 \
--served-model-name Qwen25:14B-Instruct-GPTQ-Int8 \
--trust-remote-code \
--max-model-len 4096 \
--port 11435
"""
import json
import aiohttp
import asyncio
import time
from tqdm import tqdm
async def fetch(session, url):
start_time = time.time()
json_payload = {
"model": "Qwen25:14B-Instruct-GPTQ-Int8",
"messages": [{"role": "user", "content": "Why is the sky blue?"}],
"stream": True,
"temperature": 0.7
}
async with session.post(url, json=json_payload) as response:
async for line in response.content.iter_any():
try:
data_str = line.decode('utf - 8') # 将字节数据转换为字符串
data_lines = data_str.split('\n') # 按行分割,处理可能的多个JSON数据
for data_line in data_lines:
if data_line.startswith('data:'):
json_data_str = data_line[len('data:'):].strip()
try:
json_data = json.loads(json_data_str)
for choice in json_data.get('choices', []):
delta = choice.get('delta')
if isinstance(delta, dict) and 'content' in delta:
content_value = delta['content']
print(content_value)
except json.JSONDecodeError as json_e:
print(f"JSON decoding error: {json_e}")
end_time = time.time()
break
except Exception as e:
print(f"Error processing line: {e}")
request_time = end_time - start_time
print(request_time)
return request_time
async def bound_fetch(sem, session, url, pbar):
# 使用信号量 sem 来限制并发请求的数量,确保不会超过最大并发请求数
async with sem:
result = await fetch(session, url)
pbar.update(1)
return result
async def run(load_url, max_concurrent_requests, total_requests):
"""
通过发送多个并发请求来运行基准测试。
参数:
load_url (str): 要发送请求的URL。
max_concurrent_requests (int): 最大并发请求数。
total_requests (int): 要发送的总请求数。
返回:
tuple: 包含完成 token 总数列表和响应时间列表。
"""
# 创建 Semaphore 来限制并发请求的数量
sem = asyncio.Semaphore(max_concurrent_requests)
# 创建一个异步的HTTP会话
async with aiohttp.ClientSession() as session:
tasks = []
# 创建一个进度条来可视化请求的进度
with tqdm(total=total_requests) as pbar:
# 循环创建任务,直到达到总请求数
for _ in range(total_requests):
# 为每个请求创建一个任务,确保它遵守信号量的限制
task = asyncio.ensure_future(bound_fetch(sem, session, load_url, pbar))
tasks.append(task) # 将任务添加到任务列表中
# 等待所有任务完成并收集它们的结果
results = await asyncio.gather(*tasks)
# 从所有结果中提取响应时间
response_times = [result for result in results]
# 返回完成token的总数和响应时间的列表
return response_times
if __name__ == '__main__':
import sys
if len(sys.argv) != 3:
print("Usage: python bench.py <C> <N>")
sys.exit(1)
C = int(sys.argv[1]) # 最大并发数
N = int(sys.argv[2]) # 请求总数
# vllm 和 ollama 都兼容了 openai 的 api 让测试变得更简单了
url = 'http://localhost:11435/v1/chat/completions'
response_times = asyncio.run(run(url, C, N))
# 计算每个请求的平均时间
avg_time_per_request = sum(response_times) / len(response_times)
print(f'Performance Results:')
print(f' Total requests : {N}')
print(f' Max concurrent requests : {C}')
print(f' 第一个token出现的平均耗时 : {avg_time_per_request} seconds')
"""
python -m vllm.entrypoints.openai.api_server \
--model /home/huidao/.cache/modelscope/hub/Qwen/Qwen2___5-14B-Instruct-GPTQ-Int8 \
--served-model-name Qwen25:14B-Instruct-GPTQ-Int8 \
--trust-remote-code \
--max-model-len 4096 \
--port 11435
"""
一些原理
更多推荐

所有评论(0)