import aiohttp
import asyncio
import time
from tqdm import tqdm
import json
#vllm部署qwen如何统计token数量呢?
async def fetch(session, url):
    """
    参数:
        session (aiohttp.ClientSession): 用于请求的会话。
        url (str): 要发送请求的 URL。
    返回:
        tuple: 包含完成 token 数量和请求时间。
    """
    start_time = time.time()
    completion_tokens = 0
    json_payload = {
        "model": "Qwen25:14B-Instruct-GPTQ-Int8",
        "messages": [{"role": "user", "content": "Why is the sky blue?"}],
        "stream": True,
        "temperature": 0.7
    }
    async with session.post(url, json=json_payload) as response:
        async for line in response.content:
            print(line)
            line_str = line.decode('utf-8').strip()
            if line_str.startswith('data:'):
                data_str = line_str[len('data:'):].strip()
                if data_str and data_str!= '[DONE]':
                    try:
                        data = json.loads(data_str)
                        if 'usage' in data:
                            completion_tokens += data['usage']['completion_tokens']
                    except json.JSONDecodeError as e:
                        print(f"JSON decoding error: {e}")
                    except Exception as e:
                        print(f"Error processing data: {e}")
        end_time = time.time()
        request_time = end_time - start_time
        return completion_tokens, request_time


async def bound_fetch(sem, session, url, pbar):
    # 使用信号量 sem 来限制并发请求的数量,确保不会超过最大并发请求数
    async with sem:
        result = await fetch(session, url)
        pbar.update(1)
        return result


async def run(load_url, max_concurrent_requests, total_requests):
    """
    通过发送多个并发请求来运行基准测试。
    参数:
        load_url (str): 要发送请求的URL。
        max_concurrent_requests (int): 最大并发请求数。
        total_requests (int): 要发送的总请求数。
    返回:
        tuple: 包含完成 token 总数列表和响应时间列表。
    """
    # 创建 Semaphore 来限制并发请求的数量
    sem = asyncio.Semaphore(max_concurrent_requests)

    # 创建一个异步的HTTP会话
    async with aiohttp.ClientSession() as session:
        tasks = []

        # 创建一个进度条来可视化请求的进度
        with tqdm(total=total_requests) as pbar:
            # 循环创建任务,直到达到总请求数
            for _ in range(total_requests):
                # 为每个请求创建一个任务,确保它遵守信号量的限制
                task = asyncio.ensure_future(bound_fetch(sem, session, load_url, pbar))
                tasks.append(task)  # 将任务添加到任务列表中

            # 等待所有任务完成并收集它们的结果
            results = await asyncio.gather(*tasks)

        # 计算所有结果中的完成token总数
        completion_tokens = sum(result[0] for result in results)

        # 从所有结果中提取响应时间
        response_times = [result[1] for result in results]

        # 返回完成token的总数和响应时间的列表
        return completion_tokens, response_times


if __name__ == '__main__':
    import sys

    if len(sys.argv) != 3:
        print("Usage: python bench.py <C> <N>")
        sys.exit(1)

    C = int(sys.argv[1])  # 最大并发数
    N = int(sys.argv[2])  # 请求总数

    # vllm 和 ollama 都兼容了 openai 的 api 让测试变得更简单了
    url = 'http://localhost:11435/v1/chat/completions'

    start_time = time.time()
    completion_tokens, response_times = asyncio.run(run(url, C, N))
    end_time = time.time()

    # 计算总时间
    total_time = end_time - start_time
    # 计算每个请求的平均时间
    avg_time_per_request = sum(response_times) / len(response_times)
    # 计算每秒生成的 token 数量
    tokens_per_second = completion_tokens / total_time

    print(f'Performance Results:')
    print(f'  Total requests            : {N}')
    print(f'  Max concurrent requests   : {C}')
    print(f'  Total time                : {total_time:.2f} seconds')
    print(f'  Average time per request  : {avg_time_per_request:.2f} seconds')
    print(f'  Tokens per second         : {tokens_per_second:.2f}')
