feat: demo

This commit is contained in:
liangfangxing
2026-03-19 19:28:48 +08:00
parent a5d15c099d
commit 65baccba87
22 changed files with 1304 additions and 1 deletions

View File

@@ -0,0 +1,10 @@
from python_a2a import AgentNetwork
network = AgentNetwork(name="MyNetwork")
network.add("TicketAgent", "http://127.0.0.1:5010")
print(f"agent network-->{network.agent_cards}")
print('*'*80)
# 调用
client = network.get_agent("TicketAgent")
print(client.ask("预订一张从北京到上海的火车票"))

View File

@@ -0,0 +1,22 @@
from python_a2a import AIAgentRouter, AgentNetwork
from langchain_openai import ChatOpenAI
from conf import settings
# 创建网络
network = AgentNetwork(name="MyNetwork")
network.add("TicketAgent", "http://127.0.0.1:5010")
# 创建模型
llm = ChatOpenAI(
base_url=settings.base_url,
api_key=settings.api_key,
model=settings.model_name,
temperature=0.1
)
# 创建路由器
router = AIAgentRouter(llm_client=llm, agent_network=network)
agent_name, confidence = router.route_query("预订票")
print(agent_name, confidence)
client = network.get_agent(agent_name)
print(client.ask("预订一张从北京到上海的火车票"))

View File

@@ -0,0 +1,14 @@
import asyncio
from python_a2a import A2AClient
async def main():
ticket_client = A2AClient("http://127.0.0.1:5010")
#预订火车票
ticket_query = "预订一张从北京到上海的火车票"
print(f"[主控客户端日志]预订票务 -> '{ticket_query}'")
ticket_result = ticket_client.ask(ticket_query)
print(f"[主控客户端日志] 收到票务预订结果: {ticket_result}")
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,38 @@
from python_a2a import A2AServer, run_server, AgentCard, AgentSkill, TaskStatus, TaskState
# 定义代理卡片
ticket_card = AgentCard(
name="TicketAgentServer",
description="票务代理",
url="http://127.0.0.1:5010",
skills=[AgentSkill(name="book_ticket", description="预订票务")]
)
# 自定义 A2AServer 子类
class TicketServer(A2AServer):
def __init__(self):
super().__init__(agent_card=ticket_card)
def handle_task(self, task):
print("收到A2A任务的task:=>", task)
#默认写法:获取任务内容
query = (task.message or {}).get("content", {}).get("text", "")
if "上海" in query and "北京" in query:
# 这里的结果可以来自于 MCP 模块,这里我们直接模拟结果
train_result = "上海到北京的火车票已经预订成功! G1001,10车1A "
else:
train_result = "请输入明确的出发地和目的地。"
task.artifacts = [{"parts": [{"type": "text", "text": train_result}]}]
task.status = TaskStatus(state=TaskState.COMPLETED)
print(f"[{self.agent_card.name} 日志] 任务处理完毕")
print(f"[{self.agent_card.name} 日志] 输出结果task: {task}")
print(f"[{self.agent_card.name} 日志] 输出结果task.artifacts: {task.artifacts}")
return task
# 启动服务器
if __name__ == "__main__":
server = TicketServer()
print(f"[{server.agent_card.name}] 启动成功,服务地址: {server.agent_card.url}")
run_server(server, host="127.0.0.1", port=5010, debug=True)

138
demo_agent/muiti_agent.py Normal file
View File

