【基于讯飞语音识别 + 大模型优化的课堂师生对话自动转写与结构化输出系统】
目录
📌 前言
在智慧教育、课堂实录分析、教学行为研究中,自动将课堂音视频转换为结构化师生对话是非常核心的需求。
本文基于:
MoviePy 视频提取音频
讯飞开放平台 语音识别(带说话人分离)
DeepSeek-V3 大模型文本降噪 + 角色修正
自动输出 CSV / JSON 标准格式
控制台打印完整对话
最终实现一套端到端课堂对话自动转写系统,可直接用于实际项目
技术栈
Python
Flask:提供 API 接口
MoviePy:视频转音频
讯飞开放平台 API:语音识别
阿里云百炼(DashScope,通义千问所在平台)的 OpenAI 兼容接口 + DeepSeek-V3:大模型文本优化
CSV / JSON:结构化输出
1. 系统整体架构
整个流程分为 6 个核心步骤:
1.视频转音频:MP4 提取 WAV 音频
自动从 MP4 视频中提取音频,支持截取片段、自动生成 WAV 格式,为语音识别做准备。
2.音频上传讯飞:获取识别任务 ID
3.轮询获取结果:得到带时间戳 + 说话人原始文本
4.文本结构化:解析 JSON、按句子拆分
5.大模型优化:去噪、修正角色、补全语义
6.格式输出:生成 CSV / JSON + 控制台打印对话
2.完整代码实现
# -*- coding: utf-8 -*-
import base64
import csv
import hashlib
import hmac
import io
import json
import os
import time
import urllib
import urllib.parse
from collections import Counter

