文章目录
- 一、 引言
- 二、 系统设计与技术细节
- 2.1 系统架构
- 2.2 核心组件说明
- 三、 Demo 代码
- 推荐博客:
- 四、输出
- 年度营销报告
- 1. 总销售额 根据提供的数据,年度总销售额为:740.0。
- 2. 各产品销售额
- 3. 各地区销售额
- 4. 各产品在各地区的销售情况
- 分析与建议
- 五、 总结
一、 引言
在当前数字化转型和大数据时代,企业对实时数据分析和自动化决策的需求不断增加分享一个智能体 Demo,该 Demo 通过调用硅基流动 DeepSeek API 实现了自动生成 SQL 查询、执行数据库统计,并最终利用大模型生成详细营销报告的完整流程。
此外,硅基流动提供免费的模型服务,用户只需生成相应的 API Key,即可体验低成本高效能的 AI Infra 平台。
二、 系统设计与技术细节
本智能体 Demo 的目标是:
- 根据自然语言描述生成个性化 SQL 查询;
- 执行 SQL 查询获取数据库统计数据;
- 利用大模型根据查询结果生成详细的年度营销报告。
2.1 系统架构
系统整体由三个主要模块构成:
- 文本解析与 SQL 生成模块:调用硅基流动 DeepSeek API,根据自然语言生成符合业务逻辑的 SQL 查询。
- 数据库交互模块:通过生成的 SQL 语句与 MySQL 数据库交互,获取统计数据。
- 报告生成模块:将查询结果传入大模型,再次生成完整的营销报告。
2.2 核心组件说明
- 硅基流动 DeepSeek API
DeepSeek 是硅基流动公司推出的大模型产品,支持文本生成和任务执行。此 Demo 使用的是 “deepseek-ai/DeepSeek-V3”,通过调用 API 并传入自然语言提示,获得 SQL 语句或报告文本。
官网地址:硅基流动 - LangChain 工具包
利用 LangChain 提供的 SQLDatabaseToolkit、create_sql_agent 等组件,实现数据库交互与自动化任务管理。 - 日志调试与正则表达式清洗
为确保生成的 SQL 语句正确,我们在代码中详细记录了日志,并使用正则表达式剔除模型返回中可能存在的代码块格式标记(例如sql 和
)。
三、 Demo 代码
话不多说,注解很详细了,本代码以思路为主,不涉及开发。
sql建库(test)表:
SET NAMES utf8mb4;
SET FOREIGN_KEY_CHECKS = 0;
-- ----------------------------
-- Table structure for products
-- ----------------------------
DROP TABLE IF EXISTS `products`;
CREATE TABLE `products` (
`product_id` int NOT NULL,
`product_name` text CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NULL,
PRIMARY KEY (`product_id`) USING BTREE
) ENGINE = InnoDB CHARACTER SET = utf8mb3 COLLATE = utf8mb3_bin ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of products
-- ----------------------------
INSERT INTO `products` VALUES (1, 'Product A');
INSERT INTO `products` VALUES (2, 'Product B');
INSERT INTO `products` VALUES (3, 'Product C');
-- ----------------------------
-- Table structure for sales_records
-- ----------------------------
DROP TABLE IF EXISTS `sales_records`;
CREATE TABLE `sales_records` (
`sale_id` int NOT NULL,
`product_id` int NULL DEFAULT NULL,
`sale_date` date NULL DEFAULT NULL,
`customer_region` text CHARACTER SET utf8mb3 COLLATE utf8mb3_bin NULL,
`sales_amount` double NULL DEFAULT NULL,
PRIMARY KEY (`sale_id`) USING BTREE,
INDEX `product_id`(`product_id`) USING BTREE,
CONSTRAINT `sales_records_ibfk_1` FOREIGN KEY (`product_id`) REFERENCES `products` (`product_id`) ON DELETE RESTRICT ON UPDATE RESTRICT
) ENGINE = InnoDB CHARACTER SET = utf8mb3 COLLATE = utf8mb3_bin ROW_FORMAT = Dynamic;
-- ----------------------------
-- Records of sales_records
-- ----------------------------
INSERT INTO `sales_records` VALUES (1, 1, '2024-01-15', 'North', 100);
INSERT INTO `sales_records` VALUES (2, 1, '2024-02-20', 'South', 150);
INSERT INTO `sales_records` VALUES (3, 2, '2024-03-25', 'East', 200);
INSERT INTO `sales_records` VALUES (4, 2, '2024-04-10', 'West', 120);
INSERT INTO `sales_records` VALUES (5, 3, '2024-05-05', 'North', 90);
INSERT INTO `sales_records` VALUES (6, 3, '2024-06-18', 'South', 180);
SET FOREIGN_KEY_CHECKS = 1;
import logging
import requests
import re
import time
# 更新后的导入路径
from langchain_community.agent_toolkits.sql.base import create_sql_agent
from langchain_community.agent_toolkits.sql.toolkit import SQLDatabaseToolkit
from langchain_community.utilities import SQLDatabase
from langchain.llms import BaseLLM
from langchain.callbacks.manager import CallbackManagerForLLMRun
from langchain.schema import LLMResult, Generation
# 配置日志输出(DEBUG 级别)
logging.basicConfig(level=logging.DEBUG, format='%(asctime)s - %(levelname)s - %(message)s')
def clean_sql(raw_text: str) -> str:
"""
清洗返回的 SQL 文本,去除代码块标记(例如 ```sql 开头和 ```结尾)。
"""
# 去除开头的代码块标记(例如 ```sql 或 ```)
cleaned = re.sub(r"^```(?:sql)?\s*", "", raw_text, flags=re.IGNORECASE)
# 去除结尾的代码块标记
cleaned = re.sub(r"\s*```$", "", cleaned, flags=re.IGNORECASE)
return cleaned.strip()
class SiliconeFlowDeepSeek(BaseLLM):
api_url: str
api_key: str
def _call(self, prompt: str, run_manager: CallbackManagerForLLMRun | None = None) -> str:
logging.debug("调用硅基流动 DeepSeek API,输入提示:%s", prompt)
headers = {
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json"
}
# 构造请求 payload,按正确格式传递参数
data = {
"model": "deepseek-ai/DeepSeek-V3",
"messages": [
{"role": "user", "content": prompt}
],
"stream": False,
"max_tokens": 512,
"stop": ["null"],
"temperature": 0.7,
"top_p": 0.7,
"top_k": 50,
"frequency_penalty": 0.5,
"n": 1,
"response_format": {"type": "text"},
"tools": [
{
"type": "function",
"function": {
"description": "<string>",
"name": "<string>",
"parameters": {},
"strict": False
}
}
]
}
try:
response = requests.post(self.api_url, headers=headers, json=data)
if response.status_code == 200:
result = response.json()
logging.debug("DeepSeek API 返回原始结果:%s", result)
# 提取返回的文本内容
return result["choices"][0]["message"]["content"]
else:
logging.error("DeepSeek API 请求失败,状态码:%s,响应内容:%s",
response.status_code, response.text)
return "请求失败"
except Exception as e:
logging.exception("调用 DeepSeek API 时发生异常:")
return "请求失败"
def _generate(self, prompts, stop=None, run_manager: CallbackManagerForLLMRun | None = None) -> LLMResult:
all_generations = []
for prompt in prompts:
text = self._call(prompt, run_manager)
all_generations.append([Generation(text=text)])
return LLMResult(generations=all_generations)
@property
def _llm_type(self) -> str:
return "silicone_flow_deepseek"
if __name__ == '__main__':
# 1. 初始化自定义大模型
api_url = "https://api.siliconflow.cn/v1/chat/completions"
api_key = "使用你的秘钥" # 使用你的秘钥
logging.debug("初始化硅基流动 DeepSeek 模型实例。")
llm = SiliconeFlowDeepSeek(api_url=api_url, api_key=api_key)
# 2. 连接 MySQL 数据库
db_uri = "mysql+mysqlconnector://账号:密码@localhost:3306/test"#账号密码
logging.debug("尝试连接 MySQL 数据库,URI:%s", db_uri)
try:
db = SQLDatabase.from_uri(db_uri)
logging.debug("数据库连接成功,且已自动完成表结构自检。")
except Exception as e:
logging.exception("连接数据库失败:")
raise e
# 3. 初始化 SQL 工具包
logging.debug("初始化 SQLDatabaseToolkit,整合数据库与大模型。")
toolkit = SQLDatabaseToolkit(db=db, llm=llm)
# 4. 创建 SQL Agent 执行器
logging.debug("创建 SQL Agent 执行器,使用 zero-shot-react-description 模型。")
agent_executor = create_sql_agent(
llm=llm,
toolkit=toolkit,
verbose=True,
agent_type="zero-shot-react-description"
)
# 5. 生成并校验 SQL 语句(最多重试 3 次)
nl_query = "统计每个产品在过去一年的总销售金额和按客户地区统计的销售金额分布"
logging.debug("自然语言查询:%s", nl_query)
# 附加数据库表结构信息
schema_info = (
"Table: products (product_id int primary key, product_name text)\n"
"Table: sales_records (sale_id int primary key, product_id int, sale_date date, customer_region text, sales_amount double)"
)
max_attempts = 3
attempts = 0
valid_sql = None
raw_sql_text = ""
# 用于正则提取 SQL 语句,匹配以 SELECT 或 WITH 开头的部分(不区分大小写)
sql_pattern = re.compile(r"(?i)^(SELECT\s+.*?;?)\s*$", re.DOTALL)
while attempts < max_attempts:
attempts += 1
logging.debug("第 %d 次生成 SQL 语句", attempts)
sql_prompt = (
f"请仅生成针对下面查询的 MySQL SQL 语句,不要输出其他内容。\n"
f"查询:{nl_query}\n"
f"数据库表结构:\n{schema_info}"
)
logging.debug("生成 SQL 语句提示:%s", sql_prompt)
try:
sql_llm_result = llm.invoke(sql_prompt)
if isinstance(sql_llm_result, str):
raw_sql_text = sql_llm_result.strip()
else:
raw_sql_text = sql_llm_result.generations[0][0].text.strip()
logging.debug("模型原始返回的 SQL 内容:%s", raw_sql_text)
except Exception as e:
logging.exception("生成 SQL 语句时发生异常:")
raise e
# 清洗返回的 SQL 文本,去除代码块标记
raw_sql_text = clean_sql(raw_sql_text)
logging.debug("清洗后的 SQL 内容:%s", raw_sql_text)
# 尝试用正则表达式提取合理的 SQL 语句
match = sql_pattern.search(raw_sql_text)
if match:
sql_query = match.group(1).strip()
logging.debug("正则表达式提取后的 SQL 语句:%s", sql_query)
else:
sql_query = raw_sql_text
logging.debug("未匹配到合理 SQL,直接使用清洗后的返回:%s", sql_query)
# 校验 SQL 语句
checker_prompt = (
f"请检查下面的 SQL 语句是否正确:\n{sql_query}\n"
"只返回“正确”或者具体的错误信息。"
)
logging.debug("SQL 查询校验提示:%s", checker_prompt)
try:
checker_llm_result = llm.invoke(checker_prompt)
if isinstance(checker_llm_result, str):
checker_output = checker_llm_result.strip()
else:
checker_output = checker_llm_result.generations[0][0].text.strip()
logging.debug("模型原始返回的校验信息:%s", checker_output)
except Exception as e:
logging.exception("校验 SQL 语句时发生异常:")
raise e
if checker_output == "正确":
valid_sql = sql_query
logging.debug("SQL 查询校验通过。")
break
else:
logging.error("SQL 查询校验失败:%s", checker_output)
time.sleep(1)
if valid_sql is None:
logging.error("经过多次重试后,未能生成有效 SQL。原始返回为:%s", raw_sql_text)
raise Exception("经过多次重试后,仍未生成有效的 SQL 语句。")
# 6. 执行 SQL 查询
try:
logging.debug("执行 SQL 查询:%s", valid_sql)
query_result = db.run(valid_sql)
logging.debug("SQL 查询执行结果:%s", query_result)
except Exception as e:
logging.exception("执行 SQL 查询时发生异常:")
raise e
# 7. 利用大模型生成年度营销报告
report_prompt = f"根据以下销售数据生成一份详细的年度营销报告:\n{query_result}"
logging.debug("生成报表提示:%s", report_prompt)
try:
report_llm_result = llm.invoke(report_prompt)
if isinstance(report_llm_result, str):
report_content = report_llm_result.strip()
else:
report_content = report_llm_result.generations[0][0].text.strip()
logging.debug("生成的年度营销报告内容:%s", report_content)
except Exception as e:
logging.exception("生成报表时发生异常:")
raise e
# 8. 输出生成的 SQL 语句、原始模型返回和年度营销报告
print("==== 生成的 SQL 查询 ====")
print(valid_sql)
print("\n==== 模型返回的原始 SQL 内容 ====")
print(raw_sql_text)
print("\n==== 年度营销报告 ====")
print(report_content)
logging.debug("整个流程执行完毕。")
此外也可以多加提示词等等,阅读精选文章,会更有启发!
推荐博客:
主页:小胡说技书
精选文章:小胡说技书博客分类(部分目录):服务治理、数据治理与安全治理对比表格
四、输出
有调试信息,这里太多了,去掉了。
==== 生成的 SQL 查询 ==== SELECT
p.product_id,
p.product_name,
SUM(sr.sales_amount) AS total_sales_amount,
sr.customer_region,
SUM(sr.sales_amount) OVER (PARTITION BY sr.customer_region) AS region_sales_amount FROM
products p JOIN
sales_records sr ON p.product_id = sr.product_id WHERE
sr.sale_date >= DATE_SUB(CURDATE(), INTERVAL 1 YEAR) GROUP BY
p.product_id, sr.customer_region;==== 模型返回的原始 SQL 内容 ==== SELECT
p.product_id,
p.product_name,
SUM(sr.sales_amount) AS total_sales_amount,
sr.customer_region,
SUM(sr.sales_amount) OVER (PARTITION BY sr.customer_region) AS region_sales_amount FROM
products p JOIN
sales_records sr ON p.product_id = sr.product_id WHERE
sr.sale_date >= DATE_SUB(CURDATE(), INTERVAL 1 YEAR) GROUP BY
p.product_id, sr.customer_region;==== 年度营销报告 ==== 为了生成一份详细的年度营销报告,我们需要对销售数据进行分析,包括总销售额、各产品销售额、各地区销售额等。以下是基于提供的数据生成的年度营销报告:
年度营销报告
1. 总销售额 根据提供的数据,年度总销售额为:740.0。
2. 各产品销售额
- Product A: 150.0
- Product B: 320.0 (200.0 + 120.0)
- Product C: 270.0 (90.0 + 180.0)
3. 各地区销售额
- East: 200.0
- North: 90.0
- South: 330.0 (150.0 + 180.0)
- West: 120.0
4. 各产品在各地区的销售情况
- Product A:
- South: 150.0
- Product B:
- East: 200.0
- West: 120.0
- Product C:
- North: 90.0
- South: 180.0
分析与建议
- 产品表现:
- Product B的销售额最高,达到320.0,显示出较强的市场竞争力。
- Product C和Product A的销售额分别为270.0和150.0,需要进一步分析其市场定位和推广策略。
- 地区表现:
- South地区的销售额最高,达到330.0,表明该地区市场需求旺盛。
- East和West地区的销售额分别为200.0和120.0,需要加强市场推广活动。
- North地区的销售额最低,仅为90.0,建议进行市场调研以了解原因并制定相应的营销策略。
3.建议:
- 产品推广:
- Product B的成功经验可以复制到其他产品上。
- Product A和Product C需要加强品牌宣传和市场推广。
- 地区策略:
- South地区继续保持并优化销售策略。
- East和West地区增加市场推广力度。
- North地区进行深入的市场调研,制定针对性的营销计划。
这份报告基于提供的数据进行了详细的分析和建议,希望能为您的年度营销策略提供有价值的参考。
五、 总结
本文以一个智能体 Demo 为例,展示了如何利用硅基流动 DeepSeek API 生成 SQL 查询、执行数据库统计,并进一步自动生成详细的营销报告。硅基流动提供的免费模型服务为企业和开发者降低了 AI 应用门槛,使得智能体系统在各行业落地成为可能。
希望这篇博客文章能为您在智能体及大模型应用方面提供新的思路,并激发更多开发者探索 AI 技术的无限可能!
封面图: