Qwen-Image-Edit与Python集成:自动化图像处理脚本开发

想象一下,你手头有几百张商品图片需要批量处理——有的要换个背景,有的要加上促销文字,还有的要调整风格。如果一张张手动操作,估计得加班到深夜。但现在,有了Qwen-Image-Edit和Python,这些繁琐的工作可以完全交给代码自动完成。

今天我就来分享如何用Python调用Qwen-Image-Edit的API,搭建一套自动化图像处理流水线。这套方案特别适合电商运营、内容创作者、自媒体团队,能帮你把重复性的修图工作变成一键操作,效率提升可不是一点点。

1. 准备工作:环境搭建与API配置

在开始写代码之前,我们需要先把基础环境准备好。整个过程其实很简单,跟着步骤走就行。

1.1 安装必要的Python库

首先打开你的命令行工具,创建一个新的项目文件夹,然后安装几个必需的库:

# 创建项目目录
mkdir qwen-image-automation
cd qwen-image-automation

# 安装核心库
pip install dashscope requests pillow python-dotenv

这里安装的库各有各的用处:

  • dashscope:这是阿里云百炼平台的官方SDK,用来调用Qwen-Image-Edit的API
  • requests:用来下载API生成的图片
  • pillow:Python的图像处理库,用来处理本地图片
  • python-dotenv:管理环境变量,保护你的API密钥

1.2 获取API密钥