import requests
from httpx import Client
from moviepy.video.io.VideoFileClip import VideoFileClip
from openai import OpenAI
# iFlytek speech-recognition API configuration: the base URL, plus the
# endpoint paths that are appended to it when uploading audio and when
# fetching the recognition result.
lfasr_host = 'https://raasr.xfyun.cn/v2/api'
api_upload = '/upload'
api_get_result = '/getResult'
class audio_to_data():
    """Turn a classroom video into a teacher/student dialogue transcript.

    The pipeline (driven by :meth:`process_audio_file`) extracts a WAV track
    with MoviePy, uploads it to the iFlytek transcription API, polls for the
    result, splits the recognised words into sentences, cleans them with an
    LLM, merges adjacent sentences of the same speaker and exports CSV / JSON.
    """

    # Order states during which polling continues.
    # NOTE(review): per the iFlytek getResult documentation 0 = order created
    # and 3 = processing -- confirm against the current API docs.
    _PENDING_STATUSES = (0, 3)
    # Terminal states whose payload is parsed. The original code only accepted
    # -1 (its recorded log shows a usable transcript arriving with -1); 4 is
    # presumably the documented "completed" state, so both are accepted as
    # long as the response actually carries an orderResult.
    _USABLE_STATUSES = (4, -1)
    # ASCII and fullwidth sentence terminators.
    _SENTENCE_END_PUNCTUATION = frozenset('。?!.?!')

    def __init__(self, appid, secret_key, upload_file_path, llm_api, duration_seconds=None):
        """Store credentials and paths and build the LLM client.

        Args:
            appid: iFlytek application id (used for signing and as a query
                parameter).
            secret_key: iFlytek secret used as the HMAC key in ``get_signa``.
            upload_file_path: Path of the input video; replaced by the
                extracted WAV path inside ``process_audio_file``.
            llm_api: API key passed to the OpenAI-compatible client.
            duration_seconds: Optional number of leading seconds to keep when
                extracting audio; ``None`` keeps the whole clip.
        """
        self.appid = appid
        self.secret_key = secret_key
        # httpx client; no method of this class uses it, kept for callers.
        self.client = Client()
        self.upload_file_path = upload_file_path
        self.llm_api = llm_api
        self.duration_seconds = duration_seconds
        self.sentences_activity = []
        self.sentences_paragraphs = []
        self.llm = OpenAI(
            base_url="https://dashscope.aliyuncs.com/compatible-mode/v1",
            api_key=self.llm_api,
        )

    def movie_to_audio(self, mp4_file, wav_file, duration_seconds=None):
        """Extract the audio track of ``mp4_file`` into ``wav_file``.

        Args:
            mp4_file: Path of the source video.
            wav_file: Destination path; its directory is created if missing.
            duration_seconds: If positive, only the first
                ``min(duration_seconds, clip.duration)`` seconds are exported.

        Returns:
            ``True`` on success, ``False`` if the input is missing, has no
            audio track, or any exception occurs (the error is printed).
        """
        if not os.path.exists(mp4_file):
            print(f"错误: 输入文件不存在: {mp4_file}")
            return False

        source_clip = None
        trimmed_clip = None
        try:
            output_dir = os.path.dirname(wav_file)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)

            source_clip = VideoFileClip(mp4_file)
            working_clip = source_clip
            if duration_seconds is not None and duration_seconds > 0:
                end_duration = min(duration_seconds, source_clip.duration)
                trimmed_clip = source_clip.subclipped(0, end_duration)
                working_clip = trimmed_clip

            if working_clip.audio is None:
                print("错误: 视频文件不包含音频轨道")
                return False

            working_clip.audio.write_audiofile(wav_file, codec='pcm_s16le')
            return True
        except Exception as e:
            print(f"转换失败: {e}")
            return False
        finally:
            # Release the clips on every path, including errors, and also the
            # source clip that the trimmed clip was derived from.
            for clip in (trimmed_clip, source_clip):
                if clip is not None:
                    clip.close()

    def process_audio_file(self):
        """Run the whole pipeline for ``self.upload_file_path``.

        Side effects: writes ``audio/<name>.wav`` and several files under
        ``result/``; rebinds ``self.upload_file_path`` to the WAV file; fills
        ``self.sentences_paragraphs`` and ``self.sentences_activity``.

        Returns:
            ``[cv_json, csv_file, cv_output_path, nlp_json]`` on success, or
            ``None`` when audio extraction, upload or recognition fails.
        """
        base_name = os.path.basename(self.upload_file_path)
        file_name_without_ext = os.path.splitext(base_name)[0]
        audio_dir = "audio"
        os.makedirs(audio_dir, exist_ok=True)
        wav_file_path = os.path.join(audio_dir, f"{file_name_without_ext}.wav")

        # Stop early instead of uploading a WAV that was never written.
        if not self.movie_to_audio(self.upload_file_path, wav_file_path, self.duration_seconds):
            return None
        self.upload_file_path = wav_file_path

        ts = str(int(time.time()))
        signa = self.get_signa(ts)
        upload_resp = self.upload_file(ts, signa)
        if not upload_resp:
            return None

        order_id = upload_resp['content']['orderId']
        result = self.get_recognition_result(ts, signa, order_id)
        content = (result or {}).get('content') or {}
        status = (content.get('orderInfo') or {}).get('status')
        if status not in self._USABLE_STATUSES or not content.get('orderResult'):
            print("任务未成功完成或已失败,不保存处理结果。")
            return None

        nlp_json = self.save_processed_result(result)
        if not nlp_json:
            # Nothing was written, so there is no file to post-process.
            print("任务未成功完成或已失败,不保存处理结果。")
            return None

        self._processing_sentences(nlp_json)
        csv_file = self.json_to_sentences_csv(nlp_json)
        cv_json = self.json_to_sentences_cv_json()
        cv_output_path = os.path.join("result", f"{file_name_without_ext}_processed_cv.json")
        os.makedirs(os.path.dirname(cv_output_path), exist_ok=True)
        with open(cv_output_path, 'w', encoding='utf-8') as f:
            json.dump(cv_json, f, ensure_ascii=False, indent=4)
        return [cv_json, csv_file, cv_output_path, nlp_json]

    def get_signa(self, ts):
        """Return the request signature for timestamp string ``ts``.

        The signature is ``base64(HMAC-SHA1(secret_key, md5hex(appid + ts)))``
        decoded to ``str``.
        """
        md5_hex = hashlib.md5((self.appid + ts).encode('utf-8')).hexdigest()
        digest = hmac.new(
            self.secret_key.encode('utf-8'),
            md5_hex.encode('utf-8'),
            hashlib.sha1,
        ).digest()
        return base64.b64encode(digest).decode('utf-8')

    def upload_file(self, ts, signa):
        """POST the file at ``self.upload_file_path`` to the upload endpoint.

        Returns:
            The decoded JSON response when its ``code`` is ``'000000'``,
            otherwise ``None`` (the server's ``descInfo`` is printed).
        """
        print("--- 上传部分 ---")
        file_len = os.path.getsize(self.upload_file_path)
        file_name = os.path.basename(self.upload_file_path)
        param_dict = {
            'appId': self.appid,
            'signa': signa,
            'ts': ts,
            "fileSize": file_len,
            "fileName": file_name,
            # NOTE(review): fixed value regardless of the real audio length;
            # confirm against the upload API documentation.
            "duration": "200",
            "pd": "edu",
            "roleType": "1",
            "roleNum": "2"
        }
        print("upload参数:", param_dict)
        with open(self.upload_file_path, 'rb') as f:
            data = f.read()
        response = requests.post(
            url=lfasr_host + api_upload + "?" + urllib.parse.urlencode(param_dict),
            headers={"Content-type": "application/json"},
            data=data
        )
        result = json.loads(response.text)
        print("upload resp:", result)
        # .get(): an error payload without 'code' is a failure, not a KeyError.
        if result.get('code') != '000000':
            print(f"上传失败! 错误信息: {result.get('descInfo')}")
            return None
        return result

    def get_recognition_result(self, ts, signa, order_id, poll_interval=5):
        """Poll the result endpoint until the order leaves a pending state.

        Args:
            ts: Timestamp string used when signing.
            signa: Signature from ``get_signa``.
            order_id: ``orderId`` returned by ``upload_file``.
            poll_interval: Seconds to sleep between polls.

        Returns:
            The last decoded JSON response (its ``content.orderInfo.status``
            tells the caller how the order ended).
        """
        param_dict = {
            'appId': self.appid,
            'signa': signa,
            'ts': ts,
            'orderId': order_id,
            'resultType': "transfer,predict"
        }
        print("\n--- 查询部分 ---")
        url = lfasr_host + api_get_result + "?" + urllib.parse.urlencode(param_dict)
        result = {}
        status = 3
        while status in self._PENDING_STATUSES:
            response = requests.post(url=url, headers={"Content-type": "application/json"})
            result = json.loads(response.text)
            # Error payloads may lack 'content'/'orderInfo'; treat that as a
            # terminal state (status None) rather than raising KeyError.
            order_info = (result.get('content') or {}).get('orderInfo') or {}
            status = order_info.get('status')
            print(f"status={status}, desc: {result.get('descInfo')}")
            if status not in self._PENDING_STATUSES:
                break
            time.sleep(poll_interval)
        return result

    @staticmethod
    def _first_candidate_text(word):
        """Return the ``w`` text of the first ``cw`` candidate, or ``''``."""
        candidates = word.get('cw') or [{}]
        return candidates[0].get('w', '')

    def save_processed_result(self, api_response):
        """Flatten the recognition payload and save it under ``result/``.

        Reads ``content.orderResult`` (a JSON string), takes its ``lattice2``
        list and writes one entry per segment with the segment text, speaker,
        begin/end times and per-word details.

        Returns:
            The path of the written JSON file, or ``None`` when the payload is
            empty or an error occurs (the error is printed).
        """
        base_name = os.path.basename(self.upload_file_path)
        file_name_without_ext = os.path.splitext(base_name)[0]
        output_path = os.path.join("result", f"{file_name_without_ext}_processed.json")
        print(f"\n--- 正在处理并保存结果 -> {output_path} ---")
        try:
            os.makedirs(os.path.dirname(output_path), exist_ok=True)
            content = api_response.get('content', {})
            order_result_str = content.get('orderResult')
            if not order_result_str:
                return None
            segments = json.loads(order_result_str).get('lattice2', [])
            if not segments:
                return None

            all_detailed_paragraphs = []
            for segment in segments:
                rt_list = segment.get('json_1best', {}).get('st', {}).get('rt', [])
                if not rt_list:
                    continue
                speaker_id = segment.get('spk', 'unknown')
                ws_list = rt_list[0].get('ws', [])
                # NOTE(review): wb/we are stored under *_ms keys unchanged;
                # their real unit is not visible here -- confirm before
                # using them as milliseconds.
                word_details = [{
                    "text": self._first_candidate_text(word),
                    "start_ms": int(word.get('wb', 0)),
                    "end_ms": int(word.get('we', 0)),
                    "spk": speaker_id
                } for word in ws_list]
                all_detailed_paragraphs.append({
                    "paragraph_text": "".join(detail["text"] for detail in word_details),
                    "words": word_details,
                    "spk": speaker_id,
                    "start_ms": int(segment.get('begin', 0)),
                    "end_ms": int(segment.get('end', 0))
                })

            with open(output_path, 'w', encoding='utf-8') as f:
                json.dump(all_detailed_paragraphs, f, ensure_ascii=False, indent=4)
            return output_path
        except Exception as e:
            print(f"处理并保存结果时发生错误: {e}")
            return None

    def ms_to_mmss(self, ms):
        """Format milliseconds as ``MM:SS``; invalid input gives ``"00:00"``."""
        try:
            total_seconds = int(ms) // 1000
        except (TypeError, ValueError, OverflowError):
            return "00:00"
        minutes, seconds = divmod(total_seconds, 60)
        return f"{minutes:02d}:{seconds:02d}"

    @staticmethod
    def _build_speaker_map(data):
        """Map raw speaker ids to display labels.

        With exactly two speakers, the one with the most words becomes
        "老师" and the other "学生"; otherwise ids map to themselves.
        """
        spk_counts = Counter(
            word['spk']
            for paragraph in data if 'words' in paragraph
            for word in paragraph['words'] if 'spk' in word
        )
        if len(spk_counts) == 2:
            teacher_spk = spk_counts.most_common(1)[0][0]
            return {spk: ("老师" if spk == teacher_spk else "学生") for spk in spk_counts}
        return {spk: spk for spk in spk_counts}

    def _split_sentences(self, data, spk_map):
        """Split paragraph words into sentence dicts.

        A sentence ends at a terminator word, at a speaker change, or at the
        end of its paragraph. Every sentence carries the start/end time of
        the paragraph it came from.
        """
        sentences = []
        for paragraph in data:
            if 'words' not in paragraph:
                continue
            para_start_ms = paragraph.get('start_ms')
            para_end_ms = paragraph.get('end_ms')
            current = None  # sentence being accumulated, if any
            for word in paragraph['words']:
                mapped_spk = spk_map.get(word.get('spk'), '未知')
                word_text = word['text']
                if current is None or current['spk'] != mapped_spk:
                    if current is not None and current['text'].strip():
                        sentences.append(current)
                    current = {
                        'spk': mapped_spk,
                        'start_ms': para_start_ms,
                        'end_ms': para_end_ms,
                        'text': word_text,
                    }
                else:
                    current['text'] += word_text
                if word_text in self._SENTENCE_END_PUNCTUATION:
                    if current['text'].strip():
                        sentences.append(current)
                    current = None
            if current is not None and current['text'].strip():
                sentences.append(current)
        return sentences

    def _processing_sentences(self, json_file_path):
        """Build the final sentence lists from a saved recognition file.

        Loads the JSON written by ``save_processed_result``, labels speakers,
        splits into sentences, runs the LLM filter, merges neighbouring
        sentences and stores the results in ``self.sentences_paragraphs`` and
        ``self.sentences_activity``.
        """
        with open(json_file_path, 'r', encoding='utf-8') as json_file:
            data = json.load(json_file)
        spk_map = self._build_speaker_map(data)
        sentences = self._split_sentences(data, spk_map)
        sentences_filter = self.batch_llm_filter(sentences)
        self.sentences_paragraphs = self._merge_paragraphs(sentences_filter)
        self.sentences_activity = self.batch_llm_identity_activity(self.sentences_paragraphs)

    @staticmethod
    def _coerce_ms(value, fallback):
        """Return ``int(value)``; ``None`` gives 0, unparsable gives ``fallback``."""
        if value is None:
            return 0
        try:
            return int(value)
        except (TypeError, ValueError, OverflowError):
            return fallback

    def json_to_sentences_cv_json(self):
        """Convert ``self.sentences_activity`` to the ``videoText`` structure.

        Times are truncated to whole seconds. A speaker labelled "学生" gets
        role "2"; every other label is reported as role "1" / "老师".

        Returns:
            ``{"videoText": [...]}`` with one entry per sentence.
        """
        video_text = []
        for s in self.sentences_activity:
            begin_ms = self._coerce_ms(s.get('start_ms'), 0)
            # An unparsable end time falls back to the begin time.
            end_ms = self._coerce_ms(s.get('end_ms'), begin_ms)
            is_student = s.get('spk', '') == "学生"
            video_text.append({
                "beginTime": begin_ms // 1000,
                "endTime": end_ms // 1000,
                "paragraphNum": 0,
                "role": "2" if is_student else "1",
                "roleName": "学生" if is_student else "老师",
                "sentenceContent": s.get('text', '')
            })
        return {"videoText": video_text}

    def json_to_sentences_csv(self, json_file_path, output_csv_path=None):
        """Write ``self.sentences_activity`` as a time/role/content CSV.

        Args:
            json_file_path: Only used to derive the default output name
                (``<stem>_sentences.csv``); the file itself is not read.
            output_csv_path: Explicit destination; overrides the default.

        Returns:
            The CSV path, or ``None`` if writing failed (error is printed).
        """
        if output_csv_path is None:
            output_csv_path = os.path.splitext(json_file_path)[0] + '_sentences.csv'
        try:
            output_dir = os.path.dirname(output_csv_path)
            if output_dir:
                os.makedirs(output_dir, exist_ok=True)
            # utf-8-sig writes a BOM at the start of the file.
            with open(output_csv_path, 'w', newline='', encoding='utf-8-sig') as csv_file:
                writer = csv.writer(csv_file)
                writer.writerow(['时间', '角色', '内容'])
                for sentence in self.sentences_activity:
                    writer.writerow([
                        self.ms_to_mmss(sentence['start_ms']),
                        sentence['spk'],
                        sentence['text']
                    ])
            print(f"句子级别CSV转换完成!文件已保存到: {output_csv_path}")
            return output_csv_path
        except Exception as e:
            print(f"转换过程中发生错误: {str(e)}")
            return None

    @staticmethod
    def _sentence_to_csv_line(sentence):
        """Render one sentence as ``spk,start,end,"text"`` plus newline."""
        # Double embedded quotes so the quoted field stays valid CSV.
        text = str(sentence['text']).replace('"', '""')
        return f"{sentence['spk']},{sentence['start_ms']},{sentence['end_ms']},\"{text}\"\n"

    @staticmethod
    def _parse_filtered_batch(filtered_text, batch):
        """Parse the LLM's CSV reply back into sentence dicts.

        Blank lines and markdown code-fence lines are ignored. A line that
        cannot be parsed into at least four fields is replaced by the
        original sentence at the same position. If nothing usable comes back
        the original batch is returned unchanged.
        """
        lines = [
            line for line in (filtered_text or "").splitlines()
            if line.strip() and not line.strip().startswith("```")
        ]
        parsed = []
        for k, line in enumerate(lines):
            try:
                row = next(csv.reader(io.StringIO(line)), None)
            except csv.Error:
                row = None
            if row and len(row) >= 4:
                parsed.append({
                    'spk': row[0].strip(),
                    'start_ms': row[1].strip(),
                    'end_ms': row[2].strip(),
                    # Re-join extra fields: the model may drop the quotes
                    # around text that itself contains commas.
                    'text': ",".join(row[3:]).strip(),
                })
            elif k < len(batch):
                parsed.append(batch[k])
        return parsed or list(batch)

    def batch_llm_filter(self, sentences, batch_size=50):
        """Clean sentences with the LLM, ``batch_size`` sentences per call.

        Sentences parsed from the reply carry ``start_ms``/``end_ms`` as
        strings. If the LLM call fails, the batch is kept unmodified.

        Returns:
            The filtered sentence list (the input itself when it is empty).
        """
        if not sentences:
            return sentences
        total_batches = (len(sentences) + batch_size - 1) // batch_size
        filtered_sentences = []
        for batch_no, start in enumerate(range(0, len(sentences), batch_size), start=1):
            batch = sentences[start:start + batch_size]
            batch_text = "".join(self._sentence_to_csv_line(s) for s in batch)
            try:
                filtered_text = self.llm_filter(batch_text)
            except Exception as e:
                # Best effort: keep the raw sentences if the model call fails.
                print(f"大模型处理失败,保留原始文本: {e}")
                filtered_sentences.extend(batch)
            else:
                filtered_sentences.extend(self._parse_filtered_batch(filtered_text, batch))
            print(f"已处理批次 {batch_no}/{total_batches}")
        return filtered_sentences

    def llm_filter(self, text):
        """Send CSV ``text`` to the chat model and return its reply content."""
        prompt_content = "修复课堂文本,只删语气词,不改内容,保持CSV格式:\n" + f"{text}\n"
        response_content = self.llm.chat.completions.create(
            model="deepseek-v3",
            messages=[
                {"role": "system", "content": "你是文本修复助手"},
                {"role": "user", "content": prompt_content},
            ],
            temperature=0.1,
        )
        return response_content.choices[0].message.content

    def _merge_paragraphs(self, sentences, pause_threshold_ms=2000):
        """Merge consecutive sentences of one speaker separated by a short gap.

        Two neighbours are merged when they share ``spk`` and the gap between
        the running end time and the next start time is at most
        ``pause_threshold_ms``. If either time cannot be read as an integer
        the two are kept separate. Input dicts are not modified.

        Returns:
            A new list of merged sentence dicts (empty for empty input).
        """
        if not sentences:
            return []
        merged_sentences = []
        current = sentences[0].copy()
        for next_sentence in sentences[1:]:
            try:
                pause_duration = int(next_sentence['start_ms']) - int(current['end_ms'])
            except (KeyError, TypeError, ValueError):
                pause_duration = None
            mergeable = (
                pause_duration is not None
                and current['spk'] == next_sentence['spk']
                and pause_duration <= pause_threshold_ms
            )
            if mergeable:
                current['text'] += next_sentence['text']
                current['end_ms'] = next_sentence['end_ms']
            else:
                merged_sentences.append(current)
                current = next_sentence.copy()
        merged_sentences.append(current)
        return merged_sentences

    def batch_llm_identity_activity(self, sentences, batch_size=50):
        """Placeholder step: returns ``sentences`` unchanged.

        ``batch_size`` is accepted but unused in this implementation.
        """
        return sentences
