参考: https://langchain-doc.cn/v1/python/langchain/quickstart.html
先让程序跑起来

import os
from dataclasses import dataclass

from langchain.agents import create_agent
from langchain.tools import tool, ToolRuntime
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import InMemorySaver


@tool
def get_weather_for_location(city: str) -> str:
    """获取指定城市的天气。"""
    # The docstring above is left untouched on purpose: the ``@tool``
    # decorator uses it as the tool description, so it is runtime text.
    # Stub implementation: every city gets the same sunny forecast.
    forecast = f"{city}总是阳光明媚!"
    return forecast


@dataclass
class Context:
    """Custom runtime context schema."""

    # Identifier of the current user; tools read it via ``runtime.context``.
    user_id: str


@tool
def get_user_location(runtime: ToolRuntime[Context]) -> str:
    """根据用户 ID 获取用户信息。"""
    # The docstring above is left untouched on purpose: the ``@tool``
    # decorator uses it as the tool description, so it is runtime text.
    # Demo lookup: user "1" lives in Florida, everybody else in SF.
    if runtime.context.user_id == "1":
        return "Florida"
    return "SF"


deepseek_api_base = "https://api.deepseek.com"
# Read the secret from the DEEPSEEK_API_KEY environment variable so a real
# key never has to be committed to source control; the masked literal is only
# a placeholder fallback that keeps the script importable without the variable.
deepseek_api_key = os.environ.get(
    "DEEPSEEK_API_KEY", "sk-*********************************"
)
deepseek_model = "deepseek-chat"

# Point the OpenAI-compatible client at DeepSeek by overriding key and base URL.
llm = ChatOpenAI(
    model=deepseek_model,
    openai_api_key=deepseek_api_key,
    openai_api_base=deepseek_api_base,  # the setting that redirects requests to DeepSeek
    temperature=0.7,
)
# Alternative: use the custom chat model instead.
# from my_chat_model import MyChatModel
# llm = MyChatModel(temperature=0.7, max_tokens=1024)

# System prompt handed to the agent: it sets the punning weather-forecaster
# persona and names the two tools the model may call.
SYSTEM_PROMPT = """你是一位擅长用双关语表达的专家天气预报员。

你可以使用两个工具:

- get_weather_for_location:用于获取特定地点的天气
- get_user_location:用于获取用户的位置

如果用户询问天气,请确保你知道具体位置。如果从问题中可以判断他们指的是自己所在的位置,请使用 get_user_location 工具来查找他们的位置。"""


# A dataclass is used here, but Pydantic models are supported as well.
@dataclass
class ResponseFormat:
    """代理的响应模式。"""

    # NOTE(review): the docstring above is left unchanged because it may be
    # surfaced to the model as the schema description — confirm before editing.

    # Pun-laden reply (always required).
    punny_response: str
    # Any interesting detail about the weather, if there is one.
    weather_conditions: str | None = None


# In-memory checkpointer handed to the agent below.
checkpointer = InMemorySaver()

# Assemble the agent from the model, system prompt, tools, runtime context
# schema, structured response schema and checkpointer defined in this module.
agent = create_agent(
    model=llm,
    system_prompt=SYSTEM_PROMPT,
    tools=[get_user_location, get_weather_for_location],
    context_schema=Context,
    response_format=ResponseFormat,
    checkpointer=checkpointer,
)


if __name__ == "__main__":
    # ``thread_id`` is the unique identifier of one conversation; both turns
    # below reuse it so the second turn continues the first.
    run_config = {"configurable": {"thread_id": "1"}}

    # (header printed before the result, user message) for each turn.
    turns = (
        ("第一次:  \n", "外面的天气怎么样?"),
        ("第二次:  \n", "谢谢!"),
    )

    result = None
    for header, user_text in turns:
        result = agent.invoke(
            {"messages": [{"role": "user", "content": user_text}]},
            config=run_config,
            context=Context(user_id="1"),
        )
        print(header)
        print(result)
        print("\n")
        print(result["structured_response"])

    # Dump every message returned by the last turn.
    for msg in result["messages"]:
        print(
            f"消息类型: {type(msg).__name__}",
            f"内容: {msg.content}",
            f"ID: {msg.id}",
            "-" * 50,
            sep="\n",
        )

如果使用的是豆包或其他只能通过 API 调用的模型,可以自定义 chat_model。
以下实现已在豆包和 DeepSeek 上测试通过:

from __future__ import annotations

