langchain基础教程(1)--快速搭建一个简单的应用(基于BaseChatModel自定义chatModel)
·
参考: https://langchain-doc.cn/v1/python/langchain/quickstart.html
先让程序跑起来
from dataclasses import dataclass
from langchain.tools import tool, ToolRuntime
from langchain.agents import create_agent
from langchain_openai import ChatOpenAI
from langgraph.checkpoint.memory import InMemorySaver
@tool
def get_weather_for_location(city: str) -> str:
"""获取指定城市的天气。"""
return f"{city}总是阳光明媚!"
@dataclass
class Context:
"""自定义运行时上下文模式。"""
user_id: str
@tool
def get_user_location(runtime: ToolRuntime[Context]) -> str:
"""根据用户 ID 获取用户信息。"""
user_id = runtime.context.user_id
return "Florida" if user_id == "1" else "SF"
deepseek_api_base = "https://api.deepseek.com"
deepseek_api_key = "sk-*********************************"
deepseek_model = "deepseek-chat"
# 设置DeepSeek API Key和基础地址
llm = ChatOpenAI(
model=deepseek_model,
openai_api_key=deepseek_api_key,
openai_api_base=deepseek_api_base, # 关键配置
temperature=0.7,
)
# 使用自定义chat_model
# from my_chat_model import MyChatModel
# llm = MyChatModel(temperature=0.7, max_tokens=1024)
SYSTEM_PROMPT = """你是一位擅长用双关语表达的专家天气预报员。
你可以使用两个工具:
- get_weather_for_location:用于获取特定地点的天气
- get_user_location:用于获取用户的位置
如果用户询问天气,请确保你知道具体位置。如果从问题中可以判断他们指的是自己所在的位置,请使用 get_user_location 工具来查找他们的位置。"""
# 这里使用 dataclass,但也支持 Pydantic 模型。
@dataclass
class ResponseFormat:
"""代理的响应模式。"""
# 带双关语的回应(始终必需)
punny_response: str
# 天气的任何有趣信息(如果有)
weather_conditions: str | None = None
checkpointer = InMemorySaver()
agent = create_agent(
model=llm,
system_prompt=SYSTEM_PROMPT,
tools=[get_user_location, get_weather_for_location],
context_schema=Context,
response_format=ResponseFormat,
checkpointer=checkpointer,
)
if __name__ == "__main__":
# `thread_id` 是给定对话的唯一标识符。
config = {"configurable": {"thread_id": "1"}}
response = agent.invoke(
{"messages": [{"role": "user", "content": "外面的天气怎么样?"}]},
config=config,
context=Context(user_id="1"),
)
print("第一次: \n")
print(response)
print("\n")
print(response["structured_response"])
# 注意,我们可以使用相同的 `thread_id` 继续对话。
response = agent.invoke(
{"messages": [{"role": "user", "content": "谢谢!"}]},
config=config,
context=Context(user_id="1"),
)
print("第二次: \n")
print(response)
print("\n")
print(response["structured_response"])
messages = response["messages"]
for message in messages:
print(f"消息类型: {type(message).__name__}")
print(f"内容: {message.content}")
print(f"ID: {message.id}")
print("-" * 50)
如果是豆包或者其他只能使用API的模型可以自定义chat_model,
测试过 豆包和deepseek
from __future__ import annotations
import asyncio
import json
import logging
from typing import (
Any,
AsyncIterator,
Dict,
Iterator,
List,
Optional,
Tuple,
Type,
Union,
)
import aiohttp
import requests
from langchain_core.callbacks import (
AsyncCallbackManagerForLLMRun,
CallbackManagerForLLMRun,
)
from langchain_core.language_models import BaseChatModel
from langchain_core.messages import (
AIMessage,
BaseMessage,
AIMessageChunk,
ChatMessage,
HumanMessage,
SystemMessage,
)
from langchain_core.outputs import (
ChatGeneration,
ChatGenerationChunk,
ChatResult,
)
from langchain_core.tools import BaseTool
# 修正:使用 Pydantic v2 直接导入
from pydantic import Field, model_validator
from langchain_core.utils import get_from_dict_or_env
logger = logging.getLogger(__name__)
my_api_base = "**************"
my_api_key = "**************"
my_model_name = "**************"
class MyChatModel(BaseChatModel):
"""
自定义聊天模型封装类,继承自BaseChatModel
"""
tools: list[str] = Field(default=None)
tool_names: list[str] = Field(default=None)
# 模型API基础URL(注意:模型实际API地址可能不同,需替换为官方地址)
base_url: str = Field(default=my_api_base, alias="base_url")
# 模型名称(模型实际模型名需参考官方文档)
model_name: str = Field(default=my_model_name, alias="model")
# API密钥
api_key: Optional[str] = Field(default=my_api_key)
# 温度参数,控制生成的随机性
temperature: float = 0.7
# 最大生成 tokens
max_tokens: int = 1024
# 请求超时时间(秒)
timeout: int = 60
# 额外的API参数
extra_kwargs: Dict[str, Any] = Field(default_factory=dict)
# 修正:使用 Pydantic v2 的 model_validator 替代 root_validator
@model_validator(mode="after")
def validate_environment(self) -> "MyChatModel":
"""验证环境变量和配置参数"""
# 从环境变量或配置中获取API密钥
self.api_key = get_from_dict_or_env(
{"api_key": self.api_key}, "api_key", "DOUBAO_API_KEY"
)
# 验证基础URL
if not self.base_url.startswith(("http://", "https://")):
raise ValueError(f"无效的base_url: {self.base_url}")
return self
@property
def _llm_type(self) -> str:
"""返回LLM类型标识"""
return my_model_name
@property
def _identifying_params(self) -> Dict[str, Any]:
"""返回用于识别模型的参数"""
return {
"model_name": self.model_name,
"temperature": self.temperature,
"max_tokens": self.max_tokens,
**self.extra_kwargs,
}
def _convert_messages_to_doubao_format(
self, messages: List[BaseMessage]
) -> List[Dict[str, str]]:
"""
将LangChain消息格式转换为模型API所需的格式
"""
doubao_messages = []
for message in messages:
if isinstance(message, HumanMessage):
doubao_messages.append({"role": "user", "content": message.content})
elif isinstance(message, AIMessage):
doubao_messages.append(
{"role": "assistant", "content": message.content}
)
elif isinstance(message, SystemMessage):
doubao_messages.append({"role": "system", "content": message.content})
elif isinstance(message, ChatMessage):
doubao_messages.append(
{"role": message.role, "content": message.content}
)
else:
raise ValueError(f"不支持的消息类型: {type(message)}")
return doubao_messages
def _generate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
"""同步生成聊天响应"""
params = {
"model": self.model_name,
"messages": self._convert_messages_to_doubao_format(messages),
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"stop": stop or [],
**self.extra_kwargs,
**kwargs,
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
}
try:
response = requests.post(
self.base_url,
headers=headers,
json=params,
timeout=self.timeout,
)
response.raise_for_status()
response_json = response.json()
except Exception as e:
logger.error(f"调用模型API失败: {e}")
raise
if "choices" not in response_json or len(response_json["choices"]) == 0:
raise ValueError(f"模型API返回无效响应: {response_json}")
choice = response_json["choices"][0]
message = choice["message"]
ai_message = AIMessage(
content=message["content"],
additional_kwargs={"model": self.model_name},
)
generation = ChatGeneration(message=ai_message)
return ChatResult(generations=[generation])
def bind_tools(
self,
tools: List[BaseTool],
**kwargs: Any,
) -> "MyChatModel":
"""绑定工具到模型"""
new_model = self._copy()
new_model.tools = tools
new_model.tool_names = [tool.name for tool in tools]
return new_model
def _copy(self) -> "MyChatModel":
return MyChatModel(model_name=self.model_name)
async def _agenerate(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> ChatResult:
"""异步生成聊天响应"""
params = {
"model": self.model_name,
"messages": self._convert_messages_to_doubao_format(messages),
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"stop": stop or [],
**self.extra_kwargs,
**kwargs,
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
self.base_url,
headers=headers,
json=params,
timeout=self.timeout,
) as response:
response.raise_for_status()
response_json = await response.json()
except Exception as e:
logger.error(f"异步调用模型API失败: {e}")
raise
if "choices" not in response_json or len(response_json["choices"]) == 0:
raise ValueError(f"模型API返回无效响应: {response_json}")
choice = response_json["choices"][0]
message = choice["message"]
ai_message = AIMessage(
content=message["content"],
additional_kwargs={"model": self.model_name},
)
generation = ChatGeneration(message=ai_message)
return ChatResult(generations=[generation])
def _stream(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[CallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> Iterator[ChatGenerationChunk]:
"""同步流式生成聊天响应"""
params = {
"model": self.model_name,
"messages": self._convert_messages_to_doubao_format(messages),
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"stop": stop or [],
"stream": True,
**self.extra_kwargs,
**kwargs,
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
}
try:
with requests.post(
self.base_url,
headers=headers,
json=params,
stream=True,
timeout=self.timeout,
) as response:
response.raise_for_status()
for line in response.iter_lines():
if not line:
continue
line = line.decode("utf-8").lstrip("data: ").rstrip("\n")
if line == "[DONE]":
break
try:
chunk = json.loads(line)
except json.JSONDecodeError:
logger.warning(f"无效的JSON chunk: {line}")
continue
if "choices" in chunk and len(chunk["choices"]) > 0:
choice = chunk["choices"][0]
content = choice.get("delta", {}).get("content", "")
if content:
# 关键修正:使用AIMessageChunk替代AIMessage
chunk = ChatGenerationChunk(
message=AIMessageChunk(content=content)
)
yield chunk
if run_manager:
run_manager.on_llm_new_token(content)
except Exception as e:
logger.error(f"模型API流式调用失败: {e}")
raise
async def _astream(
self,
messages: List[BaseMessage],
stop: Optional[List[str]] = None,
run_manager: Optional[AsyncCallbackManagerForLLMRun] = None,
**kwargs: Any,
) -> AsyncIterator[ChatGenerationChunk]:
"""异步流式生成聊天响应"""
params = {
"model": self.model_name,
"messages": self._convert_messages_to_doubao_format(messages),
"temperature": self.temperature,
"max_tokens": self.max_tokens,
"stop": stop or [],
"stream": True,
**self.extra_kwargs,
**kwargs,
}
headers = {
"Content-Type": "application/json",
"Authorization": f"Bearer {self.api_key}",
}
try:
async with aiohttp.ClientSession() as session:
async with session.post(
self.base_url,
headers=headers,
json=params,
timeout=self.timeout,
) as response:
response.raise_for_status()
async for line in response.content.iter_lines():
if not line:
continue
line = line.decode("utf-8").lstrip("data: ").rstrip("\n")
if line == "[DONE]":
break
try:
chunk = json.loads(line)
except json.JSONDecodeError:
logger.warning(f"无效的JSON chunk: {line}")
continue
if "choices" in chunk and len(chunk["choices"]) > 0:
choice = chunk["choices"][0]
content = choice.get("delta", {}).get("content", "")
if content:
# 关键修正:使用AIMessageChunk替代AIMessage
chunk = ChatGenerationChunk(
message=AIMessageChunk(content=content)
)
yield chunk
if run_manager:
await run_manager.on_llm_new_token(content)
except Exception as e:
logger.error(f"模型API异步流式调用失败: {e}")
raise
if __name__ == "__main__":
# 初始化模型聊天模型
chat = MyChatModel(temperature=0.7, max_tokens=1024)
# 发送消息
messages = [
SystemMessage(content="你是一个 helpful 的助手"),
HumanMessage(content="你好,介绍一下你自己"),
]
# 同步调用
response = chat.invoke(messages)
print(response.content)
# 流式调用
for chunk in chat.stream(messages):
print(chunk.content, end="", flush=True)
python 3.10
requirements.txt
aiohappyeyeballs==2.6.1
aiohttp==3.13.2
aiosignal==1.4.0
annotated-types==0.7.0
anyio==4.11.0
async-timeout==5.0.1
attrs==25.4.0
certifi==2025.10.5
charset-normalizer==3.4.4
colorama==0.4.6
distro==1.9.0
exceptiongroup==1.3.0
frozenlist==1.8.0
h11==0.16.0
httpcore==1.0.9
httpx==0.28.1
idna==3.11
jiter==0.11.1
jsonpatch==1.33
jsonpointer==3.0.0
langchain==1.0.3
langchain-core==1.0.3
langchain-openai==1.0.2
langgraph==1.0.2
langgraph-checkpoint==3.0.0
langgraph-prebuilt==1.0.2
langgraph-sdk==0.2.9
langsmith==0.4.40
multidict==6.7.0
openai==2.7.1
orjson==3.11.4
ormsgpack==1.11.0
packaging==25.0
propcache==0.4.1
pydantic==2.12.3
pydantic_core==2.41.4
PyYAML==6.0.3
regex==2025.11.3
requests==2.32.5
requests-toolbelt==1.0.0
sniffio==1.3.1
tenacity==9.1.2
tiktoken==0.12.0
tqdm==4.67.1
typing-inspection==0.4.2
typing_extensions==4.15.0
urllib3==2.5.0
xxhash==3.6.0
yarl==1.22.0
zstandard==0.25.0
更多推荐

所有评论(0)