if __name__ == '__main__':
    # Demo entry point: fill in real credentials and a video path.
    APPID = "你的讯飞APPID"
    SECRET_KEY = "你的讯飞密钥"
    LLM_API_KEY = "你的大模型KEY"
    VIDEO_FILE_PATH = r"test.mp4"
    print("=== 开始处理视频 ===")
    audio_processor = audio_to_data(APPID, SECRET_KEY, VIDEO_FILE_PATH, LLM_API_KEY)
    processed_result = audio_processor.process_audio_file()
    if processed_result:
        # process_audio_file returns [cv_json, csv_file, cv_output_path, nlp_json];
        # unpack by name so each label is printed with its own path (the
        # previous indices 1 and 2 were swapped relative to their labels).
        _cv_json, csv_file, cv_output_path, nlp_json = processed_result
        print("\n################# 最终结果(文件路径) #################")
        print("cv_output_path:", cv_output_path)
        print("csv_file", csv_file)
        print("nlp_json", nlp_json)
        print("\n\n==================== 完整课堂对话(老师/学生)====================")
        # One CSV-style line per merged sentence: role,start,end,"text".
        for sentence in audio_processor.sentences_activity:
            role = sentence["spk"]
            start_ms = sentence["start_ms"]
            end_ms = sentence["end_ms"]
            text = sentence["text"]
            print(f"{role},{start_ms},{end_ms},\"{text}\"")
