推理队列封装


import asyncio
import edge_tts
import time
from threading import Thread
from queue import Queue, Empty
import os

def tts_sync_stream(text, voice="zh-CN-XiaoxiaoNeural"):
    """同步流式生成(内部用 async)"""
    async def _run():
        communicate = edge_tts.Communicate(text, voice)

        audio_chunks = []
        first_chunk_time = None
        start_time = time.time()
        chunk_count = 0

        async for chunk in communicate.stream():
            if chunk["type"] == "audio":
                chunk_count += 1

                if first_chunk_time is None:
                    first_chunk_time = time.time() - start_time

                audio_chunks.append(chunk["data"])

        total_time = time.time() - start_time

        return audio_chunks, first_chunk_time, total_time, chunk_count

    return asyncio.run(_run())


# ================== Worker ==================
def worker(queue: Queue, out_dir="output"):
    os.makedirs(out_dir, exist_ok=True)

    while True:
        try:
            text, idx = queue.get(timeout=1)
        except Empty:
            break

        print(f"\n🎙 开始合成: {text}")

        audio_chunks, ttfb, total_time, chunk_count = tts_sync_stream(text)

        # 保存音频
        output_path = os.path.join(out_dir, f"out_{idx:02d}.mp3")
        with open(output_path, "wb") as f:
            for c in audio_chunks:
                f.write(c)

        # 估算音频时长(简单估计)
        audio_size = sum(len(c) for c in audio_chunks)
        audio_duration = audio_size / (16000 * 2)  # 粗略估计(16k 16bit)

        rtf = total_time / audio_duration if audio_duration > 0 else 0

        print(f"💾 保存: {output_path}")
        print(f"TTFB: {ttfb:.3f}s | Total: {total_time:.3f}s | RTF: {rtf:.3f} | chunks: {chunk_count}")

if __name__ == "__main__":
    texts = [
        "繁荣的商务区如金融街",
        "完善的城市基础设施",
        "服务业是主要经济支柱之一",
        "这里有兵马俑",
        "曾举办过2008年夏季奥运会",
        "北京是一座兼具悠久历史",
        "深厚文化底蕴和现代化城市功能的国际化大都市",
    ]

    queue = Queue()

    for i, t in enumerate(texts):
        queue.put((t, i))

    start_all = time.time()

    # 单 worker(你要测纯推理速度)
    t = Thread(target=worker, args=(queue,))
    t.start()
    t.join()

    print("\n 总耗时:", time.time() - start_all)

推理耗时20s。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