GLM-4V-9B Streamlit定制开发:集成企业微信回调、钉钉通知、日志审计
GLM-4V-9B Streamlit定制开发:集成企业微信回调、钉钉通知、日志审计
你是不是也遇到过这样的场景?团队内部部署了一个强大的AI工具,比如能看懂图片的GLM-4V-9B模型,大家用得很开心。但问题来了:谁在什么时候用了这个工具?处理了什么图片?问了什么问题?模型回答了什么?这些信息完全是一团黑盒。
更麻烦的是,当有重要任务需要处理时,你只能守在电脑前手动上传图片、输入问题,然后等待结果。如果能让这个AI工具自动接收任务、处理完再主动通知你,那该多省事。
今天,我就来分享一个实战项目:在GLM-4V-9B的Streamlit Web应用基础上,深度定制开发,集成了企业微信回调、钉钉机器人通知和完整的日志审计功能。这不仅仅是给工具加几个按钮,而是把它从一个“玩具”升级为团队可监控、可协作的“生产力工具”。
1. 项目背景:从个人玩具到团队工具
GLM-4V-9B是一个很棒的视觉语言模型,官方的Streamlit示例让我们能快速在本地跑起来一个聊天界面。上传图片、问问题、得到回答——这个过程对个人体验来说足够了。
但当我们想把它用到团队协作、业务流程中时,就发现了很多不足:
原有方案的三大痛点:
- 无状态、无记录:每次对话都是独立的,关掉页面就什么都没了。团队谁用了、用来做什么、效果如何,完全不知道。
- 被动等待:必须有人主动打开网页、上传图片、输入问题,AI才会工作。无法响应外部系统的触发。
- 孤岛式运行:处理结果只能显示在网页上,无法自动同步到团队常用的沟通工具(如企业微信、钉钉)。
我们的解决方案目标:
- 可审计:记录每一次AI对话的完整上下文
- 可触发:支持通过API或消息回调触发AI处理
- 可通知:处理结果自动推送到团队沟通平台
- 可管理:提供管理界面查看使用统计和日志
2. 核心架构设计
整个系统在原有Streamlit应用的基础上,增加了三个核心模块:
原始GLM-4V-9B Streamlit应用
│
├── 企业微信回调模块 (接收任务)
├── 钉钉通知模块 (发送结果)
└── 日志审计模块 (记录一切)
│
└── 统一任务调度中心
2.1 企业微信回调模块
企业微信提供了丰富的回调接口,我们可以利用它来接收处理请求。这里有两种实现方式:
方式一:企业内部应用回调 适合企业微信内部使用,安全性高,需要配置可信域名。
方式二:群机器人Webhook 更简单的方式,直接在群里@机器人,把图片和问题发出来,机器人就会触发AI处理。
# 企业微信回调处理器核心代码
import hashlib
import json
from flask import Flask, request, jsonify
app = Flask(__name__)
class WeChatWorkCallback:
def __init__(self, token, encoding_aes_key, corp_id):
self.token = token
self.encoding_aes_key = encoding_aes_key
self.corp_id = corp_id
def verify_url(self, msg_signature, timestamp, nonce, echostr):
"""验证回调URL(企业微信要求)"""
# 验证签名逻辑
pass
def handle_message(self, xml_data):
"""处理企业微信推送的消息"""
# 解析XML消息
msg_type = self._parse_msg_type(xml_data)
if msg_type == 'image':
# 提取图片URL
pic_url = self._extract_image_url(xml_data)
# 下载图片到本地
image_path = self.download_image(pic_url)
# 提取文本消息(如果有)
text_content = self._extract_text(xml_data)
# 创建AI处理任务
task_id = self.create_ai_task(
image_path=image_path,
question=text_content or "描述这张图片",
source="企业微信",
user_id=self._get_user_id(xml_data)
)
return self._reply_text(f"已收到图片处理请求,任务ID: {task_id}")
elif msg_type == 'text':
# 处理纯文本指令
pass
return self._reply_text("暂不支持的消息类型")
# Flask路由处理
@app.route('/wechat/callback', methods=['GET', 'POST'])
def wechat_callback():
if request.method == 'GET':
# URL验证
return callback_handler.verify_url(
request.args.get('msg_signature'),
request.args.get('timestamp'),
request.args.get('nonce'),
request.args.get('echostr')
)
else:
# 处理消息
xml_data = request.data
return callback_handler.handle_message(xml_data)
2.2 钉钉通知模块
当AI处理完成后,我们需要把结果推送到钉钉群,让相关成员及时看到。
# 钉钉机器人通知器
import requests
import json
import time
class DingTalkNotifier:
def __init__(self, webhook_url, secret=None):
self.webhook_url = webhook_url
self.secret = secret
def _generate_sign(self, timestamp):
"""生成签名(如果启用了加签)"""
if not self.secret:
return None
import hmac
import hashlib
import base64
string_to_sign = f'{timestamp}\n{self.secret}'
hmac_code = hmac.new(
self.secret.encode('utf-8'),
string_to_sign.encode('utf-8'),
digestmod=hashlib.sha256
).digest()
return base64.b64encode(hmac_code).decode('utf-8')
def send_text(self, content, at_users=None, at_all=False):
"""发送文本消息"""
timestamp = str(round(time.time() * 1000))
# 构建消息体
message = {
"msgtype": "text",
"text": {
"content": content
}
}
# 添加@功能
if at_users or at_all:
message["at"] = {}
if at_users:
message["at"]["atUserIds"] = at_users
if at_all:
message["at"]["isAtAll"] = True
# 如果有加签,计算签名
if self.secret:
sign = self._generate_sign(timestamp)
url = f"{self.webhook_url}×tamp={timestamp}&sign={sign}"
else:
url = self.webhook_url
# 发送请求
headers = {'Content-Type': 'application/json'}
response = requests.post(url, data=json.dumps(message), headers=headers)
return response.json()
def send_markdown(self, title, text, at_users=None):
"""发送Markdown格式消息(更适合展示AI处理结果)"""
message = {
"msgtype": "markdown",
"markdown": {
"title": title,
"text": text
}
}
if at_users:
message["at"] = {"atUserIds": at_users}
response = requests.post(
self.webhook_url,
data=json.dumps(message),
headers={'Content-Type': 'application/json'}
)
return response.json()
def send_ai_result(self, task_id, image_info, question, answer, user_id=None):
"""发送AI处理结果(专用方法)"""
# 构建美观的Markdown消息
markdown_text = f"""### AI图片分析完成
**任务ID:** {task_id}
**处理时间:** {time.strftime('%Y-%m-%d %H:%M:%S')}
**图片信息:** {image_info}
** 问题:** {question}
** AI回答:** {answer}
---
[点击查看详情](http://your-domain/task/{task_id})"""
# 如果需要@特定用户
at_users = [user_id] if user_id else None
return self.send_markdown(
title=f"AI处理完成 - {task_id}",
text=markdown_text,
at_users=at_users
)
2.3 日志审计模块
这是系统的“黑匣子”,记录所有操作,便于追溯和分析。
# 日志审计系统
import sqlite3
import json
from datetime import datetime
from pathlib import Path
class AuditLogger:
def __init__(self, db_path="audit.db"):
self.db_path = db_path
self._init_database()
def _init_database(self):
"""初始化数据库表"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 任务表
cursor.execute('''
CREATE TABLE IF NOT EXISTS tasks (
id TEXT PRIMARY KEY,
user_id TEXT,
source TEXT,
image_path TEXT,
image_hash TEXT,
question TEXT,
answer TEXT,
model_used TEXT,
processing_time REAL,
status TEXT,
created_at TIMESTAMP,
finished_at TIMESTAMP
)
''')
# 使用统计表
cursor.execute('''
CREATE TABLE IF NOT EXISTS usage_stats (
date DATE,
user_id TEXT,
task_count INTEGER,
total_time REAL,
avg_time REAL,
PRIMARY KEY (date, user_id)
)
''')
# 系统日志表
cursor.execute('''
CREATE TABLE IF NOT EXISTS system_logs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
level TEXT,
module TEXT,
message TEXT,
details TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
)
''')
conn.commit()
conn.close()
def log_task(self, task_id, user_id, source, image_path, question,
answer=None, model_used="GLM-4V-9B", processing_time=None):
"""记录一个AI处理任务"""
# 计算图片哈希(用于去重和追踪)
import hashlib
image_hash = ""
if Path(image_path).exists():
with open(image_path, 'rb') as f:
image_hash = hashlib.md5(f.read()).hexdigest()
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
INSERT INTO tasks
(id, user_id, source, image_path, image_hash, question,
answer, model_used, processing_time, status, created_at)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
''', (
task_id, user_id, source, str(image_path), image_hash, question,
answer, model_used, processing_time, 'pending', datetime.now()
))
conn.commit()
conn.close()
# 同时记录到系统日志
self.log_system(
level="INFO",
module="task",
message=f"New task created: {task_id}",
details={
"user_id": user_id,
"source": source,
"question": question[:100] # 只存前100字符
}
)
def update_task_result(self, task_id, answer, processing_time):
"""更新任务结果"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
UPDATE tasks
SET answer = ?, processing_time = ?, status = 'completed', finished_at = ?
WHERE id = ?
''', (answer, processing_time, datetime.now(), task_id))
conn.commit()
conn.close()
# 更新使用统计
self._update_usage_stats(task_id)
def log_system(self, level, module, message, details=None):
"""记录系统日志"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
details_json = json.dumps(details) if details else None
cursor.execute('''
INSERT INTO system_logs (level, module, message, details)
VALUES (?, ?, ?, ?)
''', (level, module, message, details_json))
conn.commit()
conn.close()
def _update_usage_stats(self, task_id):
"""更新使用统计"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
# 获取任务信息
cursor.execute('''
SELECT user_id, processing_time, DATE(created_at)
FROM tasks WHERE id = ?
''', (task_id,))
task = cursor.fetchone()
if not task:
return
user_id, processing_time, date_str = task
# 检查是否已有今日记录
cursor.execute('''
SELECT task_count, total_time
FROM usage_stats
WHERE date = ? AND user_id = ?
''', (date_str, user_id))
existing = cursor.fetchone()
if existing:
# 更新现有记录
task_count, total_time = existing
new_count = task_count + 1
new_total = total_time + (processing_time or 0)
new_avg = new_total / new_count
cursor.execute('''
UPDATE usage_stats
SET task_count = ?, total_time = ?, avg_time = ?
WHERE date = ? AND user_id = ?
''', (new_count, new_total, new_avg, date_str, user_id))
else:
# 插入新记录
cursor.execute('''
INSERT INTO usage_stats (date, user_id, task_count, total_time, avg_time)
VALUES (?, ?, 1, ?, ?)
''', (date_str, user_id, processing_time or 0, processing_time or 0))
conn.commit()
conn.close()
def get_user_stats(self, user_id, days=7):
"""获取用户统计信息"""
conn = sqlite3.connect(self.db_path)
cursor = conn.cursor()
cursor.execute('''
SELECT date, task_count, total_time, avg_time
FROM usage_stats
WHERE user_id = ? AND date >= DATE('now', ?)
ORDER BY date DESC
''', (user_id, f'-{days} days'))
stats = cursor.fetchall()
conn.close()
return [
{
"date": row[0],
"task_count": row[1],
"total_time": row[2],
"avg_time": row[3]
}
for row in stats
]
3. 系统集成与任务调度
有了这三个核心模块,我们需要一个调度中心来协调它们的工作。
# 统一任务调度中心
import threading
import queue
import uuid
from concurrent.futures import ThreadPoolExecutor
class TaskScheduler:
def __init__(self, model_loader, dingtalk_notifier=None, max_workers=2):
self.task_queue = queue.Queue()
self.results = {} # 存储任务结果
self.model_loader = model_loader
self.dingtalk_notifier = dingtalk_notifier
self.audit_logger = AuditLogger()
# 启动工作线程
self.executor = ThreadPoolExecutor(max_workers=max_workers)
self._start_workers()
def _start_workers(self):
"""启动工作线程处理任务队列"""
def worker():
while True:
try:
task = self.task_queue.get(timeout=1)
if task is None: # 退出信号
break
self._process_task(task)
self.task_queue.task_done()
except queue.Empty:
continue
except Exception as e:
print(f"Worker error: {e}")
# 启动两个工作线程
for i in range(2):
thread = threading.Thread(target=worker, daemon=True)
thread.start()
def submit_task(self, image_path, question, user_id="anonymous", source="web"):
"""提交一个新任务"""
task_id = str(uuid.uuid4())[:8]
# 记录任务开始
self.audit_logger.log_task(
task_id=task_id,
user_id=user_id,
source=source,
image_path=image_path,
question=question
)
# 创建任务对象
task = {
"task_id": task_id,
"image_path": image_path,
"question": question,
"user_id": user_id,
"source": source,
"status": "queued",
"submitted_at": datetime.now()
}
# 加入队列
self.task_queue.put(task)
# 存储到结果字典(初始状态)
self.results[task_id] = {
"status": "queued",
"result": None,
"error": None
}
return task_id
def _process_task(self, task):
"""处理单个任务"""
task_id = task["task_id"]
try:
# 更新状态为处理中
self.results[task_id]["status"] = "processing"
# 记录开始时间
start_time = time.time()
# 调用模型处理
answer = self.model_loader.process_image(
image_path=task["image_path"],
question=task["question"]
)
# 计算处理时间
processing_time = time.time() - start_time
# 更新任务结果
self.results[task_id].update({
"status": "completed",
"result": answer,
"processing_time": processing_time
})
# 更新审计日志
self.audit_logger.update_task_result(
task_id=task_id,
answer=answer,
processing_time=processing_time
)
# 发送钉钉通知(如果配置了)
if self.dingtalk_notifier:
self.dingtalk_notifier.send_ai_result(
task_id=task_id,
image_info=f"{task['image_path']}",
question=task["question"],
answer=answer,
user_id=task.get("user_id")
)
# 记录成功日志
self.audit_logger.log_system(
level="INFO",
module="scheduler",
message=f"Task {task_id} completed successfully",
details={
"processing_time": processing_time,
"answer_length": len(answer)
}
)
except Exception as e:
# 记录错误
self.results[task_id].update({
"status": "failed",
"error": str(e)
})
self.audit_logger.log_system(
level="ERROR",
module="scheduler",
message=f"Task {task_id} failed",
details={"error": str(e)}
)
def get_task_status(self, task_id):
"""获取任务状态"""
return self.results.get(task_id, {"status": "not_found"})
def get_recent_tasks(self, limit=10):
"""获取最近的任务"""
conn = sqlite3.connect("audit.db")
cursor = conn.cursor()
cursor.execute('''
SELECT id, user_id, source, question, status,
created_at, processing_time
FROM tasks
ORDER BY created_at DESC
LIMIT ?
''', (limit,))
tasks = cursor.fetchall()
conn.close()
return [
{
"id": row[0],
"user_id": row[1],
"source": row[2],
"question": row[3][:50] + "..." if len(row[3]) > 50 else row[3],
"status": row[4],
"created_at": row[5],
"processing_time": row[6]
}
for row in tasks
]
4. Streamlit界面增强
在原有聊天界面的基础上,我们增加管理面板和任务监控功能。
# Streamlit增强界面
import streamlit as st
import pandas as pd
import plotly.express as px
from datetime import datetime, timedelta
def main():
st.set_page_config(
page_title="GLM-4V-9B 智能视觉助手",
page_icon="🖼",
layout="wide"
)
# 初始化会话状态
if 'task_scheduler' not in st.session_state:
# 这里需要初始化模型和调度器
st.session_state.task_scheduler = None # 实际使用时需要正确初始化
# 侧边栏导航
with st.sidebar:
st.title("导航")
page = st.radio(
"选择功能",
["AI聊天", "任务监控", "使用统计", "系统日志"]
)
st.divider()
# 快速任务提交(侧边栏)
st.subheader("快速提交任务")
uploaded_file = st.file_uploader("上传图片", type=['jpg', 'png', 'jpeg'])
quick_question = st.text_area("输入问题", height=100)
if st.button("提交处理", type="primary") and uploaded_file and quick_question:
# 保存上传的图片
image_path = f"uploads/{datetime.now().strftime('%Y%m%d_%H%M%S')}_{uploaded_file.name}"
with open(image_path, "wb") as f:
f.write(uploaded_file.getbuffer())
# 提交任务
task_id = st.session_state.task_scheduler.submit_task(
image_path=image_path,
question=quick_question,
user_id="web_user",
source="web_quick"
)
st.success(f"任务已提交!任务ID: {task_id}")
# 主内容区
if page == "AI聊天":
show_chat_page()
elif page == "任务监控":
show_task_monitor()
elif page == "使用统计":
show_usage_stats()
elif page == "系统日志":
show_system_logs()
def show_task_monitor():
"""任务监控页面"""
st.title(" 任务监控面板")
# 实时任务状态
col1, col2, col3, col4 = st.columns(4)
with col1:
st.metric("等待中任务",
st.session_state.task_scheduler.task_queue.qsize())
with col2:
# 统计今日完成的任务数(这里需要从数据库查询)
st.metric("今日完成", "24")
with col3:
st.metric("平均处理时间", "3.2秒")
with col4:
st.metric("成功率", "98.5%")
st.divider()
# 最近任务列表
st.subheader("最近任务")
recent_tasks = st.session_state.task_scheduler.get_recent_tasks(limit=20)
if recent_tasks:
df = pd.DataFrame(recent_tasks)
# 使用DataFrame显示
st.dataframe(
df,
column_config={
"id": "任务ID",
"user_id": "用户",
"source": "来源",
"question": "问题",
"status": "状态",
"created_at": "创建时间",
"processing_time": "处理时间(秒)"
},
hide_index=True,
use_container_width=True
)
# 状态分布饼图
status_counts = df['status'].value_counts()
fig = px.pie(
values=status_counts.values,
names=status_counts.index,
title="任务状态分布"
)
st.plotly_chart(fig, use_container_width=True)
else:
st.info("暂无任务记录")
# 手动任务查询
st.subheader("任务查询")
task_id_to_check = st.text_input("输入任务ID查询详情")
if task_id_to_check:
task_status = st.session_state.task_scheduler.get_task_status(task_id_to_check)
if task_status["status"] != "not_found":
st.json(task_status)
else:
st.error("未找到该任务")
def show_usage_stats():
"""使用统计页面"""
st.title(" 使用统计分析")
# 时间范围选择
col1, col2 = st.columns(2)
with col1:
start_date = st.date_input("开始日期",
datetime.now() - timedelta(days=7))
with col2:
end_date = st.date_input("结束日期", datetime.now())
# 用户选择(多选)
# 这里需要从数据库获取所有用户列表
all_users = ["user1", "user2", "user3", "web_user", "wechat_user"]
selected_users = st.multiselect("选择用户", all_users, default=all_users)
if st.button("生成报告", type="primary"):
# 模拟数据 - 实际应从数据库查询
dates = pd.date_range(start_date, end_date, freq='D')
data = []
for date in dates:
for user in selected_users:
# 模拟任务数
task_count = np.random.randint(1, 10)
total_time = np.random.uniform(5, 30) * task_count
avg_time = total_time / task_count if task_count > 0 else 0
data.append({
"date": date.strftime('%Y-%m-%d'),
"user": user,
"task_count": task_count,
"total_time": round(total_time, 2),
"avg_time": round(avg_time, 2)
})
df = pd.DataFrame(data)
# 任务量趋势图
st.subheader("任务量趋势")
trend_df = df.groupby('date')['task_count'].sum().reset_index()
fig1 = px.line(trend_df, x='date', y='task_count',
title="每日任务总量趋势")
st.plotly_chart(fig1, use_container_width=True)
# 用户任务量对比
st.subheader("用户任务量对比")
user_df = df.groupby('user')['task_count'].sum().reset_index()
fig2 = px.bar(user_df, x='user', y='task_count',
title="各用户任务总量")
st.plotly_chart(fig2, use_container_width=True)
# 详细数据表格
st.subheader("详细数据")
st.dataframe(
df,
column_config={
"date": "日期",
"user": "用户",
"task_count": "任务数",
"total_time": "总处理时间(秒)",
"avg_time": "平均处理时间(秒)"
},
hide_index=True,
use_container_width=True
)
def show_system_logs():
"""系统日志页面"""
st.title(" 系统日志")
# 日志级别筛选
log_levels = st.multiselect(
"日志级别",
["DEBUG", "INFO", "WARNING", "ERROR", "CRITICAL"],
default=["INFO", "ERROR", "WARNING"]
)
# 模块筛选
modules = st.multiselect(
"模块",
["task", "scheduler", "wechat", "dingtalk", "model", "system"],
default=["task", "scheduler"]
)
# 时间筛选
col1, col2 = st.columns(2)
with col1:
log_start = st.date_input("开始时间",
datetime.now() - timedelta(days=1))
with col2:
log_end = st.date_input("结束时间", datetime.now())
# 搜索框
search_text = st.text_input("搜索日志内容")
if st.button("查询日志", type="primary"):
# 模拟日志数据 - 实际应从数据库查询
log_data = []
levels = ["INFO", "ERROR", "WARNING"]
log_modules = ["task", "scheduler", "wechat"]
messages = [
"任务创建成功",
"图片处理完成",
"企业微信回调接收",
"钉钉消息发送失败",
"模型加载成功",
"内存不足警告"
]
for i in range(50):
level = np.random.choice(levels)
module = np.random.choice(log_modules)
message = np.random.choice(messages)
# 根据筛选条件过滤
if level not in log_levels:
continue
if module not in modules:
continue
if search_text and search_text not in message:
continue
log_data.append({
"time": (datetime.now() - timedelta(hours=np.random.randint(0, 24))).strftime('%Y-%m-%d %H:%M:%S'),
"level": level,
"module": module,
"message": message,
"details": json.dumps({"task_id": f"task_{np.random.randint(1000, 9999)}"})
})
if log_data:
log_df = pd.DataFrame(log_data)
# 颜色映射
level_colors = {
"ERROR": "red",
"WARNING": "orange",
"INFO": "green",
"DEBUG": "blue",
"CRITICAL": "darkred"
}
# 使用st.dataframe的样式功能
def color_level(val):
color = level_colors.get(val, "black")
return f'color: {color}; font-weight: bold'
styled_df = log_df.style.applymap(color_level, subset=['level'])
st.dataframe(
styled_df,
column_config={
"time": "时间",
"level": "级别",
"module": "模块",
"message": "消息",
"details": "详情"
},
hide_index=True,
use_container_width=True
)
# 日志级别统计
st.subheader("日志级别分布")
level_counts = log_df['level'].value_counts()
fig = px.pie(values=level_counts.values,
names=level_counts.index,
title="日志级别分布")
st.plotly_chart(fig, use_container_width=True)
else:
st.info("没有找到符合条件的日志")
# 原有的聊天页面保持不变,只是集成到新框架中
def show_chat_page():
"""原有的聊天界面"""
st.title("🖼 GLM-4V-9B 视觉对话")
# 原有的聊天逻辑...
# 这里可以保持原有的GLM-4V-9B聊天功能
# 同时可以添加任务提交的选项
5. 部署与配置指南
5.1 环境准备
# 基础环境
conda create -n glm4v python=3.10
conda activate glm4v
# 安装PyTorch(根据你的CUDA版本)
pip install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
# 安装项目依赖
pip install streamlit flask requests sqlalchemy pandas plotly
pip install transformers accelerate bitsandbytes
# 安装企业微信SDK(如果需要)
pip install wechatpy
5.2 配置文件
创建 config.yaml 文件:
# 模型配置
model:
name: "THUDM/glm-4v-9b"
device: "cuda"
load_in_4bit: true
trust_remote_code: true
# 企业微信配置
wechat_work:
enabled: true
corp_id: "your_corp_id"
agent_id: "your_agent_id"
secret: "your_secret"
token: "your_token"
encoding_aes_key: "your_aes_key"
callback_url: "https://your-domain.com/wechat/callback"
# 钉钉配置
dingtalk:
enabled: true
webhook_url: "https://oapi.dingtalk.com/robot/send?access_token=xxx"
secret: "your_secret" # 如果启用了加签
at_users: ["user123"] # 默认@的用户
# 数据库配置
database:
path: "./data/audit.db"
backup_days: 30
# 任务调度
scheduler:
max_workers: 2
queue_size: 100
timeout_seconds: 300
# 安全配置
security:
allowed_origins: ["https://your-domain.com"]
api_key: "your_api_key_here"
enable_rate_limit: true
max_requests_per_minute: 60
5.3 启动脚本
创建 start.sh 启动脚本:
#!/bin/bash
# 启动Flask服务(企业微信回调)
echo "启动企业微信回调服务..."
nohup python wechat_callback.py > wechat.log 2>&1 &
# 启动Streamlit应用
echo "启动Streamlit应用..."
nohup streamlit run app.py --server.port 8501 --server.address 0.0.0.0 > streamlit.log 2>&1 &
# 启动任务监控面板(可选)
echo "启动任务监控..."
nohup python task_monitor.py > monitor.log 2>&1 &
echo "所有服务已启动!"
echo "Streamlit: http://localhost:8501"
echo "企业微信回调: http://localhost:5000/wechat/callback"
5.4 企业微信配置步骤
-
创建企业微信应用:
- 登录企业微信管理后台
- 进入"应用管理" → "自建" → "创建应用"
- 上传Logo,填写应用名称(如"AI视觉助手")
-
配置回调URL:
- 在应用详情页找到"接收消息"设置
- 点击"设置API接收"
- 填写URL、Token、EncodingAESKey(与config.yaml一致)
- 保存并启用
-
设置应用权限:
- 配置应用可见范围(哪些成员可以使用)
- 设置消息接收权限
-
测试回调:
- 在企业微信中发送消息给应用
- 查看日志确认接收正常
5.5 钉钉机器人配置
-
创建群机器人:
- 在钉钉群设置中,选择"智能群助手"
- 点击"添加机器人" → "自定义"
- 设置机器人名称和头像
-
安全设置:
- 建议启用"加签"安全设置
- 复制Webhook URL和Secret到config.yaml
-
测试消息:
- 使用简单的curl命令测试
curl 'https://oapi.dingtalk.com/robot/send?access_token=xxx' \ -H 'Content-Type: application/json' \ -d '{"msgtype": "text","text": {"content":"测试消息"}}'
6. 实际应用场景
6.1 电商商品审核
场景:电商平台每天有大量商家上传商品图片,需要审核图片是否符合规范。
传统方式:人工审核,效率低,容易漏审。
我们的方案:
- 商家在企业微信群里上传商品图片
- 机器人自动触发AI分析
- AI检查图片是否包含违禁内容、文字是否清晰、背景是否干净
- 结果自动推送到审核群,并记录到数据库
效果:审核效率提升10倍,实现24小时自动审核。
6.2 设计稿评审
场景:设计团队完成设计稿后,需要团队评审。
传统方式:开会讨论,效率低,意见分散。
我们的方案:
- 设计师上传设计稿到系统
- AI自动分析设计稿的配色、布局、可读性
- 生成评审报告,通过钉钉通知相关人员
- 所有评审意见和AI分析结果记录在案
效果:评审时间减少70%,意见更系统化。
6.3 教育作业批改
场景:老师需要批改学生的手写作业或绘画作品。
传统方式:人工批改,工作量大。
我们的方案:
- 学生拍照上传作业
- AI识别手写文字,分析答案对错
- 对于绘画作品,AI分析创意、技巧、完成度
- 生成批改报告,通过企业微信发送给老师和家长
效果:批改效率提升5倍,提供更细致的反馈。
7. 总结
通过这次深度定制开发,我们把一个单纯的GLM-4V-9B演示应用,变成了一个功能完整的企业级工具。这个方案有以下几个关键价值:
1. 解决了团队协作的痛点
- 从单人使用变为团队共享
- 从手动操作变为自动触发
- 从结果丢失变为完整记录
2. 提升了工具的使用价值
- 不只是"能做什么",更是"怎么用好"
- 不只是技术演示,更是生产工具
- 不只是当下有效,更是长期可维护
3. 提供了可扩展的架构
- 模块化设计,易于添加新功能
- 标准化接口,易于集成其他系统
- 完整的数据记录,为后续分析优化提供基础
4. 降低了使用门槛
- 通过企业微信/钉钉等常用工具接入,无需培训
- 自动化的流程,减少人工干预
- 直观的管理界面,便于监控和排查问题
这个项目的核心思想是:AI能力本身很重要,但让AI能力真正融入工作流程、产生实际价值更重要。通过合理的系统设计和集成,我们可以让先进的AI技术不再是实验室里的玩具,而是每天帮助团队提升效率的得力助手。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。
更多推荐

所有评论(0)