@@ -0,0 +1,138 @@
from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder, SystemMessagePromptTemplate, \
HumanMessagePromptTemplate # 导入所有必需的 Prompt 类
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain.agents import AgentExecutor, create_tool_calling_agent
from langchain_core.output_parsers import StrOutputParser
from conf import settings
# 创建模型
llm = ChatOpenAI(
base_url=settings.base_url,
api_key=settings.api_key,
model=settings.model_name,
temperature=0.1
)
# 2.定义工具
# 2.1 计算工具
@tool
def multiply(a: int, b: int) -> int:
"""用于计算两个整数的乘积。
参数:
a (int): 第一个整数。
b (int): 第二个整数。
"""
print(f"\n[计算专家] -> 正在执行乘法: {a} * {b}")
return a * b
@tool
def add(a: int, b: int) -> int:
"""用于计算两个整数的和。
参数:
a (int): 第一个整数。
b (int): 第二个整数。
"""
print(f"\n[计算专家] -> 正在执行加法: {a} + {b}")
return a + b
# 2.2 信息查询工具
@tool
def search_weather(city: str) -> str:
"""用于查询指定城市的实时天气。"""
print(f"正在查询天气: {city}")
if "北京" in city:
return "北京今天是晴天气温25摄氏度。"
elif "上海" in city:
return "上海今天是阴天有小雨气温22摄氏度。"
else:
return f"抱歉,我没有'{city}'的天气信息。"
@tool
def get_current_date() -> str:
"""用于获取当前日期。"""
print("\n[信息专家] -> 正在获取当前日期...")
import datetime
return datetime.date.today().strftime("%Y年%m月%d")
# 3 创建两个独立的 Agent
# 3.1 创建“计算专家” Agent
math_tools = [multiply, add]
# 创建完整的 Tool Calling Prompt
# 这包括一个系统消息,一个用户消息占位符,以及一个 Agent 中间思考过程的占位符。
math_prompt = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template("你是一个强大的数学计算专家,可以访问和使用各种数学工具。"),
HumanMessagePromptTemplate.from_template("{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad")
])
# 创建 math_Agent
math_agent = create_tool_calling_agent(llm, math_tools, math_prompt)
# 创建 math Agent Executor
math_executor = AgentExecutor(
agent=math_agent,
tools=math_tools,
verbose=True
)
# 3.2 创建“信息专家” Agent
info_tools = [search_weather, get_current_date]
# 手动创建完整的 Tool Calling Prompt
info_prompt = ChatPromptTemplate.from_messages([
SystemMessagePromptTemplate.from_template("你是一个强大的信息查询专家,可以访问和使用各种查询工具。"),
HumanMessagePromptTemplate.from_template("{input}"),
MessagesPlaceholder(variable_name="agent_scratchpad")
])
# 创建 info Agent
info_agent = create_tool_calling_agent(llm, info_tools, info_prompt)
# 创建 info Agent Executor
info_executor = AgentExecutor(
agent=info_agent,
tools=info_tools,
verbose=True
)
# 4.协调和总结工作流
def multi_agent_workflow(query: str, math_task: str, info_task: str):
print("--- 启动多智能体协作流程 ---")
print(f"\n用户原始请求: {query}")
# 4.1 让“计算专家”执行任务
print("\n[主程序] -> 将任务分配给计算专家...")
math_result = math_executor.invoke({"input": math_task}).get("output")
print(f"\n[主程序] -> 计算专家返回结果: {math_result}")
# 4.2 让“信息专家”执行任务
print("\n[主程序] -> 将任务分配给信息专家...")
info_result = info_executor.invoke({"input": info_task}).get("output")
print(f"\n[主程序] -> 信息专家返回结果: {info_result}")
# 4.3 使用 LLM 进行最终结果总结
print("\n[主程序] -> 使用大模型进行最终总结...")
summarize_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个善于总结和整合信息的助手。请根据以下信息,为用户原始请求生成一个完整的回答。"),
("human",
f"用户请求: {query}\n\n计算结果: {math_result}\n\n信息查询结果: {info_result}\n\n请整合以上信息,生成一个连贯的最终回答。")
])
summarize_chain = summarize_prompt | llm | StrOutputParser()
final_answer = summarize_chain.invoke({"query": query})
print("\n--- 协作流程已完成!---")
print(f"最终综合回答:\n{final_answer}")
return final_answer
if __name__ == "__main__":
# 定义用户原始请求和分配给每个Agent的子任务
original_query = "请先计算 25 乘以 4然后告诉我北京今天的天气和当前日期。"
math_task = "计算 25 乘以 4"
info_task = "查询北京今天的天气和当前日期"
# 启动工作流
multi_agent_workflow(original_query, math_task, info_task)

View File

@@ -0,0 +1,129 @@
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain.agents import AgentExecutor, create_react_agent
from conf import settings
# 创建模型
llm = ChatOpenAI(
base_url=settings.base_url,
api_key=settings.api_key,
model=settings.model_name,
temperature=0.1
)
# 2.定义工具
@tool
def multiply(numbers_str: str) -> int:
"""用于计算两个整数的乘积。
参数:
numbers_str (str): 包含两个整数的字符串,用逗号分隔,例如:"100,25"
返回:
int: 两个整数的乘积。
"""
print(f"正在执行乘法: {numbers_str}")
try:
a_str, b_str = numbers_str.split(',')
a = int(a_str.strip())
b = int(b_str.strip())
return a * b
except ValueError:
return "输入的格式不正确,请确保是两个用逗号分隔的整数,例如:'100,25'"
@tool
def search_weather(city: str) -> str:
"""用于查询指定城市的实时天气。"""
print(f"正在查询天气: {city}")
if "北京" in city:
return "北京今天是晴天气温25摄氏度。"
elif "上海" in city:
return "上海今天是阴天有小雨气温22摄氏度。"
else:
return f"抱歉,我没有'{city}'的天气信息。"
# 将工具列表放入一个变量
tools = [multiply, search_weather]
# 3.定义规划器 (Planner) 和执行者 (Executor) 的 Prompt
# 3.1 规划器的 Prompt
# 规划器的职责是分析用户任务,并将其分解成一系列简单的、可执行的子任务。
planner_prompt = ChatPromptTemplate.from_template(
"""你是一个任务规划师,你的工作是将用户提出的一个复杂任务分解成一系列清晰、可执行的步骤。
你的输出应该是一个简单的任务列表,每行一个任务。
例子:
用户任务: "请先查上海的天气然后计算20乘以30。"
任务列表:
- 查找上海的天气
- 计算20乘以30的结果
用户任务: {user_input}
任务列表:
"""
)
# 规划器链,它只负责生成文本化的任务列表
planner_chain = planner_prompt | llm | StrOutputParser()
# 3.2 执行者的 Prompt
# 执行者的职责是执行单个任务。在这里,我们使用 ReAct 模式作为执行者,因为它能根据任务描述选择并调用正确的工具。
# 注意:我们使用一个简化版的 ReAct Prompt因为它只需要处理单个任务。
executor_react_prompt_template = """你是一个专业的工具执行者,可以访问以下工具:
{tools}
根据你的思考Thought决定下一步的行动Action。你的行动必须遵循以下格式
Thought: 我需要思考如何完成任务。
Action: [工具名称]
Action Input: [工具的输入参数对于multiply工具请使用'100,25'这样的格式]
可用的工具名称有: {tool_names}
当你获取了所有必要信息并可以给出最终答案时,请以以下格式结束:
Thought: 我已经有了最终答案。
Final Answer: [最终答案]
请执行以下任务:
{input}
{agent_scratchpad}
"""
executor_prompt = ChatPromptTemplate.from_template(executor_react_prompt_template)
# 4.创建 ReAct Agent 作为执行者
executor_agent = create_react_agent(llm, tools, executor_prompt)
executor_executor = AgentExecutor(
agent=executor_agent,
tools=tools,
verbose=True,
handle_parsing_errors=True # 启用错误处理,自动重试解析错误
)
# 5.定义并运行规划模式的工作流
def execute_planning_pattern(query: str):
print("--- 启动规划模式 ---")
# 规划器分解任务
print("\n规划器正在分解任务...")
plan = planner_chain.invoke({"user_input": query})
tasks = [task.strip() for task in plan.split('\n') if task.strip()]
print("规划器生成的任务列表:")
for i, task in enumerate(tasks):
print(f" {i + 1}. {task}")
# 执行者逐一执行任务
print("\n执行者正在逐一执行任务...")
for i, task in enumerate(tasks):
print(f"\n--- 执行任务 {i + 1}: {task} ---")
executor_executor.invoke({"input": task})
print("\n--- 所有任务执行完毕!---")
if __name__ == "__main__":
test_query = "请先计算 50 乘以 60 的结果,然后告诉我上海的天气怎么样?"
execute_planning_pattern(test_query)