"""
python -m vllm.entrypoints.openai.api_server \
    --model /home/huidao/.cache/modelscope/hub/Qwen/Qwen2___5-14B-Instruct-GPTQ-Int8 \
    --served-model-name Qwen25:14B-Instruct-GPTQ-Int8 \
    --trust-remote-code \
    --max-model-len 4096 \
    --port 11435

"""
import json
import aiohttp
import asyncio
import time
from tqdm import tqdm

async def fetch(session, url):
    start_time = time.time()
    json_payload = {
        "model": "Qwen25:14B-Instruct-GPTQ-Int8",
        "messages": [{"role": "user", "content": "Why is the sky blue?"}],
        "stream": True,
        "temperature": 0.7
    }
    async with session.post(url, json=json_payload) as response:
        async for line in response.content.iter_any():
            try:
                data_str = line.decode('utf - 8')  # 将字节数据转换为字符串
                data_lines = data_str.split('\n')  # 按行分割,处理可能的多个JSON数据
                for data_line in data_lines:
                    if data_line.startswith('data:'):
                        json_data_str = data_line[len('data:'):].strip()
                        try:
                            json_data = json.loads(json_data_str)
                            for choice in json_data.get('choices', []):
                                delta = choice.get('delta')
                                if isinstance(delta, dict) and 'content' in delta:
                                    content_value = delta['content']
                                    print(content_value)
                        except json.JSONDecodeError as json_e:
                            print(f"JSON decoding error: {json_e}")
                end_time = time.time()
                break
            except Exception as e:
                print(f"Error processing line: {e}")
    request_time = end_time - start_time
    print(request_time)
    return request_time


async def bound_fetch(sem, session, url, pbar):
    # 使用信号量 sem 来限制并发请求的数量,确保不会超过最大并发请求数
    async with sem:
        result = await fetch(session, url)
        pbar.update(1)
        return result


async def run(load_url, max_concurrent_requests, total_requests):
    """
    通过发送多个并发请求来运行基准测试。
    参数:
        load_url (str): 要发送请求的URL。
        max_concurrent_requests (int): 最大并发请求数。
        total_requests (int): 要发送的总请求数。
    返回:
        tuple: 包含完成 token 总数列表和响应时间列表。
    """
    # 创建 Semaphore 来限制并发请求的数量
    sem = asyncio.Semaphore(max_concurrent_requests)

    # 创建一个异步的HTTP会话
    async with aiohttp.ClientSession() as session:
        tasks = []

        # 创建一个进度条来可视化请求的进度
        with tqdm(total=total_requests) as pbar:
            # 循环创建任务,直到达到总请求数
            for _ in range(total_requests):
                # 为每个请求创建一个任务,确保它遵守信号量的限制
                task = asyncio.ensure_future(bound_fetch(sem, session, load_url, pbar))
                tasks.append(task)  # 将任务添加到任务列表中

            # 等待所有任务完成并收集它们的结果
            results = await asyncio.gather(*tasks)

        # 从所有结果中提取响应时间
        response_times = [result for result in results]

        # 返回完成token的总数和响应时间的列表
        return response_times


if __name__ == '__main__':
    import sys

    if len(sys.argv) != 3:
        print("Usage: python bench.py <C> <N>")
        sys.exit(1)

    C = int(sys.argv[1])  # 最大并发数
    N = int(sys.argv[2])  # 请求总数

    # vllm 和 ollama 都兼容了 openai 的 api 让测试变得更简单了
    url = 'http://localhost:11435/v1/chat/completions'


    response_times = asyncio.run(run(url, C, N))


    # 计算每个请求的平均时间
    avg_time_per_request = sum(response_times) / len(response_times)

    print(f'Performance Results:')
    print(f'  Total requests            : {N}')
    print(f'  Max concurrent requests   : {C}')
    print(f'  第一个token出现的平均耗时  : {avg_time_per_request} seconds')

"""
python -m vllm.entrypoints.openai.api_server \
    --model /home/huidao/.cache/modelscope/hub/Qwen/Qwen2___5-14B-Instruct-GPTQ-Int8 \
    --served-model-name Qwen25:14B-Instruct-GPTQ-Int8 \
    --trust-remote-code \
    --max-model-len 4096 \
    --port 11435

"""

 一些原理

https://zhuanlan.zhihu.com/p/691045737

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