Qwen-Image-Edit与Python集成:自动化图像处理脚本开发
Qwen-Image-Edit与Python集成:自动化图像处理脚本开发
想象一下,你手头有几百张商品图片需要批量处理——有的要换个背景,有的要加上促销文字,还有的要调整风格。如果一张张手动操作,估计得加班到深夜。但现在,有了Qwen-Image-Edit和Python,这些繁琐的工作可以完全交给代码自动完成。
今天我就来分享如何用Python调用Qwen-Image-Edit的API,搭建一套自动化图像处理流水线。这套方案特别适合电商运营、内容创作者、自媒体团队,能帮你把重复性的修图工作变成一键操作,效率提升可不是一点点。
1. 准备工作:环境搭建与API配置
在开始写代码之前,我们需要先把基础环境准备好。整个过程其实很简单,跟着步骤走就行。
1.1 安装必要的Python库
首先打开你的命令行工具,创建一个新的项目文件夹,然后安装几个必需的库:
# 创建项目目录
mkdir qwen-image-automation
cd qwen-image-automation
# 安装核心库
pip install dashscope requests pillow python-dotenv
这里安装的库各有各的用处:
dashscope:这是阿里云百炼平台的官方SDK,用来调用Qwen-Image-Edit的APIrequests:用来下载API生成的图片pillow:Python的图像处理库,用来处理本地图片python-dotenv:管理环境变量,保护你的API密钥
1.2 获取API密钥
要使用Qwen-Image-Edit的API,你需要一个API密钥。这个密钥就像是你使用服务的通行证。
- 访问阿里云百炼平台(https://bailian.console.aliyun.com/)
- 注册或登录你的账号
- 在控制台找到“API密钥管理”
- 创建一个新的API密钥,把它复制下来
重要提醒:API密钥是你的个人凭证,千万不要直接写在代码里,更不要上传到公开的代码仓库。我们接下来会用安全的方式来管理它。
1.3 配置环境变量
在项目根目录创建一个名为.env的文件,内容如下:
# .env 文件
DASHSCOPE_API_KEY=你的API密钥
然后在同一目录下创建一个.gitignore文件,确保.env不会被上传到Git:
# .gitignore
.env
__pycache__/
*.pyc
outputs/
这样配置之后,你的API密钥就安全了,代码也能正常读取到它。
2. 基础API调用:从单张图片编辑开始
我们先从一个最简单的例子开始,看看怎么用Python调用API编辑一张图片。这样你能快速看到效果,建立信心。
2.1 编写第一个编辑脚本
创建一个名为basic_edit.py的文件,输入以下代码:
# basic_edit.py
import os
import json
from dotenv import load_dotenv
from dashscope import MultiModalConversation
import dashscope
# 加载环境变量
load_dotenv()
def edit_single_image(image_url, prompt, output_size="1024*1024"):
"""
编辑单张图片的基础函数
参数:
image_url: 输入图片的URL
prompt: 编辑指令,比如"把背景换成海滩"
output_size: 输出图片尺寸,默认1024*1024
"""
# 设置API基础URL(中国北京地域)
dashscope.base_http_api_url = 'https://dashscope.aliyuncs.com/api/v1'
# 从环境变量获取API密钥
api_key = os.getenv("DASHSCOPE_API_KEY")
if not api_key:
print("错误:请先在.env文件中设置DASHSCOPE_API_KEY")
return None
# 构建消息内容
messages = [
{
"role": "user",
"content": [
{"image": image_url},
{"text": prompt}
]
}
]
print(f"正在处理图片: {image_url}")
print(f"编辑指令: {prompt}")
try:
# 调用API
response = MultiModalConversation.call(
api_key=api_key,
model="qwen-image-edit-max", # 使用旗舰版模型
messages=messages,
stream=False,
n=1, # 生成1张图片
watermark=False, # 不添加水印
negative_prompt="模糊, 变形, 多余的手指", # 反向提示词,避免不想要的效果
prompt_extend=True, # 开启提示词智能优化
size=output_size,
)
# 检查响应状态
if response.status_code == 200:
# 获取生成的图片URL
image_url = response.output.choices[0].message.content[0]['image']
print(f" 图片生成成功!")
print(f"图片URL: {image_url}")
return image_url
else:
print(f" API调用失败")
print(f"错误码: {response.code}")
print(f"错误信息: {response.message}")
return None
except Exception as e:
print(f" 发生异常: {str(e)}")
return None
# 测试函数
if __name__ == "__main__":
# 使用示例图片(来自官方文档)
test_image_url = "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20250925/thtclx/input1.png"
test_prompt = "把背景换成海滩,人物保持原样"
result_url = edit_single_image(test_image_url, test_prompt)
if result_url:
print("\n 恭喜!你的第一个图像编辑脚本运行成功了!")
print("接下来可以尝试下载这张图片,或者修改提示词看看不同效果")
运行这个脚本,你会看到控制台输出生成的图片URL。虽然现在还不能直接下载,但至少证明API调用成功了。
2.2 下载生成的图片
光有URL还不够,我们需要把图片保存到本地。创建一个新的函数来处理下载:
# download_image.py
import requests
import os
from datetime import datetime
def download_image(image_url, save_dir="outputs", filename=None):
"""
下载图片到本地
参数:
image_url: 图片的URL
save_dir: 保存目录
filename: 文件名,如果为None则自动生成
"""
# 创建保存目录
os.makedirs(save_dir, exist_ok=True)
# 生成文件名
if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"edited_{timestamp}.png"
save_path = os.path.join(save_dir, filename)
try:
print(f"正在下载图片到: {save_path}")
# 设置较长的超时时间,因为图片可能比较大
response = requests.get(image_url, stream=True, timeout=300)
response.raise_for_status() # 检查HTTP状态码
# 写入文件
with open(save_path, 'wb') as f:
for chunk in response.iter_content(chunk_size=8192):
f.write(chunk)
print(f" 图片下载完成: {save_path}")
file_size = os.path.getsize(save_path) / 1024 # 转换为KB
print(f"文件大小: {file_size:.1f} KB")
return save_path
except requests.exceptions.RequestException as e:
print(f" 下载失败: {e}")
return None
except Exception as e:
print(f" 发生错误: {e}")
return None
# 整合到主流程中
def edit_and_download(image_url, prompt, output_size="1024*1024"):
"""完整的编辑+下载流程"""
# 第一步:编辑图片
print("=" * 50)
print("步骤1: 调用API编辑图片")
print("=" * 50)
edited_url = edit_single_image(image_url, prompt, output_size)
if not edited_url:
return None
# 第二步:下载图片
print("\n" + "=" * 50)
print("步骤2: 下载生成的图片")
print("=" * 50)
save_path = download_image(edited_url)
return save_path
# 测试完整流程
if __name__ == "__main__":
# 测试图片和提示词
test_cases = [
{
"url": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20250925/thtclx/input1.png",
"prompt": "把背景换成阳光明媚的海滩,人物保持微笑",
"name": "海滩背景"
},
{
"url": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20250925/iclsnx/input2.png",
"prompt": "把裙子颜色换成红色,保持原有款式",
"name": "红裙子"
}
]
for i, test_case in enumerate(test_cases, 1):
print(f"\n{'#' * 60}")
print(f"测试案例 {i}: {test_case['name']}")
print(f"{'#' * 60}")
save_path = edit_and_download(
test_case['url'],
test_case['prompt']
)
if save_path:
print(f" 案例{i}处理完成,图片保存在: {save_path}")
else:
print(f" 案例{i}处理失败")
print() # 空行分隔
运行这个脚本,你会看到完整的处理流程:调用API → 生成图片 → 下载保存。现在你已经掌握了最基础的用法。
3. 实战应用:搭建批量处理流水线
单张处理只是开始,真正的价值在于批量处理。我们来构建一个实用的批量处理系统。
3.1 批量处理本地图片
大多数时候,我们需要处理的是本地电脑上的图片。下面这个类能帮你批量处理整个文件夹的图片:
# batch_processor.py
import os
import glob
import time
from pathlib import Path
from basic_edit import edit_single_image
from download_image import download_image
class BatchImageProcessor:
"""批量图片处理器"""
def __init__(self, api_key=None):
"""
初始化处理器
参数:
api_key: API密钥,如果为None则从环境变量读取
"""
self.api_key = api_key
self.processed_count = 0
self.failed_count = 0
self.results = []
def upload_to_temp_url(self, image_path):
"""
模拟上传图片到临时URL
注意:实际生产环境需要将图片上传到云存储
这里为了简化,假设图片已经在公网可访问
"""
# 在实际项目中,你需要将图片上传到阿里云OSS、七牛云等
# 这里返回本地路径作为演示(实际不可用)
# 生产环境请替换为真实的上传逻辑
print(f" 注意:实际使用时需要将图片上传到公网可访问的存储")
print(f"图片路径: {image_path}")
return image_path # 这里需要替换为真实URL
def process_single_image(self, image_path, prompt, output_dir="batch_outputs"):
"""
处理单张图片
参数:
image_path: 本地图片路径
prompt: 编辑指令
output_dir: 输出目录
"""
print(f"\n处理图片: {os.path.basename(image_path)}")
try:
# 在实际应用中,这里需要上传图片到云存储获取URL
# image_url = self.upload_to_temp_url(image_path)
# 为了演示,我们使用一个示例URL
# 实际使用时请取消下面这行注释,并实现upload_to_temp_url函数
# edited_url = edit_single_image(image_url, prompt)
# 演示用:模拟处理过程
print(f"编辑指令: {prompt}")
print("模拟API调用中...")
time.sleep(1) # 模拟网络延迟
# 在实际项目中,这里会返回真实的编辑结果
# 为了演示,我们创建一个占位文件
os.makedirs(output_dir, exist_ok=True)
timestamp = int(time.time())
filename = f"batch_{timestamp}_{self.processed_count}.png"
save_path = os.path.join(output_dir, filename)
# 创建空文件作为演示
with open(save_path, 'w') as f:
f.write("这是模拟的输出文件")
self.processed_count += 1
self.results.append({
"original": image_path,
"output": save_path,
"prompt": prompt,
"status": "success"
})
print(f" 处理完成: {save_path}")
return save_path
except Exception as e:
print(f" 处理失败: {e}")
self.failed_count += 1
self.results.append({
"original": image_path,
"output": None,
"prompt": prompt,
"status": "failed",
"error": str(e)
})
return None
def process_folder(self, folder_path, prompt, file_pattern="*.jpg;*.png;*.jpeg"):
"""
处理整个文件夹的图片
参数:
folder_path: 文件夹路径
prompt: 编辑指令(可以包含{filename}占位符)
file_pattern: 文件匹配模式,用分号分隔
"""
print(f"开始处理文件夹: {folder_path}")
print(f"文件模式: {file_pattern}")
print(f"编辑指令: {prompt}")
print("-" * 50)
# 解析文件模式
patterns = [p.strip() for p in file_pattern.split(';')]
# 收集所有匹配的文件
all_files = []
for pattern in patterns:
search_pattern = os.path.join(folder_path, pattern)
files = glob.glob(search_pattern)
all_files.extend(files)
# 去重
all_files = list(set(all_files))
if not all_files:
print(" 没有找到匹配的图片文件")
return
print(f"找到 {len(all_files)} 张图片")
# 创建输出目录
timestamp = time.strftime("%Y%m%d_%H%M%S")
output_dir = f"batch_output_{timestamp}"
# 处理每张图片
for i, image_path in enumerate(all_files, 1):
filename = os.path.basename(image_path)
print(f"\n[{i}/{len(all_files)}] 处理: {filename}")
# 如果prompt中包含{filename},替换为实际文件名
actual_prompt = prompt.replace("{filename}", filename)
self.process_single_image(image_path, actual_prompt, output_dir)
# 添加延迟,避免API限流
time.sleep(0.5)
# 输出统计信息
print(f"\n{'='*60}")
print("批量处理完成!")
print(f"{'='*60}")
print(f"总图片数: {len(all_files)}")
print(f"成功处理: {self.processed_count}")
print(f"处理失败: {self.failed_count}")
print(f"输出目录: {output_dir}")
# 保存处理日志
self.save_log(output_dir)
def save_log(self, output_dir):
"""保存处理日志"""
log_file = os.path.join(output_dir, "processing_log.json")
import json
with open(log_file, 'w', encoding='utf-8') as f:
json.dump(self.results, f, ensure_ascii=False, indent=2)
print(f"处理日志已保存: {log_file}")
# 使用示例
if __name__ == "__main__":
# 创建处理器
processor = BatchImageProcessor()
# 示例:处理电商商品图
# 假设有一个文件夹包含商品图片
folder_path = "./product_images" # 替换为你的图片文件夹路径
# 如果文件夹不存在,创建示例文件夹
if not os.path.exists(folder_path):
os.makedirs(folder_path, exist_ok=True)
print(f"创建了示例文件夹: {folder_path}")
print("请将你的商品图片放入此文件夹,然后重新运行")
else:
# 批量处理指令
# {filename} 会被替换为实际文件名
prompt = "为{filename}添加白色背景,保持商品主体清晰"
processor.process_folder(
folder_path=folder_path,
prompt=prompt,
file_pattern="*.jpg;*.png"
)
这个批量处理器虽然简化了上传部分,但框架是完整的。在实际使用中,你需要实现图片上传到云存储的功能。
3.2 电商场景实战:商品图批量优化
电商运营经常需要处理大量商品图片。下面是一个更贴近实际需求的例子:
# ecommerce_processor.py
import os
import json
from batch_processor import BatchImageProcessor
class EcommerceImageProcessor(BatchImageProcessor):
"""电商图片专用处理器"""
def __init__(self):
super().__init__()
self.templates = {
"white_background": "将背景替换为纯白色,商品主体保持清晰",
"scene_background": "将背景替换为现代家居场景,保持商品在画面中央",
"add_logo": "在图片右下角添加品牌Logo,Logo大小为图片宽度的10%",
"add_text": "在图片底部添加文字'限时特价',使用红色醒目字体",
"multiple_angles": "生成商品的45度角视图,保持商品特征一致"
}
def process_product_variants(self, product_folder, operations):
"""
处理商品的多角度/多版本图片
参数:
product_folder: 商品图片文件夹
operations: 操作列表,如["white_background", "add_logo"]
"""
print(f"处理商品: {product_folder}")
# 收集所有图片
image_files = []
for ext in ['*.jpg', '*.png', '*.jpeg']:
image_files.extend(glob.glob(os.path.join(product_folder, ext)))
if not image_files:
print("没有找到商品图片")
return
# 为每张图片应用所有操作
for image_path in image_files:
filename = os.path.basename(image_path)
print(f"\n处理商品图: {filename}")
# 组合所有操作指令
combined_prompt = ",".join([self.templates[op] for op in operations])
# 处理图片
self.process_single_image(
image_path,
combined_prompt,
output_dir=f"product_outputs/{os.path.basename(product_folder)}"
)
def batch_process_catalog(self, catalog_file):
"""
根据商品目录批量处理
参数:
catalog_file: JSON格式的商品目录文件
"""
with open(catalog_file, 'r', encoding='utf-8') as f:
catalog = json.load(f)
for product in catalog:
product_id = product.get('id')
folder = product.get('folder')
operations = product.get('operations', ['white_background'])
if os.path.exists(folder):
print(f"\n{'='*60}")
print(f"处理商品 {product_id}: {folder}")
print(f"操作: {operations}")
self.process_product_variants(folder, operations)
else:
print(f" 文件夹不存在: {folder}")
# 创建商品目录示例
def create_sample_catalog():
"""创建示例商品目录"""
catalog = [
{
"id": "PROD001",
"name": "无线耳机",
"folder": "./products/headphones",
"operations": ["white_background", "add_logo"]
},
{
"id": "PROD002",
"name": "运动水杯",
"folder": "./products/water_bottle",
"operations": ["scene_background", "add_text"]
}
]
# 创建目录结构
for product in catalog:
os.makedirs(product['folder'], exist_ok=True)
# 保存目录文件
with open('product_catalog.json', 'w', encoding='utf-8') as f:
json.dump(catalog, f, ensure_ascii=False, indent=2)
print("示例商品目录已创建: product_catalog.json")
print("请将商品图片放入对应的文件夹中")
# 使用示例
if __name__ == "__main__":
# 创建示例目录
create_sample_catalog()
# 创建处理器
processor = EcommerceImageProcessor()
print("\n电商图片批量处理器已就绪")
print("可用操作模板:")
for key, desc in processor.templates.items():
print(f" {key}: {desc}")
# 开始处理
# processor.batch_process_catalog('product_catalog.json')
print("\n请将商品图片放入对应文件夹后,取消注释上面的代码行开始处理")
这个电商处理器提供了模板化的操作,你可以根据实际需求扩展更多的处理模板。
4. 高级技巧与最佳实践
掌握了基础用法后,我们来看看一些提升效率和效果的高级技巧。
4.1 提示词优化策略
好的提示词能显著提升编辑效果。这里有一些实用技巧:
# prompt_optimizer.py
class PromptOptimizer:
"""提示词优化器"""
@staticmethod
def optimize_for_editing(base_prompt, style="detailed"):
"""
优化编辑提示词
参数:
base_prompt: 基础提示词,如"换个背景"
style: 优化风格,可选"detailed"/"simple"/"creative"
"""
templates = {
"detailed": (
"请精确编辑图片,具体要求如下:{base_prompt}。"
"请保持图片其他部分不变,只修改指定区域。"
"确保编辑后的部分与原始图片自然融合。"
),
"simple": (
"{base_prompt},保持其他部分不变"
),
"creative": (
"基于以下创意进行编辑:{base_prompt}。"
"可以适当发挥创意,但保持图片整体协调。"
)
}
return templates.get(style, templates["simple"]).format(base_prompt=base_prompt)
@staticmethod
def add_negative_prompt(base_prompt, negative_items=None):
"""
添加反向提示词
参数:
base_prompt: 基础提示词
negative_items: 需要避免的内容列表
"""
if negative_items is None:
negative_items = ["模糊", "变形", "多余的手指", "不自然的边缘"]
negative_text = ",".join(negative_items)
return f"{base_prompt}。避免出现:{negative_text}"
@staticmethod
def for_specific_scenario(scenario, **kwargs):
"""
为特定场景生成优化提示词
参数:
scenario: 场景类型
**kwargs: 场景特定参数
"""
scenarios = {
"product_background": (
"将商品背景替换为{background},保持商品主体清晰、边缘锐利。"
"背景与商品要有适当的空间感,避免看起来像贴图。"
),
"text_replacement": (
"将图片中的文字'{old_text}'替换为'{new_text}',"
"保持原有的字体、大小、颜色和风格。"
"确保新文字与背景自然融合。"
),
"object_removal": (
"移除图片中的{object_to_remove},"
"用合理的背景内容填充该区域。"
"确保修复后的区域与周围环境协调一致。"
),
"style_transfer": (
"将图片风格转换为{target_style}风格,"
"保持图片的主要内容和构图不变。"
"色彩和纹理要符合目标风格的特征。"
)
}
template = scenarios.get(scenario)
if template:
return template.format(**kwargs)
else:
return f"执行以下编辑:{scenario}"
# 使用示例
if __name__ == "__main__":
optimizer = PromptOptimizer()
# 示例1:详细编辑提示词
simple_prompt = "把天空换成晚霞"
detailed_prompt = optimizer.optimize_for_editing(simple_prompt, "detailed")
print("详细提示词示例:")
print(f"原始: {simple_prompt}")
print(f"优化后: {detailed_prompt}")
print()
# 示例2:商品背景替换
product_prompt = optimizer.for_specific_scenario(
"product_background",
background="简约的白色工作室背景"
)
print("商品背景替换提示词:")
print(product_prompt)
print()
# 示例3:添加反向提示词
full_prompt = optimizer.add_negative_prompt(detailed_prompt)
print("带反向提示词的完整提示词:")
print(full_prompt)
4.2 错误处理与重试机制
在实际生产环境中,网络波动、API限流等问题时有发生。一个健壮的系统需要有完善的错误处理机制。
# error_handler.py
import time
import logging
from functools import wraps
# 配置日志
logging.basicConfig(
level=logging.INFO,
format='%(asctime)s - %(levelname)s - %(message)s',
handlers=[
logging.FileHandler('api_errors.log'),
logging.StreamHandler()
]
)
def retry_on_failure(max_retries=3, delay=2, backoff=2):
"""
重试装饰器
参数:
max_retries: 最大重试次数
delay: 初始延迟时间(秒)
backoff: 延迟倍数
"""
def decorator(func):
@wraps(func)
def wrapper(*args, **kwargs):
retries = 0
current_delay = delay
while retries <= max_retries:
try:
return func(*args, **kwargs)
except Exception as e:
retries += 1
if retries > max_retries:
logging.error(f"函数 {func.__name__} 重试{max_retries}次后仍失败: {e}")
raise
logging.warning(f"函数 {func.__name__} 第{retries}次失败: {e}")
logging.info(f"等待{current_delay}秒后重试...")
time.sleep(current_delay)
current_delay *= backoff # 指数退避
return None
return wrapper
return decorator
class APIErrorHandler:
"""API错误处理器"""
ERROR_CODES = {
"RateLimitExceeded": "请求频率超限,请稍后重试",
"InvalidParameter": "参数错误,请检查输入",
"ModelNotAvailable": "模型暂时不可用",
"ImageTooLarge": "图片尺寸过大",
"NetworkError": "网络连接失败"
}
@staticmethod
def handle_error(error_code, error_message, context=None):
"""
处理API错误
参数:
error_code: 错误代码
error_message: 错误信息
context: 上下文信息,用于记录
"""
user_message = APIErrorHandler.ERROR_CODES.get(
error_code,
f"处理失败: {error_message}"
)
# 记录错误详情
error_log = {
"timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
"error_code": error_code,
"error_message": error_message,
"context": context,
"user_message": user_message
}
logging.error(f"API错误: {error_log}")
# 根据错误类型采取不同策略
if error_code == "RateLimitExceeded":
# 频率限制,建议等待
wait_time = 60 # 等待60秒
logging.info(f"达到频率限制,等待{wait_time}秒")
time.sleep(wait_time)
return "retry"
elif error_code == "ModelNotAvailable":
# 模型不可用,尝试备用模型
logging.info("尝试使用备用模型")
return "use_fallback"
else:
# 其他错误,返回用户友好的消息
return user_message
@staticmethod
@retry_on_failure(max_retries=3, delay=2)
def safe_api_call(api_func, *args, **kwargs):
"""
安全的API调用包装器
参数:
api_func: API函数
*args, **kwargs: 函数参数
"""
try:
return api_func(*args, **kwargs)
except Exception as e:
# 这里可以根据具体的异常类型映射到错误代码
error_code = "NetworkError" if "connection" in str(e).lower() else "UnknownError"
handled = APIErrorHandler.handle_error(error_code, str(e), {
"function": api_func.__name__,
"args": args,
"kwargs": kwargs
})
if handled == "retry":
raise # 触发重试
else:
return None
# 使用示例
if __name__ == "__main__":
# 模拟一个可能失败的API调用
def unstable_api_call(success_rate=0.5):
import random
if random.random() < success_rate:
return "API调用成功"
else:
raise Exception("模拟的API错误")
# 使用错误处理器包装API调用
handler = APIErrorHandler()
print("测试错误处理机制...")
# 测试重试机制
result = handler.safe_api_call(unstable_api_call, success_rate=0.3)
print(f"结果: {result}")
# 测试错误处理
error_response = handler.handle_error(
"RateLimitExceeded",
"请求过于频繁",
{"api_endpoint": "qwen-image-edit"}
)
print(f"错误处理建议: {error_response}")
4.3 性能优化建议
当处理大量图片时,性能变得很重要。这里有一些优化建议:
# performance_optimizer.py
import concurrent.futures
import time
from queue import Queue
from threading import Thread
class ParallelProcessor:
"""并行处理器"""
def __init__(self, max_workers=3):
"""
初始化并行处理器
参数:
max_workers: 最大工作线程数
"""
self.max_workers = max_workers
self.task_queue = Queue()
self.results = []
def add_task(self, image_path, prompt, output_dir):
"""添加任务到队列"""
self.task_queue.put({
"image_path": image_path,
"prompt": prompt,
"output_dir": output_dir
})
def worker(self, worker_id):
"""工作线程函数"""
from basic_edit import edit_single_image
from download_image import download_image
while not self.task_queue.empty():
try:
task = self.task_queue.get_nowait()
except:
break
print(f"Worker {worker_id}: 处理 {task['image_path']}")
# 这里应该调用实际的API
# 为了演示,我们模拟处理
time.sleep(1) # 模拟处理时间
# 记录结果
self.results.append({
"worker": worker_id,
"task": task,
"status": "completed",
"timestamp": time.time()
})
self.task_queue.task_done()
def process_parallel(self):
"""并行处理所有任务"""
print(f"开始并行处理,使用 {self.max_workers} 个工作线程")
start_time = time.time()
# 创建并启动工作线程
threads = []
for i in range(self.max_workers):
thread = Thread(target=self.worker, args=(i,))
thread.start()
threads.append(thread)
# 等待所有线程完成
for thread in threads:
thread.join()
# 等待队列清空
self.task_queue.join()
end_time = time.time()
duration = end_time - start_time
print(f"\n并行处理完成!")
print(f"总任务数: {len(self.results)}")
print(f"总耗时: {duration:.2f}秒")
print(f"平均每个任务: {duration/len(self.results):.2f}秒")
return self.results
class CacheManager:
"""缓存管理器"""
def __init__(self, cache_dir=".api_cache"):
self.cache_dir = cache_dir
os.makedirs(cache_dir, exist_ok=True)
def get_cache_key(self, image_url, prompt, params):
"""生成缓存键"""
import hashlib
content = f"{image_url}_{prompt}_{json.dumps(params, sort_keys=True)}"
return hashlib.md5(content.encode()).hexdigest()
def get_cached_result(self, cache_key):
"""获取缓存结果"""
cache_file = os.path.join(self.cache_dir, f"{cache_key}.json")
if os.path.exists(cache_file):
with open(cache_file, 'r') as f:
return json.load(f)
return None
def save_to_cache(self, cache_key, result):
"""保存到缓存"""
cache_file = os.path.join(self.cache_dir, f"{cache_key}.json")
with open(cache_file, 'w') as f:
json.dump(result, f)
def clear_old_cache(self, max_age_hours=24):
"""清理旧缓存"""
import glob
import datetime
cache_files = glob.glob(os.path.join(self.cache_dir, "*.json"))
for cache_file in cache_files:
file_age = datetime.datetime.now() - datetime.datetime.fromtimestamp(
os.path.getmtime(cache_file)
)
if file_age.total_seconds() > max_age_hours * 3600:
os.remove(cache_file)
print(f"清理旧缓存: {cache_file}")
# 使用示例
if __name__ == "__main__":
# 测试并行处理
print("测试并行处理能力...")
processor = ParallelProcessor(max_workers=3)
# 添加示例任务
for i in range(10):
processor.add_task(
image_path=f"image_{i}.jpg",
prompt=f"编辑任务{i}",
output_dir="parallel_outputs"
)
results = processor.process_parallel()
# 测试缓存管理
print("\n测试缓存管理...")
cache_manager = CacheManager()
# 生成缓存键
test_key = cache_manager.get_cache_key(
"https://example.com/image.jpg",
"测试提示词",
{"size": "1024x1024", "n": 1}
)
print(f"缓存键: {test_key}")
# 清理旧缓存
cache_manager.clear_old_cache(max_age_hours=1)
5. 完整项目实战:电商图片自动化处理系统
最后,我们把所有组件整合起来,构建一个完整的电商图片处理系统。
# ecommerce_automation_system.py
import os
import json
import yaml
import schedule
import time
from datetime import datetime
from pathlib import Path
class EcommerceAutomationSystem:
"""电商图片自动化处理系统"""
def __init__(self, config_file="config.yaml"):
"""
初始化系统
参数:
config_file: 配置文件路径
"""
self.load_config(config_file)
self.setup_directories()
self.setup_logging()
def load_config(self, config_file):
"""加载配置文件"""
if not os.path.exists(config_file):
# 创建默认配置
default_config = {
"api": {
"model": "qwen-image-edit-max",
"base_url": "https://dashscope.aliyuncs.com/api/v1",
"timeout": 300,
"max_retries": 3
},
"paths": {
"input_dir": "./input_images",
"output_dir": "./processed_images",
"archive_dir": "./archived_images",
"log_dir": "./logs"
},
"processing": {
"batch_size": 10,
"max_workers": 3,
"default_prompt": "优化商品图片,白色背景,主体清晰",
"file_patterns": ["*.jpg", "*.png", "*.jpeg"]
},
"scheduling": {
"enabled": False,
"interval_minutes": 60,
"run_at": ["09:00", "14:00", "19:00"]
}
}
with open(config_file, 'w', encoding='utf-8') as f:
yaml.dump(default_config, f, default_flow_style=False)
print(f"已创建默认配置文件: {config_file}")
with open(config_file, 'r', encoding='utf-8') as f:
self.config = yaml.safe_load(f)
def setup_directories(self):
"""设置目录结构"""
paths = self.config['paths']
for dir_path in paths.values():
os.makedirs(dir_path, exist_ok=True)
print("目录结构已创建")
def setup_logging(self):
"""设置日志系统"""
import logging
log_dir = self.config['paths']['log_dir']
log_file = os.path.join(log_dir, f"automation_{datetime.now().strftime('%Y%m%d')}.log")
self.logger = logging.getLogger('EcommerceAutomation')
self.logger.setLevel(logging.INFO)
# 文件处理器
file_handler = logging.FileHandler(log_file, encoding='utf-8')
file_handler.setLevel(logging.INFO)
# 控制台处理器
console_handler = logging.StreamHandler()
console_handler.setLevel(logging.INFO)
# 格式器
formatter = logging.Formatter(
'%(asctime)s - %(name)s - %(levelname)s - %(message)s'
)
file_handler.setFormatter(formatter)
console_handler.setFormatter(formatter)
self.logger.addHandler(file_handler)
self.logger.addHandler(console_handler)
self.logger.info("自动化系统初始化完成")
def monitor_input_folder(self):
"""监控输入文件夹"""
input_dir = self.config['paths']['input_dir']
file_patterns = self.config['processing']['file_patterns']
new_files = []
for pattern in file_patterns:
for file_path in Path(input_dir).glob(pattern):
# 检查文件是否是新添加的(根据修改时间)
file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
time_diff = (datetime.now() - file_mtime).total_seconds()
if time_diff < 300: # 5分钟内的新文件
new_files.append(str(file_path))
return new_files
def process_batch(self, file_list):
"""处理一批文件"""
from batch_processor import BatchImageProcessor
self.logger.info(f"开始处理批次,共 {len(file_list)} 个文件")
processor = BatchImageProcessor()
output_dir = self.config['paths']['output_dir']
# 为每个文件生成处理指令
for file_path in file_list:
filename = Path(file_path).name
prompt = self.generate_prompt_for_file(filename)
self.logger.info(f"处理文件: {filename}")
# 这里调用实际的处理逻辑
# 为了演示,我们记录处理状态
processor.process_single_image(file_path, prompt, output_dir)
# 移动原文件到归档目录
self.archive_file(file_path)
self.logger.info("批次处理完成")
def generate_prompt_for_file(self, filename):
"""根据文件名生成处理提示词"""
# 这里可以根据文件名中的关键词生成不同的提示词
# 例如:根据产品类别、颜色等
prompt_templates = {
"default": self.config['processing']['default_prompt'],
"clothing": "优化服装图片,展示细节,自然褶皱,专业摄影风格",
"electronics": "优化电子产品图片,突出科技感,简洁背景,清晰展示接口",
"food": "优化食品图片,增强食欲感,自然光线,突出食材新鲜度"
}
# 简单的关键词匹配
filename_lower = filename.lower()
for category, template in prompt_templates.items():
if category != "default" and category in filename_lower:
return template
return prompt_templates["default"]
def archive_file(self, file_path):
"""归档已处理的文件"""
archive_dir = self.config['paths']['archive_dir']
if not os.path.exists(file_path):
return
filename = Path(file_path).name
archive_path = os.path.join(archive_dir, filename)
# 如果目标文件已存在,添加时间戳
if os.path.exists(archive_path):
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
name_parts = os.path.splitext(filename)
archive_path = os.path.join(
archive_dir,
f"{name_parts[0]}_{timestamp}{name_parts[1]}"
)
os.rename(file_path, archive_path)
self.logger.info(f"文件已归档: {archive_path}")
def generate_report(self):
"""生成处理报告"""
output_dir = self.config['paths']['output_dir']
log_dir = self.config['paths']['log_dir']
# 统计输出文件
output_files = list(Path(output_dir).glob("*"))
report = {
"generated_at": datetime.now().isoformat(),
"total_files": len(output_files),
"file_types": {},
"recent_files": []
}
# 统计文件类型
for file_path in output_files:
ext = file_path.suffix.lower()
report["file_types"][ext] = report["file_types"].get(ext, 0) + 1
# 最近处理的文件
recent_files = sorted(output_files, key=lambda x: x.stat().st_mtime, reverse=True)[:10]
report["recent_files"] = [
{
"name": f.name,
"size": f.stat().st_size,
"modified": datetime.fromtimestamp(f.stat().st_mtime).isoformat()
}
for f in recent_files
]
# 保存报告
report_file = os.path.join(log_dir, f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
with open(report_file, 'w', encoding='utf-8') as f:
json.dump(report, f, ensure_ascii=False, indent=2)
self.logger.info(f"报告已生成: {report_file}")
return report
def run_once(self):
"""运行一次处理任务"""
self.logger.info("开始执行处理任务")
# 监控新文件
new_files = self.monitor_input_folder()
if new_files:
self.logger.info(f"发现 {len(new_files)} 个新文件")
self.process_batch(new_files)
else:
self.logger.info("没有发现新文件")
# 生成报告
report = self.generate_report()
self.logger.info("处理任务完成")
return report
def run_scheduled(self):
"""运行定时任务"""
if not self.config['scheduling']['enabled']:
self.logger.warning("定时任务未启用")
return
self.logger.info("启动定时任务调度器")
# 设置定时任务
interval = self.config['scheduling']['interval_minutes']
schedule.every(interval).minutes.do(self.run_once)
# 设置定点任务
for run_time in self.config['scheduling']['run_at']:
schedule.every().day.at(run_time).do(self.run_once)
self.logger.info(f"定时任务已设置,每 {interval} 分钟运行一次")
# 运行调度器
try:
while True:
schedule.run_pending()
time.sleep(60) # 每分钟检查一次
except KeyboardInterrupt:
self.logger.info("定时任务已停止")
def run(self, mode="once"):
"""
运行系统
参数:
mode: 运行模式,"once"或"scheduled"
"""
if mode == "once":
self.run_once()
elif mode == "scheduled":
self.run_scheduled()
else:
self.logger.error(f"未知的运行模式: {mode}")
# 使用示例
if __name__ == "__main__":
print("电商图片自动化处理系统")
print("=" * 50)
# 创建系统实例
system = EcommerceAutomationSystem()
# 显示配置信息
print("\n当前配置:")
print(f"输入目录: {system.config['paths']['input_dir']}")
print(f"输出目录: {system.config['paths']['output_dir']}")
print(f"默认提示词: {system.config['processing']['default_prompt']}")
# 运行模式选择
print("\n请选择运行模式:")
print("1. 运行一次")
print("2. 启动定时任务")
choice = input("请输入选择 (1/2): ").strip()
if choice == "1":
print("\n开始单次运行...")
system.run(mode="once")
elif choice == "2":
print("\n启动定时任务...")
system.run(mode="scheduled")
else:
print("无效选择,退出系统")
6. 总结
通过上面的内容,我们一步步构建了一个完整的Qwen-Image-Edit自动化处理系统。从最基础的API调用,到批量处理,再到完整的电商自动化系统,这套方案可以显著提升图片处理的效率。
实际用下来,Qwen-Image-Edit的API调用确实比较简单,效果也相当不错。特别是对于电商图片处理、内容创作这些场景,能节省大量时间。Python的灵活性让我们可以轻松定制各种处理流程,满足不同的业务需求。
如果你刚开始接触,建议先从单张图片处理开始,熟悉API的基本用法。等掌握了基础,再尝试批量处理。最后,根据你的具体业务需求,定制自己的自动化系统。
这套系统还有很多可以扩展的地方,比如添加图片质量检测、自动分类、与电商平台API对接等。你可以根据自己的需求继续完善它。
获取更多AI镜像
想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。
更多推荐

所有评论(0)