3.执行流程日志
C:\Users\Dell\AppData\Local\Programs\Python\Python39\python.exe D:\software\Pycharm\语音转文字\main.py
=== 开始处理视频 ===
MoviePy - Writing audio in audio\test.wav
MoviePy - Done.
--- 上传部分 ---
upload参数: {'appId': '****', 'signa': '****************', 'ts': '1779112910', 'fileSize': 81830274, 'fileName': 'test.wav', 'duration': '200', 'pd': 'edu', 'roleType': '1', 'roleNum': '2'}
upload_url: https://raasr.xfyun.cn/v2/api/upload?appId=*****&signa=*****%3D&ts=1779112910&fileSize=81830274&fileName=test.wav&duration=200&pd=edu&roleType=1&roleNum=2
upload resp: {'code': '000000', 'descInfo': 'success', 'content': {'orderId': 'DKHJQ20260518220204025gSdbNohiGxbNOiSm', 'taskEstimateTime': 28000}}
--- 查询部分 ---
get result参数: {'appId': '*****', 'signa': 'TOD36x2kcft4A29nz24VU7CoE48=', 'ts': '1779112910', 'orderId': 'DKHJQ20260518220204025gSdbNohiGxbNOiSm', 'resultType': 'transfer,predict'}
status=3, desc: success
status=3, desc: success
status=3, desc: success
status=3, desc: success
status=-1, desc: success
--- 正在处理并保存结果 -> result\test_processed.json ---
已处理批次 1/3
已处理批次 2/3
已处理批次 3/3
已处理批次 1/1
句子级别CSV转换完成!文件已保存到: result\test_processed_sentences.csv
日志说明:
视频成功转音频
讯飞识别上传成功
任务结束(本次日志中 status=-1,但响应里仍带有转写结果;按讯飞官方文档,4 表示订单完成、-1 表示订单失败,请以官方文档为准,实际使用时建议同时兼容 status=4)
大模型文本优化完成
CSV/JSON 输出成功
4.输出文件
系统自动生成 3 种标准格式:
test_processed.json → 原始识别结构化数据
test_processed_sentences.csv → 表格格式(可导入 Excel)
test_processed_cv.json → 模型输入标准格式