106
demo_agent/react_agent.py Normal file
View File

@@ -0,0 +1,106 @@
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain_core.prompts import ChatPromptTemplate
from langchain.agents import AgentExecutor, create_react_agent
from conf import settings
# 创建模型
llm = ChatOpenAI(
base_url=settings.base_url,
api_key=settings.api_key,
model=settings.model_name,
temperature=0.1
)
# 2.定义工具
# 关键修改:重写 multiply 工具,使其只接受一个字符串参数,并在内部解析。
@tool
def multiply(numbers_str: str) -> int:
"""用于计算两个整数的乘积。
参数:
numbers_str (str): 包含两个整数的字符串,用逗号分隔,例如:"100,25"
返回:
int: 两个整数的乘积。
"""
print(f"正在执行乘法: {numbers_str}")
try:
a_str, b_str = numbers_str.split(',')
a = int(a_str.strip())
b = int(b_str.strip())
return a * b
except ValueError:
return "输入的格式不正确,请确保是两个用逗号分隔的整数,例如:'100,25'"
@tool
def search_weather(city: str) -> str:
"""用于查询指定城市的实时天气。"""
print(f"正在查询天气: {city}")
if "北京" in city:
return "北京今天是晴天气温25摄氏度。"
elif "上海" in city:
return "上海今天是阴天有小雨气温22摄氏度。"
else:
return f"抱歉,我没有'{city}'的天气信息。"
# 将工具列表放入一个变量
tools = [multiply, search_weather]
# 3.自定义 ReAct 风格的 Prompt
react_prompt_template = """你是一个有用的 AI 助手,可以访问以下工具:
{tools}
请根据用户输入一步步推理,并按以下规则操作:
1. 每次输出只能包含一个动作Action 和 Action Input或一个最终答案Final Answer
2. 如果用户输入包含多个任务,依次处理每个任务,不要一次性输出所有步骤。
3. 每次行动前说明你的思考Thought并选择合适的工具或直接给出最终答案。
4. 如果需要使用工具,格式必须为:
Thought: [你的思考]
Action: [工具名称]
Action Input: [工具的输入参数例如对于multiply工具使用'100,25'格式]
5. 如果可以直接回答或所有任务都完成,格式为:
Thought: [你的思考]
Final Answer: [最终答案]
可用的工具名称有: {tool_names}
用户输入: {input}
{agent_scratchpad}
"""
react_prompt = ChatPromptTemplate.from_template(react_prompt_template)
# 4.创建 ReAct 风格的 Agent
react_agent = create_react_agent(llm, tools, react_prompt)
# 5.创建 Agent Executor
react_executor = AgentExecutor(
agent=react_agent,
tools=tools,
verbose=True,
handle_parsing_errors=True # 启用错误处理,自动重试解析错误
)
# 6: 运行并测试 Agent
if __name__ == "__main__":
# 测试用例1: 查询天气
print("--- 运行Agent查询: 上海今天的天气怎么样? ---")
response_weather = react_executor.invoke({"input": "上海今天的天气怎么样?"})
print(f"\n--- Agent响应: ---")
print(response_weather.get("output", "没有找到输出。"))
print("-" * 30 + "\n")
# 测试用例2: 数学计算
print("--- 运行Agent查询: 100乘以25等于多少 ---")
response_math = react_executor.invoke({"input": "100乘以25等于多少"})
print(f"\n--- Agent响应: ---")
print(response_math.get("output", "没有找到输出。"))
print("-" * 30 + "\n")
# 测试用例3: 包含多个任务
print("--- 运行Agent查询: 100乘以25等于多少 上海的天气如何? ---")
response_multi = react_executor.invoke({"input": "100乘以25等于多少 上海的天气如何?"})
print(f"\n--- Agent响应: ---")
print(response_multi.get("output", "没有找到输出。"))
print("-" * 30 + "\n")