要使用Qwen-Image-Edit的API,你需要一个API密钥。这个密钥就像是你使用服务的通行证。

  1. 访问阿里云百炼平台(https://bailian.console.aliyun.com/)
  2. 注册或登录你的账号
  3. 在控制台找到“API密钥管理”
  4. 创建一个新的API密钥,把它复制下来

重要提醒:API密钥是你的个人凭证,千万不要直接写在代码里,更不要上传到公开的代码仓库。我们接下来会用安全的方式来管理它。

1.3 配置环境变量

在项目根目录创建一个名为.env的文件,内容如下:

# .env 文件
DASHSCOPE_API_KEY=你的API密钥

然后在同一目录下创建一个.gitignore文件,确保.env不会被上传到Git:

# .gitignore
.env
__pycache__/
*.pyc
outputs/

这样配置之后,你的API密钥就安全了,代码也能正常读取到它。

2. 基础API调用:从单张图片编辑开始

我们先从一个最简单的例子开始,看看怎么用Python调用API编辑一张图片。这样你能快速看到效果,建立信心。

2.1 编写第一个编辑脚本

创建一个名为basic_edit.py的文件,输入以下代码:

# basic_edit.py
import os
import json
from dotenv import load_dotenv
from dashscope import MultiModalConversation
import dashscope

# 加载环境变量
load_dotenv()

def edit_single_image(image_url, prompt, output_size="1024*1024"):
    """
    编辑单张图片的基础函数
    
    参数:
        image_url: 输入图片的URL
        prompt: 编辑指令,比如"把背景换成海滩"
        output_size: 输出图片尺寸,默认1024*1024
    """
    
    # 设置API基础URL(中国北京地域)
    dashscope.base_http_api_url = 'https://dashscope.aliyuncs.com/api/v1'
    
    # 从环境变量获取API密钥
    api_key = os.getenv("DASHSCOPE_API_KEY")
    if not api_key:
        print("错误:请先在.env文件中设置DASHSCOPE_API_KEY")
        return None
    
    # 构建消息内容
    messages = [
        {
            "role": "user",
            "content": [
                {"image": image_url},
                {"text": prompt}
            ]
        }
    ]
    
    print(f"正在处理图片: {image_url}")
    print(f"编辑指令: {prompt}")
    
    try:
        # 调用API
        response = MultiModalConversation.call(
            api_key=api_key,
            model="qwen-image-edit-max",  # 使用旗舰版模型
            messages=messages,
            stream=False,
            n=1,  # 生成1张图片
            watermark=False,  # 不添加水印
            negative_prompt="模糊, 变形, 多余的手指",  # 反向提示词,避免不想要的效果
            prompt_extend=True,  # 开启提示词智能优化
            size=output_size,
        )
        
        # 检查响应状态
        if response.status_code == 200:
            # 获取生成的图片URL
            image_url = response.output.choices[0].message.content[0]['image']
            print(f" 图片生成成功!")
            print(f"图片URL: {image_url}")
            return image_url
        else:
            print(f" API调用失败")
            print(f"错误码: {response.code}")
            print(f"错误信息: {response.message}")
            return None
            
    except Exception as e:
        print(f" 发生异常: {str(e)}")
        return None

# 测试函数
if __name__ == "__main__":
    # 使用示例图片(来自官方文档)
    test_image_url = "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20250925/thtclx/input1.png"
    test_prompt = "把背景换成海滩,人物保持原样"
    
    result_url = edit_single_image(test_image_url, test_prompt)
    
    if result_url:
        print("\n 恭喜!你的第一个图像编辑脚本运行成功了!")
        print("接下来可以尝试下载这张图片,或者修改提示词看看不同效果")

运行这个脚本,你会看到控制台输出生成的图片URL。虽然现在还不能直接下载,但至少证明API调用成功了。

2.2 下载生成的图片

光有URL还不够,我们需要把图片保存到本地。创建一个新的函数来处理下载:

# download_image.py
import requests
import os
from datetime import datetime

def download_image(image_url, save_dir="outputs", filename=None):
    """
    下载图片到本地
    
    参数:
        image_url: 图片的URL
        save_dir: 保存目录
        filename: 文件名,如果为None则自动生成
    """
    
    # 创建保存目录
    os.makedirs(save_dir, exist_ok=True)
    
    # 生成文件名
    if filename is None:
        timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
        filename = f"edited_{timestamp}.png"
    
    save_path = os.path.join(save_dir, filename)
    
    try:
        print(f"正在下载图片到: {save_path}")
        
        # 设置较长的超时时间,因为图片可能比较大
        response = requests.get(image_url, stream=True, timeout=300)
        response.raise_for_status()  # 检查HTTP状态码
        
        # 写入文件
        with open(save_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        
        print(f" 图片下载完成: {save_path}")
        file_size = os.path.getsize(save_path) / 1024  # 转换为KB
        print(f"文件大小: {file_size:.1f} KB")
        
        return save_path
        
    except requests.exceptions.RequestException as e:
        print(f" 下载失败: {e}")
        return None
    except Exception as e:
        print(f" 发生错误: {e}")
        return None

# 整合到主流程中
def edit_and_download(image_url, prompt, output_size="1024*1024"):
    """完整的编辑+下载流程"""
    
    # 第一步:编辑图片
    print("=" * 50)
    print("步骤1: 调用API编辑图片")
    print("=" * 50)
    edited_url = edit_single_image(image_url, prompt, output_size)
    
    if not edited_url:
        return None
    
    # 第二步:下载图片
    print("\n" + "=" * 50)
    print("步骤2: 下载生成的图片")
    print("=" * 50)
    save_path = download_image(edited_url)
    
    return save_path

# 测试完整流程
if __name__ == "__main__":
    # 测试图片和提示词
    test_cases = [
        {
            "url": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20250925/thtclx/input1.png",
            "prompt": "把背景换成阳光明媚的海滩,人物保持微笑",
            "name": "海滩背景"
        },
        {
            "url": "https://help-static-aliyun-doc.aliyuncs.com/file-manage-files/zh-CN/20250925/iclsnx/input2.png",
            "prompt": "把裙子颜色换成红色,保持原有款式",
            "name": "红裙子"
        }
    ]
    
    for i, test_case in enumerate(test_cases, 1):
        print(f"\n{'#' * 60}")
        print(f"测试案例 {i}: {test_case['name']}")
        print(f"{'#' * 60}")
        
        save_path = edit_and_download(
            test_case['url'], 
            test_case['prompt']
        )
        
        if save_path:
            print(f" 案例{i}处理完成,图片保存在: {save_path}")
        else:
            print(f" 案例{i}处理失败")
        
        print()  # 空行分隔

运行这个脚本,你会看到完整的处理流程:调用API → 生成图片 → 下载保存。现在你已经掌握了最基础的用法。

3. 实战应用:搭建批量处理流水线

单张处理只是开始,真正的价值在于批量处理。我们来构建一个实用的批量处理系统。

3.1 批量处理本地图片

大多数时候,我们需要处理的是本地电脑上的图片。下面这个类能帮你批量处理整个文件夹的图片:

# batch_processor.py
import os
import glob
import time
from pathlib import Path
from basic_edit import edit_single_image
from download_image import download_image

class BatchImageProcessor:
    """批量图片处理器"""
    
    def __init__(self, api_key=None):
        """
        初始化处理器
        
        参数:
            api_key: API密钥,如果为None则从环境变量读取
        """
        self.api_key = api_key
        self.processed_count = 0
        self.failed_count = 0
        self.results = []
        
    def upload_to_temp_url(self, image_path):
        """
        模拟上传图片到临时URL
        注意:实际生产环境需要将图片上传到云存储
        这里为了简化,假设图片已经在公网可访问
        """
        # 在实际项目中,你需要将图片上传到阿里云OSS、七牛云等
        # 这里返回本地路径作为演示(实际不可用)
        # 生产环境请替换为真实的上传逻辑
        print(f" 注意:实际使用时需要将图片上传到公网可访问的存储")
        print(f"图片路径: {image_path}")
        return image_path  # 这里需要替换为真实URL
    
    def process_single_image(self, image_path, prompt, output_dir="batch_outputs"):
        """
        处理单张图片
        
        参数:
            image_path: 本地图片路径
            prompt: 编辑指令
            output_dir: 输出目录
        """
        print(f"\n处理图片: {os.path.basename(image_path)}")
        
        try:
            # 在实际应用中,这里需要上传图片到云存储获取URL
            # image_url = self.upload_to_temp_url(image_path)
            
            # 为了演示,我们使用一个示例URL
            # 实际使用时请取消下面这行注释,并实现upload_to_temp_url函数
            # edited_url = edit_single_image(image_url, prompt)
            
            # 演示用:模拟处理过程
            print(f"编辑指令: {prompt}")
            print("模拟API调用中...")
            time.sleep(1)  # 模拟网络延迟
            
            # 在实际项目中,这里会返回真实的编辑结果
            # 为了演示,我们创建一个占位文件
            os.makedirs(output_dir, exist_ok=True)
            timestamp = int(time.time())
            filename = f"batch_{timestamp}_{self.processed_count}.png"
            save_path = os.path.join(output_dir, filename)
            
            # 创建空文件作为演示
            with open(save_path, 'w') as f:
                f.write("这是模拟的输出文件")
            
            self.processed_count += 1
            self.results.append({
                "original": image_path,
                "output": save_path,
                "prompt": prompt,
                "status": "success"
            })
            
            print(f" 处理完成: {save_path}")
            return save_path
            
        except Exception as e:
            print(f" 处理失败: {e}")
            self.failed_count += 1
            self.results.append({
                "original": image_path,
                "output": None,
                "prompt": prompt,
                "status": "failed",
                "error": str(e)
            })
            return None
    
    def process_folder(self, folder_path, prompt, file_pattern="*.jpg;*.png;*.jpeg"):
        """
        处理整个文件夹的图片
        
        参数:
            folder_path: 文件夹路径
            prompt: 编辑指令(可以包含{filename}占位符)
            file_pattern: 文件匹配模式,用分号分隔
        """
        print(f"开始处理文件夹: {folder_path}")
        print(f"文件模式: {file_pattern}")
        print(f"编辑指令: {prompt}")
        print("-" * 50)
        
        # 解析文件模式
        patterns = [p.strip() for p in file_pattern.split(';')]
        
        # 收集所有匹配的文件
        all_files = []
        for pattern in patterns:
            search_pattern = os.path.join(folder_path, pattern)
            files = glob.glob(search_pattern)
            all_files.extend(files)
        
        # 去重
        all_files = list(set(all_files))
        
        if not all_files:
            print(" 没有找到匹配的图片文件")
            return
        
        print(f"找到 {len(all_files)} 张图片")
        
        # 创建输出目录
        timestamp = time.strftime("%Y%m%d_%H%M%S")
        output_dir = f"batch_output_{timestamp}"
        
        # 处理每张图片
        for i, image_path in enumerate(all_files, 1):
            filename = os.path.basename(image_path)
            print(f"\n[{i}/{len(all_files)}] 处理: {filename}")
            
            # 如果prompt中包含{filename},替换为实际文件名
            actual_prompt = prompt.replace("{filename}", filename)
            
            self.process_single_image(image_path, actual_prompt, output_dir)
            
            # 添加延迟,避免API限流
            time.sleep(0.5)
        
        # 输出统计信息
        print(f"\n{'='*60}")
        print("批量处理完成!")
        print(f"{'='*60}")
        print(f"总图片数: {len(all_files)}")
        print(f"成功处理: {self.processed_count}")
        print(f"处理失败: {self.failed_count}")
        print(f"输出目录: {output_dir}")
        
        # 保存处理日志
        self.save_log(output_dir)
        
    def save_log(self, output_dir):
        """保存处理日志"""
        log_file = os.path.join(output_dir, "processing_log.json")
        
        import json
        with open(log_file, 'w', encoding='utf-8') as f:
            json.dump(self.results, f, ensure_ascii=False, indent=2)
        
        print(f"处理日志已保存: {log_file}")

# 使用示例
if __name__ == "__main__":
    # 创建处理器
    processor = BatchImageProcessor()
    
    # 示例:处理电商商品图
    # 假设有一个文件夹包含商品图片
    folder_path = "./product_images"  # 替换为你的图片文件夹路径
    
    # 如果文件夹不存在,创建示例文件夹
    if not os.path.exists(folder_path):
        os.makedirs(folder_path, exist_ok=True)
        print(f"创建了示例文件夹: {folder_path}")
        print("请将你的商品图片放入此文件夹,然后重新运行")
    else:
        # 批量处理指令
        # {filename} 会被替换为实际文件名
        prompt = "为{filename}添加白色背景,保持商品主体清晰"
        
        processor.process_folder(
            folder_path=folder_path,
            prompt=prompt,
            file_pattern="*.jpg;*.png"
        )

这个批量处理器虽然简化了上传部分,但框架是完整的。在实际使用中,你需要实现图片上传到云存储的功能。

3.2 电商场景实战:商品图批量优化

电商运营经常需要处理大量商品图片。下面是一个更贴近实际需求的例子:

# ecommerce_processor.py
import os
import json
from batch_processor import BatchImageProcessor

class EcommerceImageProcessor(BatchImageProcessor):
    """电商图片专用处理器"""
    
    def __init__(self):
        super().__init__()
        self.templates = {
            "white_background": "将背景替换为纯白色,商品主体保持清晰",
            "scene_background": "将背景替换为现代家居场景,保持商品在画面中央",
            "add_logo": "在图片右下角添加品牌Logo,Logo大小为图片宽度的10%",
            "add_text": "在图片底部添加文字'限时特价',使用红色醒目字体",
            "multiple_angles": "生成商品的45度角视图,保持商品特征一致"
        }
    
    def process_product_variants(self, product_folder, operations):
        """
        处理商品的多角度/多版本图片
        
        参数:
            product_folder: 商品图片文件夹
            operations: 操作列表,如["white_background", "add_logo"]
        """
        print(f"处理商品: {product_folder}")
        
        # 收集所有图片
        image_files = []
        for ext in ['*.jpg', '*.png', '*.jpeg']:
            image_files.extend(glob.glob(os.path.join(product_folder, ext)))
        
        if not image_files:
            print("没有找到商品图片")
            return
        
        # 为每张图片应用所有操作
        for image_path in image_files:
            filename = os.path.basename(image_path)
            print(f"\n处理商品图: {filename}")
            
            # 组合所有操作指令
            combined_prompt = ",".join([self.templates[op] for op in operations])
            
            # 处理图片
            self.process_single_image(
                image_path, 
                combined_prompt,
                output_dir=f"product_outputs/{os.path.basename(product_folder)}"
            )
    
    def batch_process_catalog(self, catalog_file):
        """
        根据商品目录批量处理
        
        参数:
            catalog_file: JSON格式的商品目录文件
        """
        with open(catalog_file, 'r', encoding='utf-8') as f:
            catalog = json.load(f)
        
        for product in catalog:
            product_id = product.get('id')
            folder = product.get('folder')
            operations = product.get('operations', ['white_background'])
            
            if os.path.exists(folder):
                print(f"\n{'='*60}")
                print(f"处理商品 {product_id}: {folder}")
                print(f"操作: {operations}")
                
                self.process_product_variants(folder, operations)
            else:
                print(f" 文件夹不存在: {folder}")

# 创建商品目录示例
def create_sample_catalog():
    """创建示例商品目录"""
    catalog = [
        {
            "id": "PROD001",
            "name": "无线耳机",
            "folder": "./products/headphones",
            "operations": ["white_background", "add_logo"]
        },
        {
            "id": "PROD002", 
            "name": "运动水杯",
            "folder": "./products/water_bottle",
            "operations": ["scene_background", "add_text"]
        }
    ]
    
    # 创建目录结构
    for product in catalog:
        os.makedirs(product['folder'], exist_ok=True)
    
    # 保存目录文件
    with open('product_catalog.json', 'w', encoding='utf-8') as f:
        json.dump(catalog, f, ensure_ascii=False, indent=2)
    
    print("示例商品目录已创建: product_catalog.json")
    print("请将商品图片放入对应的文件夹中")

# 使用示例
if __name__ == "__main__":
    # 创建示例目录
    create_sample_catalog()
    
    # 创建处理器
    processor = EcommerceImageProcessor()
    
    print("\n电商图片批量处理器已就绪")
    print("可用操作模板:")
    for key, desc in processor.templates.items():
        print(f"  {key}: {desc}")
    
    # 开始处理
    # processor.batch_process_catalog('product_catalog.json')
    print("\n请将商品图片放入对应文件夹后,取消注释上面的代码行开始处理")

这个电商处理器提供了模板化的操作,你可以根据实际需求扩展更多的处理模板。

4. 高级技巧与最佳实践

掌握了基础用法后,我们来看看一些提升效率和效果的高级技巧。

4.1 提示词优化策略

好的提示词能显著提升编辑效果。这里有一些实用技巧:

# prompt_optimizer.py

class PromptOptimizer:
    """提示词优化器"""
    
    @staticmethod
    def optimize_for_editing(base_prompt, style="detailed"):
        """
        优化编辑提示词
        
        参数:
            base_prompt: 基础提示词,如"换个背景"
            style: 优化风格,可选"detailed"/"simple"/"creative"
        """
        templates = {
            "detailed": (
                "请精确编辑图片,具体要求如下:{base_prompt}。"
                "请保持图片其他部分不变,只修改指定区域。"
                "确保编辑后的部分与原始图片自然融合。"
            ),
            "simple": (
                "{base_prompt},保持其他部分不变"
            ),
            "creative": (
                "基于以下创意进行编辑:{base_prompt}。"
                "可以适当发挥创意,但保持图片整体协调。"
            )
        }
        
        return templates.get(style, templates["simple"]).format(base_prompt=base_prompt)
    
    @staticmethod  
    def add_negative_prompt(base_prompt, negative_items=None):
        """
        添加反向提示词
        
        参数:
            base_prompt: 基础提示词
            negative_items: 需要避免的内容列表
        """
        if negative_items is None:
            negative_items = ["模糊", "变形", "多余的手指", "不自然的边缘"]
        
        negative_text = ",".join(negative_items)
        return f"{base_prompt}。避免出现:{negative_text}"
    
    @staticmethod
    def for_specific_scenario(scenario, **kwargs):
        """
        为特定场景生成优化提示词
        
        参数:
            scenario: 场景类型
            **kwargs: 场景特定参数
        """
        scenarios = {
            "product_background": (
                "将商品背景替换为{background},保持商品主体清晰、边缘锐利。"
                "背景与商品要有适当的空间感,避免看起来像贴图。"
            ),
            "text_replacement": (
                "将图片中的文字'{old_text}'替换为'{new_text}',"
                "保持原有的字体、大小、颜色和风格。"
                "确保新文字与背景自然融合。"
            ),
            "object_removal": (
                "移除图片中的{object_to_remove},"
                "用合理的背景内容填充该区域。"
                "确保修复后的区域与周围环境协调一致。"
            ),
            "style_transfer": (
                "将图片风格转换为{target_style}风格,"
                "保持图片的主要内容和构图不变。"
                "色彩和纹理要符合目标风格的特征。"
            )
        }
        
        template = scenarios.get(scenario)
        if template:
            return template.format(**kwargs)
        else:
            return f"执行以下编辑:{scenario}"

# 使用示例
if __name__ == "__main__":
    optimizer = PromptOptimizer()
    
    # 示例1:详细编辑提示词
    simple_prompt = "把天空换成晚霞"
    detailed_prompt = optimizer.optimize_for_editing(simple_prompt, "detailed")
    print("详细提示词示例:")
    print(f"原始: {simple_prompt}")
    print(f"优化后: {detailed_prompt}")
    print()
    
    # 示例2:商品背景替换
    product_prompt = optimizer.for_specific_scenario(
        "product_background",
        background="简约的白色工作室背景"
    )
    print("商品背景替换提示词:")
    print(product_prompt)
    print()
    
    # 示例3:添加反向提示词
    full_prompt = optimizer.add_negative_prompt(detailed_prompt)
    print("带反向提示词的完整提示词:")
    print(full_prompt)

4.2 错误处理与重试机制

在实际生产环境中,网络波动、API限流等问题时有发生。一个健壮的系统需要有完善的错误处理机制。

# error_handler.py
import time
import logging
from functools import wraps

# 配置日志
logging.basicConfig(
    level=logging.INFO,
    format='%(asctime)s - %(levelname)s - %(message)s',
    handlers=[
        logging.FileHandler('api_errors.log'),
        logging.StreamHandler()
    ]
)

def retry_on_failure(max_retries=3, delay=2, backoff=2):
    """
    重试装饰器
    
    参数:
        max_retries: 最大重试次数
        delay: 初始延迟时间(秒)
        backoff: 延迟倍数
    """
    def decorator(func):
        @wraps(func)
        def wrapper(*args, **kwargs):
            retries = 0
            current_delay = delay
            
            while retries <= max_retries:
                try:
                    return func(*args, **kwargs)
                    
                except Exception as e:
                    retries += 1
                    
                    if retries > max_retries:
                        logging.error(f"函数 {func.__name__} 重试{max_retries}次后仍失败: {e}")
                        raise
                    
                    logging.warning(f"函数 {func.__name__} 第{retries}次失败: {e}")
                    logging.info(f"等待{current_delay}秒后重试...")
                    
                    time.sleep(current_delay)
                    current_delay *= backoff  # 指数退避
            
            return None
        return wrapper
    return decorator

class APIErrorHandler:
    """API错误处理器"""
    
    ERROR_CODES = {
        "RateLimitExceeded": "请求频率超限,请稍后重试",
        "InvalidParameter": "参数错误,请检查输入",
        "ModelNotAvailable": "模型暂时不可用",
        "ImageTooLarge": "图片尺寸过大",
        "NetworkError": "网络连接失败"
    }
    
    @staticmethod
    def handle_error(error_code, error_message, context=None):
        """
        处理API错误
        
        参数:
            error_code: 错误代码
            error_message: 错误信息
            context: 上下文信息,用于记录
        """
        user_message = APIErrorHandler.ERROR_CODES.get(
            error_code, 
            f"处理失败: {error_message}"
        )
        
        # 记录错误详情
        error_log = {
            "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
            "error_code": error_code,
            "error_message": error_message,
            "context": context,
            "user_message": user_message
        }
        
        logging.error(f"API错误: {error_log}")
        
        # 根据错误类型采取不同策略
        if error_code == "RateLimitExceeded":
            # 频率限制,建议等待
            wait_time = 60  # 等待60秒
            logging.info(f"达到频率限制,等待{wait_time}秒")
            time.sleep(wait_time)
            return "retry"
        elif error_code == "ModelNotAvailable":
            # 模型不可用,尝试备用模型
            logging.info("尝试使用备用模型")
            return "use_fallback"
        else:
            # 其他错误,返回用户友好的消息
            return user_message
    
    @staticmethod
    @retry_on_failure(max_retries=3, delay=2)
    def safe_api_call(api_func, *args, **kwargs):
        """
        安全的API调用包装器
        
        参数:
            api_func: API函数
            *args, **kwargs: 函数参数
        """
        try:
            return api_func(*args, **kwargs)
        except Exception as e:
            # 这里可以根据具体的异常类型映射到错误代码
            error_code = "NetworkError" if "connection" in str(e).lower() else "UnknownError"
            handled = APIErrorHandler.handle_error(error_code, str(e), {
                "function": api_func.__name__,
                "args": args,
                "kwargs": kwargs
            })
            
            if handled == "retry":
                raise  # 触发重试
            else:
                return None

# 使用示例
if __name__ == "__main__":
    # 模拟一个可能失败的API调用
    def unstable_api_call(success_rate=0.5):
        import random
        if random.random() < success_rate:
            return "API调用成功"
        else:
            raise Exception("模拟的API错误")
    
    # 使用错误处理器包装API调用
    handler = APIErrorHandler()
    
    print("测试错误处理机制...")
    
    # 测试重试机制
    result = handler.safe_api_call(unstable_api_call, success_rate=0.3)
    print(f"结果: {result}")
    
    # 测试错误处理
    error_response = handler.handle_error(
        "RateLimitExceeded",
        "请求过于频繁",
        {"api_endpoint": "qwen-image-edit"}
    )
    print(f"错误处理建议: {error_response}")

4.3 性能优化建议

当处理大量图片时,性能变得很重要。这里有一些优化建议:

# performance_optimizer.py
import concurrent.futures
import time
from queue import Queue
from threading import Thread

class ParallelProcessor:
    """并行处理器"""
    
    def __init__(self, max_workers=3):
        """
        初始化并行处理器
        
        参数:
            max_workers: 最大工作线程数
        """
        self.max_workers = max_workers
        self.task_queue = Queue()
        self.results = []
        
    def add_task(self, image_path, prompt, output_dir):
        """添加任务到队列"""
        self.task_queue.put({
            "image_path": image_path,
            "prompt": prompt,
            "output_dir": output_dir
        })
    
    def worker(self, worker_id):
        """工作线程函数"""
        from basic_edit import edit_single_image
        from download_image import download_image
        
        while not self.task_queue.empty():
            try:
                task = self.task_queue.get_nowait()
            except:
                break
            
            print(f"Worker {worker_id}: 处理 {task['image_path']}")
            
            # 这里应该调用实际的API
            # 为了演示,我们模拟处理
            time.sleep(1)  # 模拟处理时间
            
            # 记录结果
            self.results.append({
                "worker": worker_id,
                "task": task,
                "status": "completed",
                "timestamp": time.time()
            })
            
            self.task_queue.task_done()
    
    def process_parallel(self):
        """并行处理所有任务"""
        print(f"开始并行处理,使用 {self.max_workers} 个工作线程")
        start_time = time.time()
        
        # 创建并启动工作线程
        threads = []
        for i in range(self.max_workers):
            thread = Thread(target=self.worker, args=(i,))
            thread.start()
            threads.append(thread)
        
        # 等待所有线程完成
        for thread in threads:
            thread.join()
        
        # 等待队列清空
        self.task_queue.join()
        
        end_time = time.time()
        duration = end_time - start_time
        
        print(f"\n并行处理完成!")
        print(f"总任务数: {len(self.results)}")
        print(f"总耗时: {duration:.2f}秒")
        print(f"平均每个任务: {duration/len(self.results):.2f}秒")
        
        return self.results

class CacheManager:
    """缓存管理器"""
    
    def __init__(self, cache_dir=".api_cache"):
        self.cache_dir = cache_dir
        os.makedirs(cache_dir, exist_ok=True)
    
    def get_cache_key(self, image_url, prompt, params):
        """生成缓存键"""
        import hashlib
        content = f"{image_url}_{prompt}_{json.dumps(params, sort_keys=True)}"
        return hashlib.md5(content.encode()).hexdigest()
    
    def get_cached_result(self, cache_key):
        """获取缓存结果"""
        cache_file = os.path.join(self.cache_dir, f"{cache_key}.json")
        
        if os.path.exists(cache_file):
            with open(cache_file, 'r') as f:
                return json.load(f)
        return None
    
    def save_to_cache(self, cache_key, result):
        """保存到缓存"""
        cache_file = os.path.join(self.cache_dir, f"{cache_key}.json")
        
        with open(cache_file, 'w') as f:
            json.dump(result, f)
    
    def clear_old_cache(self, max_age_hours=24):
        """清理旧缓存"""
        import glob
        import datetime
        
        cache_files = glob.glob(os.path.join(self.cache_dir, "*.json"))
        
        for cache_file in cache_files:
            file_age = datetime.datetime.now() - datetime.datetime.fromtimestamp(
                os.path.getmtime(cache_file)
            )
            
            if file_age.total_seconds() > max_age_hours * 3600:
                os.remove(cache_file)
                print(f"清理旧缓存: {cache_file}")

# 使用示例
if __name__ == "__main__":
    # 测试并行处理
    print("测试并行处理能力...")
    
    processor = ParallelProcessor(max_workers=3)
    
    # 添加示例任务
    for i in range(10):
        processor.add_task(
            image_path=f"image_{i}.jpg",
            prompt=f"编辑任务{i}",
            output_dir="parallel_outputs"
        )
    
    results = processor.process_parallel()
    
    # 测试缓存管理
    print("\n测试缓存管理...")
    
    cache_manager = CacheManager()
    
    # 生成缓存键
    test_key = cache_manager.get_cache_key(
        "https://example.com/image.jpg",
        "测试提示词",
        {"size": "1024x1024", "n": 1}
    )
    
    print(f"缓存键: {test_key}")
    
    # 清理旧缓存
    cache_manager.clear_old_cache(max_age_hours=1)

5. 完整项目实战:电商图片自动化处理系统

最后,我们把所有组件整合起来,构建一个完整的电商图片处理系统。

# ecommerce_automation_system.py
import os
import json
import yaml
import schedule
import time
from datetime import datetime
from pathlib import Path

class EcommerceAutomationSystem:
    """电商图片自动化处理系统"""
    
    def __init__(self, config_file="config.yaml"):
        """
        初始化系统
        
        参数:
            config_file: 配置文件路径
        """
        self.load_config(config_file)
        self.setup_directories()
        self.setup_logging()
        
    def load_config(self, config_file):
        """加载配置文件"""
        if not os.path.exists(config_file):
            # 创建默认配置
            default_config = {
                "api": {
                    "model": "qwen-image-edit-max",
                    "base_url": "https://dashscope.aliyuncs.com/api/v1",
                    "timeout": 300,
                    "max_retries": 3
                },
                "paths": {
                    "input_dir": "./input_images",
                    "output_dir": "./processed_images",
                    "archive_dir": "./archived_images",
                    "log_dir": "./logs"
                },
                "processing": {
                    "batch_size": 10,
                    "max_workers": 3,
                    "default_prompt": "优化商品图片,白色背景,主体清晰",
                    "file_patterns": ["*.jpg", "*.png", "*.jpeg"]
                },
                "scheduling": {
                    "enabled": False,
                    "interval_minutes": 60,
                    "run_at": ["09:00", "14:00", "19:00"]
                }
            }
            
            with open(config_file, 'w', encoding='utf-8') as f:
                yaml.dump(default_config, f, default_flow_style=False)
            
            print(f"已创建默认配置文件: {config_file}")
        
        with open(config_file, 'r', encoding='utf-8') as f:
            self.config = yaml.safe_load(f)
    
    def setup_directories(self):
        """设置目录结构"""
        paths = self.config['paths']
        
        for dir_path in paths.values():
            os.makedirs(dir_path, exist_ok=True)
        
        print("目录结构已创建")
    
    def setup_logging(self):
        """设置日志系统"""
        import logging
        
        log_dir = self.config['paths']['log_dir']
        log_file = os.path.join(log_dir, f"automation_{datetime.now().strftime('%Y%m%d')}.log")
        
        self.logger = logging.getLogger('EcommerceAutomation')
        self.logger.setLevel(logging.INFO)
        
        # 文件处理器
        file_handler = logging.FileHandler(log_file, encoding='utf-8')
        file_handler.setLevel(logging.INFO)
        
        # 控制台处理器
        console_handler = logging.StreamHandler()
        console_handler.setLevel(logging.INFO)
        
        # 格式器
        formatter = logging.Formatter(
            '%(asctime)s - %(name)s - %(levelname)s - %(message)s'
        )
        file_handler.setFormatter(formatter)
        console_handler.setFormatter(formatter)
        
        self.logger.addHandler(file_handler)
        self.logger.addHandler(console_handler)
        
        self.logger.info("自动化系统初始化完成")
    
    def monitor_input_folder(self):
        """监控输入文件夹"""
        input_dir = self.config['paths']['input_dir']
        file_patterns = self.config['processing']['file_patterns']
        
        new_files = []
        for pattern in file_patterns:
            for file_path in Path(input_dir).glob(pattern):
                # 检查文件是否是新添加的(根据修改时间)
                file_mtime = datetime.fromtimestamp(file_path.stat().st_mtime)
                time_diff = (datetime.now() - file_mtime).total_seconds()
                
                if time_diff < 300:  # 5分钟内的新文件
                    new_files.append(str(file_path))
        
        return new_files
    
    def process_batch(self, file_list):
        """处理一批文件"""
        from batch_processor import BatchImageProcessor
        
        self.logger.info(f"开始处理批次,共 {len(file_list)} 个文件")
        
        processor = BatchImageProcessor()
        output_dir = self.config['paths']['output_dir']
        
        # 为每个文件生成处理指令
        for file_path in file_list:
            filename = Path(file_path).name
            prompt = self.generate_prompt_for_file(filename)
            
            self.logger.info(f"处理文件: {filename}")
            
            # 这里调用实际的处理逻辑
            # 为了演示,我们记录处理状态
            processor.process_single_image(file_path, prompt, output_dir)
            
            # 移动原文件到归档目录
            self.archive_file(file_path)
        
        self.logger.info("批次处理完成")
    
    def generate_prompt_for_file(self, filename):
        """根据文件名生成处理提示词"""
        # 这里可以根据文件名中的关键词生成不同的提示词
        # 例如:根据产品类别、颜色等
        
        prompt_templates = {
            "default": self.config['processing']['default_prompt'],
            "clothing": "优化服装图片,展示细节,自然褶皱,专业摄影风格",
            "electronics": "优化电子产品图片,突出科技感,简洁背景,清晰展示接口",
            "food": "优化食品图片,增强食欲感,自然光线,突出食材新鲜度"
        }
        
        # 简单的关键词匹配
        filename_lower = filename.lower()
        
        for category, template in prompt_templates.items():
            if category != "default" and category in filename_lower:
                return template
        
        return prompt_templates["default"]
    
    def archive_file(self, file_path):
        """归档已处理的文件"""
        archive_dir = self.config['paths']['archive_dir']
        
        if not os.path.exists(file_path):
            return
        
        filename = Path(file_path).name
        archive_path = os.path.join(archive_dir, filename)
        
        # 如果目标文件已存在,添加时间戳
        if os.path.exists(archive_path):
            timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
            name_parts = os.path.splitext(filename)
            archive_path = os.path.join(
                archive_dir, 
                f"{name_parts[0]}_{timestamp}{name_parts[1]}"
            )
        
        os.rename(file_path, archive_path)
        self.logger.info(f"文件已归档: {archive_path}")
    
    def generate_report(self):
        """生成处理报告"""
        output_dir = self.config['paths']['output_dir']
        log_dir = self.config['paths']['log_dir']
        
        # 统计输出文件
        output_files = list(Path(output_dir).glob("*"))
        
        report = {
            "generated_at": datetime.now().isoformat(),
            "total_files": len(output_files),
            "file_types": {},
            "recent_files": []
        }
        
        # 统计文件类型
        for file_path in output_files:
            ext = file_path.suffix.lower()
            report["file_types"][ext] = report["file_types"].get(ext, 0) + 1
        
        # 最近处理的文件
        recent_files = sorted(output_files, key=lambda x: x.stat().st_mtime, reverse=True)[:10]
        report["recent_files"] = [
            {
                "name": f.name,
                "size": f.stat().st_size,
                "modified": datetime.fromtimestamp(f.stat().st_mtime).isoformat()
            }
            for f in recent_files
        ]
        
        # 保存报告
        report_file = os.path.join(log_dir, f"report_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json")
        with open(report_file, 'w', encoding='utf-8') as f:
            json.dump(report, f, ensure_ascii=False, indent=2)
        
        self.logger.info(f"报告已生成: {report_file}")
        return report
    
    def run_once(self):
        """运行一次处理任务"""
        self.logger.info("开始执行处理任务")
        
        # 监控新文件
        new_files = self.monitor_input_folder()
        
        if new_files:
            self.logger.info(f"发现 {len(new_files)} 个新文件")
            self.process_batch(new_files)
        else:
            self.logger.info("没有发现新文件")
        
        # 生成报告
        report = self.generate_report()
        
        self.logger.info("处理任务完成")
        return report
    
    def run_scheduled(self):
        """运行定时任务"""
        if not self.config['scheduling']['enabled']:
            self.logger.warning("定时任务未启用")
            return
        
        self.logger.info("启动定时任务调度器")
        
        # 设置定时任务
        interval = self.config['scheduling']['interval_minutes']
        schedule.every(interval).minutes.do(self.run_once)
        
        # 设置定点任务
        for run_time in self.config['scheduling']['run_at']:
            schedule.every().day.at(run_time).do(self.run_once)
        
        self.logger.info(f"定时任务已设置,每 {interval} 分钟运行一次")
        
        # 运行调度器
        try:
            while True:
                schedule.run_pending()
                time.sleep(60)  # 每分钟检查一次
        except KeyboardInterrupt:
            self.logger.info("定时任务已停止")
    
    def run(self, mode="once"):
        """
        运行系统
        
        参数:
            mode: 运行模式,"once"或"scheduled"
        """
        if mode == "once":
            self.run_once()
        elif mode == "scheduled":
            self.run_scheduled()
        else:
            self.logger.error(f"未知的运行模式: {mode}")

# 使用示例
if __name__ == "__main__":
    print("电商图片自动化处理系统")
    print("=" * 50)
    
    # 创建系统实例
    system = EcommerceAutomationSystem()
    
    # 显示配置信息
    print("\n当前配置:")
    print(f"输入目录: {system.config['paths']['input_dir']}")
    print(f"输出目录: {system.config['paths']['output_dir']}")
    print(f"默认提示词: {system.config['processing']['default_prompt']}")
    
    # 运行模式选择
    print("\n请选择运行模式:")
    print("1. 运行一次")
    print("2. 启动定时任务")
    
    choice = input("请输入选择 (1/2): ").strip()
    
    if choice == "1":
        print("\n开始单次运行...")
        system.run(mode="once")
    elif choice == "2":
        print("\n启动定时任务...")
        system.run(mode="scheduled")
    else:
        print("无效选择,退出系统")

6. 总结

通过上面的内容,我们一步步构建了一个完整的Qwen-Image-Edit自动化处理系统。从最基础的API调用,到批量处理,再到完整的电商自动化系统,这套方案可以显著提升图片处理的效率。

实际用下来,Qwen-Image-Edit的API调用确实比较简单,效果也相当不错。特别是对于电商图片处理、内容创作这些场景,能节省大量时间。Python的灵活性让我们可以轻松定制各种处理流程,满足不同的业务需求。

如果你刚开始接触,建议先从单张图片处理开始,熟悉API的基本用法。等掌握了基础,再尝试批量处理。最后,根据你的具体业务需求,定制自己的自动化系统。

这套系统还有很多可以扩展的地方,比如添加图片质量检测、自动分类、与电商平台API对接等。你可以根据自己的需求继续完善它。


获取更多AI镜像

想探索更多AI镜像和应用场景?访问 CSDN星图镜像广场,提供丰富的预置镜像,覆盖大模型推理、图像生成、视频生成、模型微调等多个领域,支持一键部署。

Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