45 lines
2.0 KiB
Python
45 lines
2.0 KiB
Python
import mysql.connector
|
||
import json
|
||
from datetime import date, datetime, timedelta
|
||
from decimal import Decimal
|
||
from create_logger import logger
|
||
from utils.format import DateEncoder, default_encoder
|
||
from conf import settings
|
||
|
||
|
||
class SqlService:
|
||
def __init__(self, service_name="数据库查询"):
|
||
# 连接数据库
|
||
self.conn = mysql.connector.connect(
|
||
host=settings.mysql_host,
|
||
port=settings.mysql_port,
|
||
user=settings.mysql_user,
|
||
password=settings.mysql_password,
|
||
database=settings.mysql_database
|
||
)
|
||
self.service_name = service_name
|
||
|
||
# 定义执行SQL查询方法,输入SQL字符串,返回JSON字符串
|
||
def execute_query(self, sql: str, no_data_message="未找数据") -> str:
|
||
try:
|
||
cursor = self.conn.cursor(dictionary=True)
|
||
cursor.execute(sql)
|
||
results = cursor.fetchall()
|
||
cursor.close()
|
||
# 格式化结果
|
||
for result in results: # 遍历每个结果字典
|
||
for key, value in result.items():
|
||
if isinstance(value, (date, datetime, timedelta, Decimal)): # 检查值是否为特殊类型
|
||
result[key] = default_encoder(value) # 使用自定义编码器格式化该值
|
||
# 序列化为JSON,如果有结果返回success,否则no_data;使用DateEncoder,非ASCII不转义
|
||
return json.dumps({"status": "success", "data": results} if results else {"status": "no_data", "message": no_data_message}, cls=DateEncoder, ensure_ascii=False)
|
||
except Exception as e:
|
||
logger.error(f"{self.service_name}错误: {str(e)}")
|
||
# 返回错误JSON响应
|
||
return json.dumps({"status": "error", "message": str(e)}, ensure_ascii=False)
|
||
|
||
|
||
if __name__ == "__main__":
|
||
service = SqlService()
|
||
sql = "SELECT * FROM flight_tickets WHERE departure_city = '上海' AND arrival_city = '北京' AND DATE(departure_time) = '2025-10-28' AND cabin_type = '公务舱'"
|
||
print(service.execute_query(sql)) |