View File

@@ -0,0 +1,71 @@
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from conf import settings
# 创建模型
llm = ChatOpenAI(
base_url=settings.base_url,
api_key=settings.api_key,
model=settings.model_name,
temperature=0.1
)
# 3.初始响应 Prompt: 用于生成第一次的回答
initial_response_prompt = ChatPromptTemplate.from_template(
"请根据以下问题给出你的初步回答: {question}"
)
initial_response_chain = initial_response_prompt | llm | StrOutputParser()
# 4.反思 Prompt: 用于接收用户反馈并优化回答
reflection_prompt = ChatPromptTemplate.from_template(
"""你是一个专业的、善于反思的AI助手。你之前给出了以下回答
---
{previous_response}
---
现在,你收到了用户对你的回答给出的反馈:
---
{user_feedback}
---
请根据用户的反馈,认真反思你之前的回答,并生成一个更准确、更完善的新回答。
新回答:"""
)
reflection_chain = reflection_prompt | llm | StrOutputParser()
# 5.模拟反射过程
def reflect_and_refine(query: str, feedback: str):
"""模拟一个完整的反射过程,从初始响应到优化后的响应。"""
print("--- 启动反射模式 ---")
print(f"用户查询: {query}")
# LLM 生成初步响应
print("\n生成初步响应...")
initial_response = initial_response_chain.invoke({"question": query})
print(f"LLM 初步响应:\n{initial_response}")
# 模拟用户反馈
print(f"\n用户反馈:\n{feedback}")
# LLM 进行反思,并生成新的回答
print("\nLLM 正在反思并生成新响应...")
refined_response = reflection_chain.invoke({
"previous_response": initial_response,
"user_feedback": feedback
})
print("\n--- LLM 经过反思后的新响应 ---")
print(refined_response)
return refined_response
# 6.运行并测试
if __name__ == "__main__":
# 模拟用户查询
initial_question = "请用一句话介绍一下 LangChain。"
# 模拟用户反馈,指出初步回答的不足
user_feedback_text = "你的回答太简单了,请更详细地解释一下 LangChain 的核心概念,比如 Agent 和 Chain 的区别。"
# 运行反射过程
reflect_and_refine(initial_question, user_feedback_text)

View File

@@ -0,0 +1,69 @@
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from langchain_core.tools import tool
from langchain.agents import AgentExecutor, create_tool_calling_agent, create_react_agent
from conf import settings
# 创建模型
llm = ChatOpenAI(
base_url=settings.base_url,
api_key=settings.api_key,
model=settings.model_name,
temperature=0.1
)
# 2.定义工具
@tool
def multiply(a: int, b: int) -> int:
"""用于计算两个整数的乘积。"""
print(f"正在执行乘法: {a} * {b}")
return a * b
@tool
def search_weather(city: str) -> str:
"""用于查询指定城市的实时天气。"""
print(f"正在查询天气: {city}")
if "北京" in city:
return "北京今天是晴天气温25摄氏度。"
elif "上海" in city:
return "上海今天是阴天有小雨气温22摄氏度。"
else:
return f"抱歉,我没有'{city}'的天气信息。"
# 将工具列表放入一个变量
tools = [multiply, search_weather]
# 3.定义一个提示模板用于控制Agent的思考过程和工具调用
tool_use_prompt = ChatPromptTemplate.from_messages([
("system", "你是一个强大的AI助手可以访问和使用各种工具来回答问题。请根据用户的问题决定是否需要调用工具。当需要调用工具时请使用正确的JSON格式。"),
("user", "{input}"),
("placeholder", "{agent_scratchpad}") # 这个占位符用于保存 Agent 的思考过程和工具调用历史
])
# 4.创建一个 LLM 能够识别和使用的 Agent
# 使用 create_tool_calling_agent 函数,它能让 LLM 自动判断何时以及如何调用工具
tool_calling_agent = create_tool_calling_agent(llm, tools, tool_use_prompt)
# 5.创建 Agent Executor
# AgentExecutor 负责 Agent 和工具之间的协调
tool_use_executor = AgentExecutor(
agent=tool_calling_agent,
tools=tools,
verbose=True # 开启 verbose 模式,可以打印详细的执行过程
)
# 6.通用的执行函数用于运行agent并打印结果
def run_agent_and_print(agent_executor, query):
"""一个通用函数用于运行Agent并打印结果。"""
print(f"--- 运行Agent查询: {query} ---")
response = agent_executor.invoke({"input": query})
print(f"\n--- Agent响应: ---")
print(response.get("output", "没有找到输出。"))
print("-" * 30 + "\n")
if __name__ == "__main__":
run_agent_and_print(tool_use_executor, "上海今天的天气怎么样?")
run_agent_and_print(tool_use_executor, "30乘以5等于多少 上海天气怎么样")

View File

@@ -81,7 +81,7 @@ llm_with_tools = llm.bind(tools=tools, tool_choice="auto")
# todo: 第三步:调用回复
query = "2+1等于多少"
query = "2*10+1"
messages = [HumanMessage(query)]
try:

74
demo_mcp/a2a_agent.py Normal file
View File

@@ -0,0 +1,74 @@
import asyncio
import json
import logging
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate
from langchain_openai import ChatOpenAI
from python_a2a.mcp import MCPClient
from python_a2a.langchain import to_langchain_tool
from conf import settings
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 创建模型
llm = ChatOpenAI(
base_url=settings.base_url,
api_key=settings.api_key,
model=settings.model_name,
temperature=0.1
)
async def test_mcp_tools():
# 连接到服务端,端口 8000
client = MCPClient("http://localhost:8010")
try:
# 步骤 1获取可用工具列表
tools = await client.get_tools()
logger.info("可用工具列表:")
for tool in tools:
print(tool)
logger.info(f"- {tool.get('name', '未知')}: {tool.get('description', '无描述')}")
# 将 MCP tool 转成 LangChain 的工具
get_weather_tool = to_langchain_tool("http://localhost:8010", "get_weather")
query_high_frequency_question = to_langchain_tool("http://localhost:8010", "query_high_frequency_question")
tools=[get_weather_tool, query_high_frequency_question]
# 创建prompt模板
prompt_template = ChatPromptTemplate.from_messages([
("system", "你是一个乐于助人的助手,能够调用工具回答用户问题。工具不需要传参。"),
("human", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
# 构建工具调用代理
agent = create_tool_calling_agent(llm, tools, prompt_template)
# 创建代理执行器
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# 代理调用
print("MCP客户端启动输入'quit'退出")
while True:
# 接收用户查询
query = input("\nQuery: ").strip()
if query.lower() == "quit":
break
# 发送用户查询给代理,并打印
try:
response = await agent_executor.ainvoke({"input": query})
print(f"response-->{response}")
except Exception:
print("解析有问题")
except Exception as e:
logger.error(f"MCP 客户端出错:{str(e)}", exc_info=True)
finally:
await client.close()
async def main():
await test_mcp_tools()
if __name__ == "__main__":
asyncio.run(main())

38
demo_mcp/a2a_client.py Normal file
View File

@@ -0,0 +1,38 @@
import asyncio
import logging
from python_a2a.mcp import MCPClient
# 配置日志
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
async def test_mcp_tools():
# 连接到服务端,端口 8010
client = MCPClient("http://localhost:8010")
try:
# 步骤 1获取可用工具列表
tools = await client.get_tools()
logger.info("可用工具列表:")
for tool in tools:
print(tool)
logger.info(f"- {tool.get('name', '未知')}: {tool.get('description', '无描述')}")
# 步骤 2调用查询高频问题工具
result_qhf = await client.call_tool("query_high_frequency_question")
logger.info(f"高频问题查询结果:{result_qhf}")
# 步骤 3调用查询天气工具
result_weather = await client.call_tool("get_weather")
logger.info(f"天气查询结果:{result_weather}")
except Exception as e:
logger.error(f"MCP 客户端出错:{str(e)}", exc_info=True)
finally:
await client.close()
async def main():
await test_mcp_tools()
if __name__ == "__main__":
asyncio.run(main())

65
demo_mcp/a2a_server.py Normal file
View File

@@ -0,0 +1,65 @@
import logging
import uvicorn
from python_a2a.mcp import FastMCP, create_fastapi_app
# 配置日志,方便调试
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
logger = logging.getLogger(__name__)
# 创建 MCP 服务器实例
mcp = FastMCP(
name="MyMCPTools",
description="提供高频问题和天气查询工具",
version="1.0.0"
)
# 定义工具 1查询高频问题
@mcp.tool(
name="query_high_frequency_question",
description="获取知识库中的高频问答,返回 JSON 数据",
)
async def query_high_frequency_question(**kwargs) -> str:
"""
query_high_frequency_question 不需要任何传参
查询高频问题并返回答案
返回示例:[{"question_id": 1, "question_text": "恐龙怎么灭绝", "answer_text": "小行星撞击", ...}]
"""
try:
logger.info(f"调用查询高频问题的工具,参数为:{kwargs}")
return '{"status": "success", "data": [{"question_id": 1, "question_text": "恐龙是怎么灭绝的?", "answer_text": "可能是小行星撞击", "category": "历史", "frequency_score": 0.9}]}'
except Exception as e:
logger.error(f"查询高频问题出错: {str(e)}")
raise
# 定义工具 2查询天气
@mcp.tool(
name="get_weather",
description="查询天气",
)
async def get_weather(**kwargs) -> str:
"""
get_weather 不需要任何传参
查询天气并返回结果
返回示例:{"status": "success", "data": "北京的天气是多云"}
"""
try:
logger.info(f"调用查询天气的工具,参数为{kwargs}")
return '{"status": "success", "data": "北京的天气是多云"}'
except Exception as e:
logger.error(f"查询天气出错: {str(e)}")
raise
# 启动服务器
def start_server():
logger.info("=== MCP 服务器信息 ===")
logger.info(f"名称: {mcp.name}")
logger.info(f"描述: {mcp.description}")
port = 8010
app = create_fastapi_app(mcp)
logger.info(f"启动 MCP 服务器于 http://localhost:{port}")
uvicorn.run(app, host="0.0.0.0", port=port)
if __name__ == "__main__":
start_server()

69
demo_mcp/sse_agent.py Normal file
View File

@@ -0,0 +1,69 @@
import json
import asyncio
from langchain_openai import ChatOpenAI
from mcp import ClientSession
from mcp.client.sse import sse_client
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate
from conf import settings
# 创建模型
llm = ChatOpenAI(
base_url=settings.base_url,
api_key=settings.api_key,
model=settings.model_name,
temperature=0.1
)
# MCP server URL for SSE connection
server_url = "http://localhost:8001/sse"
# 定义mcp客户端
mcp_client = None
# Main async function: connect, load tools, create agent, run chat loop
async def run_agent():
global mcp_client
# 启动 MCP server通过 SSE 建立异步连接。
async with sse_client(url=server_url) as streams:
# 使用读写通道创建 MCP 会话
async with ClientSession(*streams) as session:
await session.initialize()
# 动态创建一个临时类 MCPClientHolder把 session 放进去。这样就可以在函数外部通过 mcp_client.session 调用 MCP 工具
mcp_client = type("MCPClientHolder", (), {"session": session})()
# 从 session 自动获取 MCP server 提供的工具列表。
tools = await load_mcp_tools(session)
# print(f"tools-->{tools}")
# 创建prompt模板
prompt_template = ChatPromptTemplate.from_messages([
("system", "你是一个乐于助人的助手,能够调用工具回答用户问题。"),
("human", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
# 构建工具调用代理
agent = create_tool_calling_agent(llm, tools, prompt_template)
# 创建代理执行器
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# 代理调用
print("MCP客户端启动输入'quit'退出")
while True:
# 接收用户查询
query = input("\nQuery: ").strip()
if query.lower() == "quit":
break
# 发送用户查询给代理,并打印
try:
response = await agent_executor.ainvoke({"input": query})
print(f"response-->{response}")
except Exception:
print("解析有问题")
return
if __name__ == "__main__":
asyncio.run(run_agent())

34
demo_mcp/sse_client.py Normal file
View File

@@ -0,0 +1,34 @@
import asyncio
from mcp import ClientSession
from mcp.client.sse import sse_client
from langchain_mcp_adapters.tools import load_mcp_tools
# MCP server URL for SSE connection
server_url = "http://localhost:8001/sse"
# 定义mcp客户端
mcp_client = None
# 主要的异步函数run_agent
async def run():
global mcp_client
# 启动 MCP server通过 SSE 建立异步连接。
async with sse_client(url=server_url) as streams:
# 使用读写通道创建 MCP 会话
async with ClientSession(*streams) as session:
await session.initialize()
# 动态创建一个临时类 MCPClientHolder把 session 放进去。这样就可以在函数外部通过 mcp_client.session 调用 MCP 工具
mcp_client = type("MCPClientHolder", (), {"session": session})()
# 从 session 自动获取 MCP server 提供的工具列表。
tools = await load_mcp_tools(session)
# print(f"tools-->{tools}")
# 调用 MCP server 的 get_weather 工具
response=await session.call_tool("get_weather", arguments={})
print(f"response-->{response}")
# 启动运行agent
if __name__ == "__main__":
asyncio.run(run())

53
demo_mcp/sse_server.py Normal file
View File

@@ -0,0 +1,53 @@
from mcp.server.fastmcp import FastMCP
# 在创建FastMCP实例时指定host和port
mcp = FastMCP("sdg", log_level="ERROR", host="127.0.0.1", port=8001)
@mcp.tool(
name="query_high_frequency_question",
description="从知识库中检索常见问题解答FAQ,返回包含问题和答案的结构化JSON数据。",
)
async def query_high_frequency_question() -> str:
"""
高频问题查询
"""
try:
print("调用查询高频问题的tool成功")
return "高频问题是: 恐龙是怎么灭绝的?"
except Exception as e:
print(f"Unexpected error in question retrieval: {str(e)}")
raise
@mcp.tool(
name="get_weather",
description="查询天气"
)
async def get_weather() -> str:
"""
查询天气的tools
"""
try:
print("调用查询天气的tools")
return "北京的天气是多云"
except Exception as e:
print(f"Unexpected error in question retrieval: {str(e)}")
raise
def main():
print("正在启动MCP SSE服务器...")
print("SSE端点: http://localhost:8001/sse")
print("按 Ctrl+C 停止服务器")
try:
# 运行SSE服务器
mcp.run(transport="sse")
except KeyboardInterrupt:
print("\n服务器已停止")
except Exception as e:
print(f"服务器启动失败: {e}")
if __name__ == "__main__":
main()

78
demo_mcp/stdio_agent.py Normal file
View File

@@ -0,0 +1,78 @@
import os
import sys
sys.path.append(os.path.join(os.path.dirname(__file__), "../../.."))
import asyncio
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain_openai import ChatOpenAI
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
from conf import settings
# 创建模型
llm = ChatOpenAI(
base_url=settings.base_url,
api_key=settings.api_key,
model=settings.model_name,
temperature=0.1
)
# 配置mcp服务器脚本路径
server_script = r".\stdio_server.py"
# 配置mcp服务器启动参数
server_params = StdioServerParameters(
command="python" if server_script.endswith(".py") else "node",
args=[server_script],
)
# 定义mcp客户端
mcp_client = None
# 主要的异步函数run_agent
async def run_agent():
global mcp_client
# 启动 MCP server并通过标准输入输出建立异步连接。
async with stdio_client(server_params) as (read, write):
# 使用读写通道创建 MCP 会话。
async with ClientSession(read, write) as session:
# 初始化会话
await session.initialize()
# 动态创建一个临时类 MCPClientHolder把 session 放进去。这样就可以在函数外部通过 mcp_client.session 调用 MCP 工具
mcp_client = type("MCPClientHolder", (), {"session": session})()
# 从 session 自动获取 MCP server 提供的工具列表
tools = await load_mcp_tools(session)
# print(f"tools-->{tools}")
# 创建prompt模板
prompt_template = ChatPromptTemplate.from_messages([
("system", "你是一个乐于助人的助手,能够调用工具回答用户问题。"),
("human", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
# 构建工具调用代理
agent = create_tool_calling_agent(llm, tools, prompt_template)
# 创建代理执行器
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# 代理调用
print("MCP客户端启动输入'quit'退出")
while True:
# 接收用户查询
query = input("\nQuery: ").strip()
if query.lower() == "quit":
break
# 发送用户查询给代理,并打印
try:
response = await agent_executor.ainvoke({"input": query})
print(f"response-->{response}")
except Exception:
print("解析有问题")
return
if __name__ == "__main__":
asyncio.run(run_agent())

57
demo_mcp/stdio_client.py Normal file
View File

@@ -0,0 +1,57 @@
import asyncio
from langchain_mcp_adapters.tools import load_mcp_tools
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# 配置mcp服务器脚本路径
server_script = r".\stdio_server.py"
# 配置mcp服务器启动参数
server_params = StdioServerParameters(
command="python" if server_script.endswith(".py") else "node",
args=[server_script],
)
# 定义mcp客户端
mcp_client = None
# 主要的异步函数run_agent
async def run():
global mcp_client
# 启动 MCP server并通过标准输入输出建立异步连接。
async with stdio_client(server_params) as (read, write):
# 使用读写通道创建 MCP 会话。
async with ClientSession(read, write) as session:
await session.initialize()
# 动态创建一个临时类 MCPClientHolder把 session 放进去。这样就可以在函数外部通过 mcp_client.session 调用 MCP 工具
mcp_client = type("MCPClientHolder", (), {"session": session})()
# 从 session 自动获取 MCP server 提供的工具列表。
tools = await load_mcp_tools(session)
print(f"tools-->{tools}")
# 调用 MCP server 的 get_weather 工具
response = await session.call_tool("get_weather", arguments={})
print(f"response-->{response}")
return
# 启动运行
if __name__ == "__main__":
asyncio.run(run())
# tools -->[StructuredTool(name='query_high_frequency_question',
# description='从知识库中检索常见问题解答FAQ,返回包含问题和答案的结构化JSON数据。',
# args_schema={'properties': {}, 'title': 'query_high_frequency_questionArguments',
# 'type': 'object'}, response_format='content_and_artifact',
# coroutine= < function convert_mcp_tool_to_langchain_tool. < locals >.call_tool
# at
# 0x000001DB7A188FE0 >), StructuredTool(name='get_weather', description='查询天气',
# args_schema={'properties': {}, 'title': 'get_weatherArguments',
# 'type': 'object'}, response_format='content_and_artifact',
# coroutine= < function
# convert_mcp_tool_to_langchain_tool. < locals >.call_tool
# at
# 0x000001DB7A1E3060 >)]
# response -->meta = None
# content = [TextContent(type='text', text='北京的天气是多云', annotations=None, meta=None)]
# structuredContent = {'result': '北京的天气是多云'}
# isError = False

38
demo_mcp/stdio_server.py Normal file
View File

@@ -0,0 +1,38 @@
from mcp.server.fastmcp import FastMCP
mcp = FastMCP("sdg", log_level="ERROR")
@mcp.tool(
name="query_high_frequency_question",
description="从知识库中检索常见问题解答FAQ,返回包含问题和答案的结构化JSON数据。",
)
async def query_high_frequency_question() -> str:
"""
高频问题查询
"""
try:
print("调用查询高频问题的tool成功")
return "高频问题是: 恐龙是怎么灭绝的?"
except Exception as e:
print(f"Unexpected error in question retrieval: {str(e)}")
raise
@mcp.tool(
name="get_weather",
description="查询天气"
)
async def get_weather() -> str:
"""
查询天气的tools
"""
try:
print("调用查询天气的tools")
return "北京的天气是多云"
except Exception as e:
print(f"Unexpected error in question retrieval: {str(e)}")
raise
if __name__ == "__main__":
mcp.run(transport="stdio")

View File

@@ -0,0 +1,84 @@
import json
import logging
import asyncio
from langchain_openai import ChatOpenAI
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
from langchain_mcp_adapters.tools import load_mcp_tools
from langchain.agents import create_tool_calling_agent, AgentExecutor
from langchain_core.prompts import ChatPromptTemplate
from conf import settings
# 创建模型
llm = ChatOpenAI(
base_url=settings.base_url,
api_key=settings.api_key,
model=settings.model_name,
temperature=0.1
)
# MCP 服务器的 Streamable-HTTP 连接地址
server_url = "http://127.0.0.1:8002/mcp"
# 配置日志
logging.basicConfig(
level=logging.DEBUG, # 提高日志级别以捕获更多信息
format='[客户端] %(asctime)s - %(levelname)s - %(message)s'
)
# 定义mcp客户端
mcp_client = None
async def run_agent():
global mcp_client
logging.info(f"准备连接到 Streamable-HTTP 服务器: {server_url}")
# 启动 MCP server通过streamable建立连接
async with streamablehttp_client(server_url) as (read, write, _):
logging.info("连接已成功建立!")
# 使用读写通道创建 MCP 会话
async with ClientSession(read, write) as session:
try:
await session.initialize()
logging.info("会话初始化成功,可以开始加载工具。")
# 动态创建一个临时类 MCPClientHolder把 session 放进去。这样就可以在函数外部通过 mcp_client.session 调用 MCP 工具
mcp_client = type("MCPClientHolder", (), {"session": session})()
# 从 session 自动获取 MCP server 提供的工具列表。
tools = await load_mcp_tools(session)
# print(f"tools-->{tools}")
# 创建 agent 的提示模板
prompt = ChatPromptTemplate.from_messages([
("system", "你是一个乐于助人的助手,能够调用工具回答用户问题。"),
("human", "{input}"),
("placeholder", "{agent_scratchpad}"),
])
# 构建工具调用代理
agent = create_tool_calling_agent(llm, tools, prompt)
# 创建代理执行器
agent_executor = AgentExecutor(agent=agent, tools=tools, verbose=True)
# 代理调用
print("MCP客户端启动输入'quit'退出")
while True:
query = input("\nQuery: ").strip()
if query.lower() == "quit":
break
# 发送用户查询到 agent 并打印格式化响应
logging.info(f"处理用户查询: {query}")
try:
response = await agent_executor.ainvoke({"input": query})
print(f"response-->{response}")
except Exception:
print("解析有问题")
except Exception as e:
logging.error(f"会话初始化或工具调用时发生错误: {e}", exc_info=True)
raise
if __name__ == "__main__":
try:
asyncio.run(run_agent())
except Exception as e:
logging.error(f"客户端运行失败: {e}", exc_info=True)

View File

@@ -0,0 +1,63 @@
import asyncio
import logging
from langchain_mcp_adapters.tools import load_mcp_tools
from mcp import ClientSession
from mcp.client.streamable_http import streamablehttp_client
# 定义服务器地址
server_url = "http://127.0.0.1:8002/mcp"
# 定义mcp客户端
mcp_client = None
# 配置日志
logging.basicConfig(
level=logging.DEBUG, # 提高日志级别以捕获更多信息
format='[客户端] %(asctime)s - %(levelname)s - %(message)s'
)
async def main():
global mcp_client
logging.info(f"准备连接到 Streamable-HTTP 服务器: {server_url}")
try:
# 启动 MCP server通过streamable建立连接
async with streamablehttp_client(server_url) as (read, write, _):
logging.info("连接已成功建立!")
# 使用读写通道创建 MCP 会话
async with ClientSession(read, write) as session:
try:
await session.initialize()
logging.info("会话初始化成功,可以开始调用工具。")
# 动态创建一个临时类 MCPClientHolder把 session 放进去。这样就可以在函数外部通过 mcp_client.session 调用 MCP 工具
mcp_client = type("MCPClientHolder", (), {"session": session})()
# 从 session 自动获取 MCP server 提供的工具列表。
tools = await load_mcp_tools(session)
# print(f"tools-->{tools}")
# 调用远程工具
logging.info("--> 正在调用工具: query_high_frequency_question")
response = await session.call_tool("query_high_frequency_question", {})
print(f"response-->{response}")
logging.info(f"<-- 收到响应: {response}")
print("-" * 30)
logging.info("--> 正在调用工具: get_weather")
response = await session.call_tool("get_weather", {})
print(f"response-->{response}")
logging.info(f"<-- 收到响应: {response}")
except Exception as e:
logging.error(f"调用工具时发生错误: {e}", exc_info=True)
raise
except Exception as e:
logging.error(f"连接或会话初始化时发生错误: {e}", exc_info=True)
logging.error("请确认服务端脚本已启动并运行在 http://127.0.0.1:8002/mcp")
raise
if __name__ == "__main__":
try:
asyncio.run(main())
except Exception as e:
logging.error(f"客户端运行失败: {e}", exc_info=True)

View File

@@ -0,0 +1,53 @@
from mcp.server.fastmcp import FastMCP
# 创建 MCP 实例,指定服务名称、日志级别、主机和端口
mcp = FastMCP("sdg", log_level="ERROR", host="127.0.0.1", port=8002)
@mcp.tool(
name="query_high_frequency_question",
description="从知识库中检索常见问题解答FAQ,返回包含问题和答案的结构化JSON数据。",
)
async def query_high_frequency_question() -> str:
"""
高频问题查询
"""
try:
print("调用查询高频问题的tool成功")
return "高频问题是: 恐龙是怎么灭绝的?"
except Exception as e:
print(f"Unexpected error in question retrieval: {str(e)}")
raise
@mcp.tool(
name="get_weather",
description="查询天气"
)
async def get_weather() -> str:
"""
查询天气的tools
"""
try:
print("调用查询天气的tools")
return "北京的天气是多云"
except Exception as e:
print(f"Unexpected error in question retrieval: {str(e)}")
raise
def main():
"""
启动 Streamable HTTP 服务器。
"""
print("正在启动MCP Streamable服务器...")
print("服务器将在 http://localhost:8002 上运行")
print("按 Ctrl+C 停止服务器")
try:
mcp.run(transport="streamable-http") # 使用 streamable-http 传输方式
except KeyboardInterrupt:
print("\n服务器已停止")
except Exception as e:
print(f"服务器启动失败: {e}")
if __name__ == "__main__":
main()