import asyncio
import json
import logging
from typing import (
    Any,
    AsyncIterator,
    Dict,
    Iterator,
    List,
    Optional,
    Tuple,
    Type,
    Union,
)

import aiohttp
import requests
from langchain_core.callbacks import (
    AsyncCallbackManagerForLLMRun,
    CallbackManagerForLLMRun,
)
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import (
    AIMessage,
    BaseMessage,
    AIMessageChunk,
    ChatMessage,
    HumanMessage,
    SystemMessage,
)
from langchain_core.outputs import (
    ChatGeneration,
    ChatGenerationChunk,
    ChatResult,
)

from langchain_core.tools import BaseTool

# 修正:使用 Pydantic v2 直接导入
from pydantic import Field, model_validator
from langchain_core.utils import get_from_dict_or_env

# Module-level logger used by MyChatModel to report API failures.
logger = logging.getLogger(__name__)



# Defaults picked up by MyChatModel's fields: endpoint URL, API key and model
# name. The values shown are masked placeholders and must be replaced.
my_api_base = "**************"
my_api_key = "**************"
my_model_name = "**************"


class MyChatModel(BaseChatModel):
    """Chat model that calls an HTTP chat-completions endpoint directly.

    Every request is a JSON POST to ``base_url`` authenticated with a bearer
    token. The response is expected to contain a ``choices`` list whose first
    entry carries the reply: under ``message`` for one-shot calls and under
    ``delta`` for streamed chunks.
    """

    # Tools recorded by ``bind_tools``; ``None`` until tools are bound.
    tools: Optional[List[Any]] = Field(default=None)
    # Names of the bound tools, in the same order as ``tools``.
    tool_names: Optional[List[str]] = Field(default=None)

    # URL that requests are POSTed to (replace with the provider's real one).
    base_url: str = Field(default=my_api_base, alias="base_url")

    # Model identifier sent in the request body; passed as ``model=`` when
    # constructing because of the alias.
    model_name: str = Field(default=my_model_name, alias="model")

    # API key sent as the bearer token.
    api_key: Optional[str] = Field(default=my_api_key)

    # Sampling temperature forwarded to the API.
    temperature: float = 0.7

    # Upper bound on generated tokens forwarded to the API.
    max_tokens: int = 1024

    # Request timeout in seconds.
    timeout: int = 60

    # Extra request-body parameters merged into every payload.
    extra_kwargs: Dict[str, Any] = Field(default_factory=dict)

    @model_validator(mode="after")
    def validate_environment(self) -> "MyChatModel":
        """Resolve the API key and sanity-check the endpoint URL.

        The key is taken from the ``api_key`` field when it is set, otherwise
        from the ``DOUBAO_API_KEY`` environment variable.

        Raises:
            ValueError: if ``base_url`` does not start with http:// or https://.
        """
        self.api_key = get_from_dict_or_env(
            {"api_key": self.api_key}, "api_key", "DOUBAO_API_KEY"
        )

        if not self.base_url.startswith(("http://", "https://")):
            raise ValueError(f"无效的base_url: {self.base_url}")

        return self

    @property
    def _llm_type(self) -> str:
        """Return the identifier reported as this LLM's type."""
        return my_model_name

    @property
    def _identifying_params(self) -> Dict[str, Any]:
        """Return the parameters that identify this model configuration."""
        return {
            "model_name": self.model_name,
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            **self.extra_kwargs,
        }

    def _convert_messages_to_doubao_format(
        self, messages: List[BaseMessage]
    ) -> List[Dict[str, str]]:
        """Convert LangChain messages into ``{"role", "content"}`` dicts.

        Raises:
            ValueError: for message types other than human, AI, system and
                generic chat messages.
        """
        doubao_messages = []
        for message in messages:
            if isinstance(message, HumanMessage):
                role = "user"
            elif isinstance(message, AIMessage):
                role = "assistant"
            elif isinstance(message, SystemMessage):
                role = "system"
            elif isinstance(message, ChatMessage):
                role = message.role
            else:
                raise ValueError(f"不支持的消息类型: {type(message)}")
            doubao_messages.append({"role": role, "content": message.content})
        return doubao_messages

    def _build_payload(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]],
        call_kwargs: Dict[str, Any],
        stream: bool,
    ) -> Dict[str, Any]:
        """Build the JSON request body shared by all four generation methods."""
        payload: Dict[str, Any] = {
            "model": self.model_name,
            "messages": self._convert_messages_to_doubao_format(messages),
            "temperature": self.temperature,
            "max_tokens": self.max_tokens,
            "stop": stop or [],
        }
        if stream:
            payload["stream"] = True
        # Later sources win: instance-level extras first, per-call kwargs last.
        payload.update(self.extra_kwargs)
        payload.update(call_kwargs)
        return payload

    def _build_headers(self) -> Dict[str, str]:
        """Return the HTTP headers (JSON content type plus bearer token)."""
        return {
            "Content-Type": "application/json",
            "Authorization": f"Bearer {self.api_key}",
        }

    def _result_from_response(self, response_json: Any) -> ChatResult:
        """Turn a decoded one-shot API response into a ``ChatResult``.

        Raises:
            ValueError: if the response carries no non-empty ``choices`` list.
        """
        choices = (
            response_json.get("choices")
            if isinstance(response_json, dict)
            else None
        )
        if not choices:
            raise ValueError(f"模型API返回无效响应: {response_json}")

        message = choices[0]["message"]
        ai_message = AIMessage(
            # ``content`` can be null in a response; AIMessage needs a string.
            content=message.get("content") or "",
            additional_kwargs={"model": self.model_name},
        )
        return ChatResult(generations=[ChatGeneration(message=ai_message)])

    def _parse_stream_line(self, raw_line: bytes) -> Tuple[bool, str]:
        """Decode one server-sent-event line of a streamed response.

        Returns:
            ``(done, content)``: ``done`` is True once the ``[DONE]`` marker
            is seen; ``content`` is the text delta carried by the line, or an
            empty string when the line carries none.
        """
        text = raw_line.decode("utf-8").strip()
        # Blank separator lines and ":"-prefixed comment lines carry no data.
        if not text or text.startswith(":"):
            return False, ""
        # Drop exactly the "data:" field prefix. ``lstrip("data: ")`` would
        # strip a *set of characters*, not this prefix.
        if text.startswith("data:"):
            text = text[len("data:"):].strip()
        if text == "[DONE]":
            return True, ""

        try:
            event = json.loads(text)
        except json.JSONDecodeError:
            logger.warning(f"无效的JSON chunk: {text}")
            return False, ""

        choices = event.get("choices") if isinstance(event, dict) else None
        if not choices:
            return False, ""
        # ``delta`` or its ``content`` may be missing or null on some chunks.
        delta = choices[0].get("delta") or {}
        return False, delta.get("content") or ""

    def _generate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Generate a chat response with one blocking HTTP request.

        Raises:
            ValueError: if the API response has no usable ``choices``.
            Exception: any request/HTTP/decoding error is logged and re-raised.
        """
        payload = self._build_payload(messages, stop, kwargs, stream=False)
        headers = self._build_headers()

        try:
            response = requests.post(
                self.base_url,
                headers=headers,
                json=payload,
                timeout=self.timeout,
            )
            response.raise_for_status()
            response_json = response.json()
        except Exception as e:
            logger.error(f"调用模型API失败: {e}")
            raise

        return self._result_from_response(response_json)

    def bind_tools(
        self,
        tools: List[BaseTool],
        **kwargs: Any,
    ) -> "MyChatModel":
        """Return a copy of this model with ``tools`` recorded on it.

        The copy keeps every setting of this instance (endpoint, key, model,
        sampling parameters); only ``tools`` and ``tool_names`` differ.

        NOTE(review): the recorded tools are not included in the request
        payload built by this class, so the remote model never sees them.
        """
        new_model = self._copy()
        new_model.tools = list(tools)
        new_model.tool_names = [tool.name for tool in tools]
        return new_model

    def _copy(self) -> "MyChatModel":
        """Return a copy of this instance that preserves its configuration.

        Re-constructing with ``model_name=...`` would silently fall back to
        the defaults, because the field is only populated through its
        ``model`` alias; ``model_copy`` keeps all current field values.
        """
        # Give the copy its own dict so later edits do not leak between models.
        return self.model_copy(update={"extra_kwargs": dict(self.extra_kwargs)})

    async def _agenerate(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> ChatResult:
        """Generate a chat response with one asynchronous HTTP request.

        Raises:
            ValueError: if the API response has no usable ``choices``.
            Exception: any request/HTTP/decoding error is logged and re-raised.
        """
        payload = self._build_payload(messages, stop, kwargs, stream=False)
        headers = self._build_headers()

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    self.base_url,
                    headers=headers,
                    json=payload,
                    # aiohttp expects a ClientTimeout object, not a bare number.
                    timeout=aiohttp.ClientTimeout(total=self.timeout),
                ) as response:
                    response.raise_for_status()
                    response_json = await response.json()
        except Exception as e:
            logger.error(f"异步调用模型API失败: {e}")
            raise

        return self._result_from_response(response_json)

    def _stream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[CallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> Iterator[ChatGenerationChunk]:
        """Stream a chat response, yielding one chunk per non-empty text delta.

        Raises:
            Exception: any request/HTTP error is logged and re-raised.
        """
        payload = self._build_payload(messages, stop, kwargs, stream=True)
        headers = self._build_headers()

        try:
            with requests.post(
                self.base_url,
                headers=headers,
                json=payload,
                stream=True,
                timeout=self.timeout,
            ) as response:
                response.raise_for_status()

                for raw_line in response.iter_lines():
                    done, content = self._parse_stream_line(raw_line)
                    if done:
                        break
                    if not content:
                        continue

                    yield ChatGenerationChunk(
                        message=AIMessageChunk(content=content)
                    )

                    if run_manager:
                        run_manager.on_llm_new_token(content)
        except Exception as e:
            logger.error(f"模型API流式调用失败: {e}")
            raise

    async def _astream(
        self,
        messages: List[BaseMessage],
        stop: Optional[List[str]] = None,
        run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
        **kwargs: Any,
    ) -> AsyncIterator[ChatGenerationChunk]:
        """Asynchronously stream a chat response, one chunk per text delta.

        Raises:
            Exception: any request/HTTP error is logged and re-raised.
        """
        payload = self._build_payload(messages, stop, kwargs, stream=True)
        headers = self._build_headers()

        try:
            async with aiohttp.ClientSession() as session:
                async with session.post(
                    self.base_url,
                    headers=headers,
                    json=payload,
                    timeout=aiohttp.ClientTimeout(total=self.timeout),
                ) as response:
                    response.raise_for_status()

                    # aiohttp's StreamReader yields lines when iterated
                    # directly; it has no ``iter_lines`` method.
                    async for raw_line in response.content:
                        done, content = self._parse_stream_line(raw_line)
                        if done:
                            break
                        if not content:
                            continue

                        yield ChatGenerationChunk(
                            message=AIMessageChunk(content=content)
                        )

                        if run_manager:
                            await run_manager.on_llm_new_token(content)
        except Exception as e:
            logger.error(f"模型API异步流式调用失败: {e}")
            raise


if __name__ == "__main__":
    # Build the chat model with explicit sampling settings.
    chat_model = MyChatModel(temperature=0.7, max_tokens=1024)

    # Conversation sent to the model: a system instruction plus one user turn.
    conversation = [
        SystemMessage(content="你是一个 helpful 的助手"),
        HumanMessage(content="你好,介绍一下你自己"),
    ]

    # One-shot call: print the complete reply.
    reply = chat_model.invoke(conversation)
    print(reply.content)

    # Streaming call: print each piece as soon as it arrives.
    for piece in chat_model.stream(conversation):
        print(piece.content, end="", flush=True)

运行环境: Python 3.10
requirements.txt 内容如下:

aiohappyeyeballs==2.6.1
aiohttp==3.13.2
aiosignal==1.4.0
annotated-types==0.7.0
anyio==4.11.0
async-timeout==5.0.1
attrs==25.4.0
certifi==2025.10.5
charset-normalizer==3.4.4
colorama==0.4.6
distro==1.9.0
exceptiongroup==1.3.0
frozenlist==1.8.0
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
jiter==0.11.1
jsonpatch==1.33
jsonpointer==3.0.0
langchain==1.0.3
langchain-core==1.0.3
langchain-openai==1.0.2
langgraph==1.0.2
langgraph-checkpoint==3.0.0
langgraph-prebuilt==1.0.2
langgraph-sdk==0.2.9
langsmith==0.4.40
multidict==6.7.0
openai==2.7.1
orjson==3.11.4
ormsgpack==1.11.0
packaging==25.0
propcache==0.4.1
pydantic==2.12.3
pydantic_core==2.41.4
PyYAML==6.0.3
regex==2025.11.3
requests==2.32.5
requests-toolbelt==1.0.0
sniffio==1.3.1
tenacity==9.1.2
tiktoken==0.12.0
tqdm==4.67.1
typing-inspection==0.4.2
typing_extensions==4.15.0
urllib3==2.5.0
xxhash==3.6.0
yarl==1.22.0
zstandard==0.25.0
Logo

Agent 垂直技术社区,欢迎活跃、内容共建。

更多推荐