################# 最终结果(文件路径) #################
cv_output_path: result\test_processed_sentences.csv
csv_file result\test_processed_cv.json
nlp_json result\test_processed.json
5.控制台输出
1.对话(标准 CSV 格式)
老师,7850,18390,"立正立正老师好,早,好。好,同学们好,请坐。本节课呢我们来学习斯大林主义,"
老师,35160,38590,"啊大家在看体育比赛的时候,"
学生,38590,39160,"嗯"
老师,39390,47250,"在关键时刻总是会出现屏住呼吸,特别紧张是吧?"
...
2. API返回格式(200)
{
"code": 200,
"data": {
"1": "result\\test_processed_sentences.csv",
"2": "result\\test_processed_cv.json",
"3": "result\\test_processed.json"
},
"message": "success"
}
3.处理状态(processing/completed)
{
"code": 200,
"message": "处理中,请稍候...",
"status": "processing"
}
{
"code": 200,
"data": {
"csv_file": "result\\test_processed_cv.json",
"cv_output_path": "result\\test_processed_sentences.csv",
"nlp_json": "result\\test_processed.json"
},
"message": "处理完成",
"status": "completed"
}
6.适用场景
智慧课堂
教学质量评估
教师听评课自动化
课堂行为分析
教育大数据采集
这是一个全自动课堂视频分析工具,能够将课堂录像自动转化为结构化、高质量、带角色的教学文本数据(活动标注接口 batch_llm_identity_activity 已预留,本文代码中尚未实现,目前原样返回句子),实现课堂教学过程的数字化与智能化分析。
更多推荐

所有评论(0)