"""
CardioAI 模块1: 交互式仪表盘
心血管疾病数据可视化系统 - 美化版
"""
import os
from pathlib import Path

import numpy as np
import pandas as pd
import plotly.express as px
import plotly.graph_objects as go
import streamlit as st
# ==================== Page configuration ====================
# Configure the Streamlit page: browser-tab title and icon, wide layout, and a
# sidebar that starts expanded.
st.set_page_config(
    page_title="CardioAI 心血管疾病分析",
    page_icon="❤️",
    layout="wide",
    initial_sidebar_state="expanded"
)
# ==================== Custom CSS styles ====================
# NOTE(review): the markdown payload below is only a newline, so no CSS is
# injected as written — presumably a <style> block belongs here; confirm.
st.markdown("""
""", unsafe_allow_html=True)
# ==================== Constants ====================
# Project root. The default is the original absolute Windows checkout path; set
# the CARDIOAI_ROOT environment variable to point the dashboard at a different
# location (an unset or empty variable falls back to the default).
CODE_ROOT = Path(os.environ.get("CARDIOAI_ROOT") or r"F:\My_Git_Project\CardioAI")
# Excel workbook read by load_data().
DATA_PATH = CODE_ROOT / "data" / "心血管疾病.xlsx"
# Colour palette; the 'bmi' entry is the slice colour list of the BMI pie chart.
COLORS = {
    'primary': ['#667eea', '#764ba2', '#f093fb', '#f5576c'],
    'safe': '#2ecc71',
    'risk': '#e74c3c',
    'gradient': ['#667eea', '#764ba2'],
    'bmi': ['#3498db', '#2ecc71', '#f39c12', '#e74c3c']
}
# ==================== 数据加载函数 ====================
@st.cache_data(show_spinner=False)
def load_data(file_path: Path) -> pd.DataFrame:
    """Read the Excel workbook at ``file_path`` into a DataFrame.

    Any exception raised while reading is reported through ``st.error`` and an
    empty DataFrame is returned instead, so callers can test ``.empty``.
    """
    # NOTE(review): because this function is wrapped in st.cache_data, the empty
    # fallback frame is presumably cached like any other return value — confirm
    # whether a failed load should be retried on the next rerun.
    try:
        frame = pd.read_excel(file_path, engine='openpyxl')
    except Exception as exc:
        st.error(f"数据加载失败: {exc}")
        frame = pd.DataFrame()
    return frame
@st.cache_data(show_spinner=False)
def clean_and_engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Derive features, drop implausible blood-pressure rows, add category labels.

    Reads the columns ``age``, ``weight``, ``height``, ``ap_hi``, ``ap_lo``,
    ``cholesterol`` and ``gluc``. The input frame is not modified.

    Returns:
        A new DataFrame holding only the rows where ``ap_lo < ap_hi``,
        ``90 <= ap_hi <= 250`` and ``60 <= ap_lo <= 150``, with the added
        columns ``age_years``, ``bmi``, ``cholesterol_cat``, ``gluc_cat`` and
        ``bmi_category``.
    """
    df = df.copy()

    # 1. Feature engineering (computed on every row, before filtering).
    # Assumes ``age`` is stored in days and ``height`` in centimetres — TODO confirm.
    df['age_years'] = (df['age'] / 365).round().astype(int)
    df['bmi'] = df['weight'] / ((df['height'] / 100) ** 2)

    # 2. Outlier handling: one combined mask instead of three chained filters.
    plausible_pressure = (
        (df['ap_lo'] < df['ap_hi'])
        & df['ap_hi'].between(90, 250)
        & df['ap_lo'].between(60, 150)
    )
    # Take an explicit copy so the column assignments below write to an
    # independent frame rather than to a filtered selection (which is what
    # triggers pandas' SettingWithCopyWarning).
    df = df.loc[plausible_pressure].copy()

    # 3. Category conversion: both columns use the same 1/2/3 -> label mapping;
    # codes outside the mapping become NaN.
    level_labels = {1: '正常', 2: '偏高', 3: '非常高'}
    df['cholesterol_cat'] = df['cholesterol'].map(level_labels)
    df['gluc_cat'] = df['gluc'].map(level_labels)

    # 4. BMI classification, vectorised. np.select takes the first matching
    # condition, so the thresholds behave like an if/elif chain; values matching
    # none of them (>= 30, and NaN, whose comparisons are all False) get the
    # default label.
    bmi = df['bmi']
    df['bmi_category'] = np.select(
        [bmi < 18.5, bmi < 25, bmi < 30],
        ['体重过低', '体重正常', '超重'],
        default='肥胖',
    )
    return df
# ==================== UI组件 ====================
def render_header():
    """Emit the page-header markdown blocks, in order, with HTML allowed."""
    # NOTE(review): these fragments carry only text and newlines; they look like
    # they once wrapped HTML markup — confirm against the deployed version.
    header_fragments = (
        '\n❤️ CardioAI\n',
        '心血管疾病智能分析系统 | 数据驱动的健康洞察\n',
        "\n",
    )
    for fragment in header_fragments:
        st.markdown(fragment, unsafe_allow_html=True)
def create_sidebar(df: pd.DataFrame) -> dict:
    """Render the sidebar filter widgets and summary metrics.

    Args:
        df: Cleaned data; the ``age_years`` and ``bmi`` columns are read here.

    Returns:
        A dict with the current widget values:
        ``'age_range'`` (the slider's ``(low, high)`` pair), ``'gender'`` (list
        of selected codes from ``[1, 2]``) and ``'cardio'`` (list of selected
        codes from ``[0, 1]``).
    """
    # Compute the slider bounds once instead of re-scanning the column per argument.
    age_min = int(df['age_years'].min())
    age_max = int(df['age_years'].max())

    with st.sidebar:
        st.markdown("### 🎛️ 数据筛选器")
        st.markdown("---")

        # Each widget gets a real label that is hidden with
        # label_visibility="collapsed": the visible heading is the bold markdown
        # line above it, and Streamlit discourages empty-string labels.

        # Age range
        with st.container():
            st.markdown("**📅 年龄范围**")
            age_range = st.slider(
                "年龄范围",
                min_value=age_min,
                max_value=age_max,
                value=(age_min, age_max),
                key="age_slider",
                label_visibility="collapsed",
            )
        st.markdown("\n", unsafe_allow_html=True)

        # Gender selection (code 1 is shown as female, anything else as male)
        with st.container():
            st.markdown("**👤 性别**")
            gender_options = st.multiselect(
                "性别",
                options=[1, 2],
                default=[1, 2],
                format_func=lambda x: "👩 女性" if x == 1 else "👨 男性",
                key="gender_select",
                label_visibility="collapsed",
            )
        st.markdown("\n", unsafe_allow_html=True)

        # Cardiovascular status (code 0 is shown as healthy, anything else as at risk)
        with st.container():
            st.markdown("**🏥 心血管健康状态**")
            cardio_options = st.multiselect(
                "心血管健康状态",
                options=[0, 1],
                default=[0, 1],
                format_func=lambda x: "✅ 健康" if x == 0 else "⚠️ 有风险",
                key="cardio_select",
                label_visibility="collapsed",
            )
        st.markdown("---")

        # Summary of the (unfiltered) frame that was passed in
        st.markdown("### 📊 数据概览")
        st.metric("总记录数", f"{len(df):,}")
        st.metric("平均BMI", f"{df['bmi'].mean():.1f}")
        st.metric("平均年龄", f"{df['age_years'].mean():.1f} 岁")

    return {
        'age_range': age_range,
        'gender': gender_options,
        'cardio': cardio_options
    }
def apply_filters(df: pd.DataFrame, filters: dict) -> pd.DataFrame:
    """Return the rows of ``df`` that satisfy every sidebar filter.

    A row is kept when its ``age_years`` lies within
    ``filters['age_range'][0]`` .. ``filters['age_range'][1]`` (both inclusive),
    its ``gender`` is in ``filters['gender']`` and its ``cardio`` is in
    ``filters['cardio']``.
    """
    age_low = filters['age_range'][0]
    age_high = filters['age_range'][1]

    age_ok = (df['age_years'] >= age_low) & (df['age_years'] <= age_high)
    gender_ok = df['gender'].isin(filters['gender'])
    cardio_ok = df['cardio'].isin(filters['cardio'])

    return df[age_ok & gender_ok & cardio_ok]
def render_metrics(filtered_df: pd.DataFrame, total_count: int):
    """Render the four key-metric cards for the current selection.

    Args:
        filtered_df: Rows remaining after filtering; the ``cardio``,
            ``age_years`` and ``bmi`` columns are read.
        total_count: Row count the selection is compared against for the
            share shown on the first card.
    """
    def card(title: str, value: str, caption: str) -> None:
        # Every card is one markdown block: title, value and caption on
        # consecutive lines.
        st.markdown(f"\n{title}\n{value}\n{caption}\n", unsafe_allow_html=True)

    row_count = len(filtered_df)
    has_rows = row_count > 0

    # Guard both denominators: an empty selection and an empty reference set.
    share = (row_count / total_count * 100) if total_count else 0.0
    risk_rate = (filtered_df['cardio'].sum() / row_count * 100) if has_rows else 0
    avg_age = filtered_df['age_years'].mean() if has_rows else 0
    avg_bmi = filtered_df['bmi'].mean() if has_rows else 0
    # With no rows the 0.0 placeholder must not be classified as underweight.
    bmi_status = get_bmi_status(avg_bmi) if has_rows else "暂无数据"

    st.markdown('📊 关键指标\n', unsafe_allow_html=True)
    col1, col2, col3, col4 = st.columns(4)
    with col1:
        card("📋 筛选记录数", f"{row_count:,}", f"占比 {share:.1f}%")
    with col2:
        card("⚠️ 风险率", f"{risk_rate:.1f}%", "心血管疾病患者占比")
    with col3:
        # The average age was previously computed but its card rendered nothing.
        card("🎂 平均年龄", f"{avg_age:.1f}", "岁")
    with col4:
        card("⚖️ 平均BMI", f"{avg_bmi:.1f}", bmi_status)
def get_bmi_status(bmi: float) -> str:
    """Map a BMI value to its category label.

    The label of the first upper bound that ``bmi`` is strictly below is
    returned (18.5, 25, 30); anything not below 30 gets the last label.
    """
    upper_bounds = (
        (18.5, "体重过低"),
        (25, "体重正常"),
        (30, "超重"),
    )
    for limit, label in upper_bounds:
        if bmi < limit:
            return label
    return "肥胖"
def plot_age_distribution(df: pd.DataFrame):
    """Build an overlaid histogram of ``age_years`` coloured by ``cardio``.

    Returns the Plotly figure; traces are renamed from their raw group name to
    the display labels used elsewhere in the dashboard.
    """
    status_colors = {0: '#2ecc71', 1: '#e74c3c'}
    fig = px.histogram(
        df,
        x='age_years',
        color='cardio',
        nbins=30,
        title="年龄分布趋势",
        labels={'age_years': '年龄', 'count': '人数'},
        color_discrete_map=status_colors,
        barmode='overlay',
        opacity=0.8,
    )

    # Horizontal legend placed just above the plot area, right-aligned.
    horizontal_legend = dict(orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1)
    fig.update_layout(
        template='plotly_white',
        title_font_size=18,
        title_x=0.5,
        legend_title_text="",
        legend=horizontal_legend,
        plot_bgcolor='rgba(0,0,0,0)',
        paper_bgcolor='rgba(0,0,0,0)',
        font=dict(family="Arial, sans-serif", size=12),
        margin=dict(t=60, b=40, l=40, r=40),
    )

    def relabel(trace):
        # A trace named "0" is the healthy group; every other trace is at risk.
        display_name = "✅ 健康" if trace.name == "0" else "⚠️ 有风险"
        trace.update(
            name=display_name,
            marker_line_width=1,
            marker_line_color='white',
        )

    fig.for_each_trace(relabel)
    return fig
def plot_bmi_distribution(df: pd.DataFrame):
    """Build a donut chart of the ``bmi_category`` counts.

    The four categories are always presented in the same fixed order so they
    line up with the colours in ``COLORS['bmi']``.
    """
    bmi_order = ['体重过低', '体重正常', '超重', '肥胖']
    # fill_value=0 keeps a category that is absent from the data as a zero
    # count instead of NaN.
    bmi_counts = df['bmi_category'].value_counts().reindex(bmi_order, fill_value=0)
    fig = go.Figure(data=[go.Pie(
        labels=bmi_counts.index,
        values=bmi_counts.values,
        hole=0.5,
        marker=dict(
            colors=COLORS['bmi'],
            line=dict(color='white', width=2)
        ),
        textinfo='label+percent',
        textposition='outside',
        textfont=dict(size=12)
    )])
    fig.update_layout(
        title=dict(text="BMI分布", font=dict(size=18), x=0.5),
        template='plotly_white',
        showlegend=False,
        plot_bgcolor='rgba(0,0,0,0)',
        paper_bgcolor='rgba(0,0,0,0)',
        margin=dict(t=60, b=40, l=40, r=40),
        # Text drawn in the centre of the donut hole.
        annotations=[dict(text='BMI', x=0.5, y=0.5, font_size=20, showarrow=False)]
    )
    return fig
def plot_bmi_cardio_relation(df: pd.DataFrame):
    """Build a stacked bar chart of row counts per BMI category and cardio status.

    Args:
        df: Data with ``bmi_category`` and ``cardio`` columns.

    Returns:
        A Plotly figure with one bar series for ``cardio == 0`` and one for
        ``cardio == 1``.
    """
    bmi_order = ['体重过低', '体重正常', '超重', '肥胖']
    bmi_cardio = df.groupby(['bmi_category', 'cardio']).size().unstack(fill_value=0)
    # Force both status columns and all four category rows to exist. Without
    # this, data containing only one cardio class (e.g. after filtering on
    # status) has no column for the other class and the lookups below raise
    # KeyError; absent categories would also show up as NaN.
    bmi_cardio = bmi_cardio.reindex(index=bmi_order, columns=[0, 1], fill_value=0)

    fig = go.Figure()
    fig.add_trace(go.Bar(
        name='✅ 健康',
        x=bmi_cardio.index,
        y=bmi_cardio[0],
        marker_color='#2ecc71',
        marker_line=dict(color='white', width=1)
    ))
    fig.add_trace(go.Bar(
        name='⚠️ 有风险',
        x=bmi_cardio.index,
        y=bmi_cardio[1],
        marker_color='#e74c3c',
        marker_line=dict(color='white', width=1)
    ))
    fig.update_layout(
        title=dict(text="BMI与心血管疾病关联分析", font=dict(size=18), x=0.5),
        xaxis_title="BMI类别",
        yaxis_title="人数",
        barmode='stack',
        template='plotly_white',
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1
        ),
        plot_bgcolor='rgba(0,0,0,0)',
        paper_bgcolor='rgba(0,0,0,0)',
        margin=dict(t=60, b=40, l=40, r=40)
    )
    return fig
def plot_blood_pressure_scatter(df: pd.DataFrame):
    """Build a scatter plot of ``ap_lo`` (x) against ``ap_hi`` (y) by cardio status.

    At most 2000 rows are plotted. The sample uses a fixed seed so the same
    input always yields the same chart instead of reshuffling on every rerun.
    """
    # Sample to avoid drawing too many points.
    sample_df = df.sample(min(2000, len(df)), random_state=42)
    # Plotly Express treats a numeric colour column as continuous (one trace
    # with a colour bar), which bypasses color_discrete_map and leaves no
    # per-status traces to rename. Casting to str makes the colour discrete.
    # NOTE(review): assumes ``cardio`` holds integer 0/1 codes so the string
    # form is exactly "0"/"1" — confirm the column dtype.
    plot_df = sample_df.assign(cardio=sample_df['cardio'].astype(str))
    fig = px.scatter(
        plot_df,
        x='ap_lo',
        y='ap_hi',
        color='cardio',
        color_discrete_map={'0': '#2ecc71', '1': '#e74c3c'},
        category_orders={'cardio': ['0', '1']},
        opacity=0.6,
        title="血压分布散点图",
        labels={'ap_lo': '舒张压 (mmHg)', 'ap_hi': '收缩压 (mmHg)'}
    )
    fig.update_layout(
        template='plotly_white',
        title_font_size=18,
        title_x=0.5,
        legend_title_text="",
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1
        ),
        plot_bgcolor='rgba(0,0,0,0)',
        paper_bgcolor='rgba(0,0,0,0)',
        margin=dict(t=60, b=40, l=40, r=40)
    )
    # Replace the raw "0"/"1" trace names with the display labels.
    fig.for_each_trace(lambda t: t.update(
        name="✅ 健康" if t.name == "0" else "⚠️ 有风险"
    ))
    return fig
def plot_cholesterol_analysis(df: pd.DataFrame):
    """Build a grouped bar chart of row counts per cholesterol level and cardio status.

    Args:
        df: Data with ``cholesterol_cat`` and ``cardio`` columns.

    Returns:
        A Plotly figure with one bar series for ``cardio == 0`` and one for
        ``cardio == 1``.
    """
    chol_order = ['正常', '偏高', '非常高']
    chol_cardio = df.groupby(['cholesterol_cat', 'cardio']).size().unstack(fill_value=0)
    # Reindex both axes with fill_value=0 so an absent status column or an
    # absent cholesterol level becomes a zero count (not NaN, and without a
    # hand-written fallback list).
    chol_cardio = chol_cardio.reindex(index=chol_order, columns=[0, 1], fill_value=0)

    fig = go.Figure()
    fig.add_trace(go.Bar(
        name='✅ 健康',
        x=chol_cardio.index,
        y=chol_cardio[0],
        marker_color='#2ecc71'
    ))
    fig.add_trace(go.Bar(
        name='⚠️ 有风险',
        x=chol_cardio.index,
        y=chol_cardio[1],
        marker_color='#e74c3c'
    ))
    fig.update_layout(
        title=dict(text="胆固醇水平与心血管疾病", font=dict(size=18), x=0.5),
        xaxis_title="胆固醇水平",
        yaxis_title="人数",
        barmode='group',
        template='plotly_white',
        legend=dict(
            orientation="h",
            yanchor="bottom",
            y=1.02,
            xanchor="right",
            x=1
        ),
        plot_bgcolor='rgba(0,0,0,0)',
        paper_bgcolor='rgba(0,0,0,0)',
        margin=dict(t=60, b=40, l=40, r=40)
    )
    return fig
def render_data_table(df: pd.DataFrame):
    """Render the detail table: a section heading plus a 400px-high dataframe.

    Twelve columns of ``df`` are selected, the ``gender`` and ``cardio`` codes
    are replaced by display labels, and the columns are given display headers.
    """
    st.markdown('📋 数据明细\n', unsafe_allow_html=True)

    # Source column -> displayed header, in display order.
    column_titles = {
        'id': 'ID',
        'age_years': '年龄',
        'gender': '性别',
        'height': '身高(cm)',
        'weight': '体重(kg)',
        'bmi': 'BMI',
        'bmi_category': 'BMI分类',
        'ap_hi': '收缩压',
        'ap_lo': '舒张压',
        'cholesterol_cat': '胆固醇',
        'gluc_cat': '血糖',
        'cardio': '心血管状态',
    }

    table = df[list(column_titles)].copy()
    table['gender'] = table['gender'].map({1: '女性', 2: '男性'})
    table['cardio'] = table['cardio'].map({0: '✅ 健康', 1: '⚠️ 有风险'})
    table = table.rename(columns=column_titles)

    column_settings = {
        "ID": st.column_config.NumberColumn(width="small"),
        "年龄": st.column_config.NumberColumn(width="small"),
        "心血管状态": st.column_config.TextColumn(width="medium"),
    }
    st.dataframe(
        table,
        use_container_width=True,
        height=400,
        column_config=column_settings,
    )
def main():
    """Run the dashboard: load, clean, filter, then render metrics, charts and table.

    Stops after an error message when the loaded frame is empty. When the
    sidebar filters match no rows, the charts and table are replaced by a
    warning, because the chart builders are not all safe on empty input.
    """
    render_header()

    # Load the data
    with st.spinner("正在加载数据..."):
        raw_df = load_data(DATA_PATH)
    if raw_df.empty:
        st.error("❌ 数据加载失败,请检查数据文件路径!")
        return

    # Clean the data and derive features
    with st.spinner("正在进行数据清洗..."):
        df = clean_and_engineer_features(raw_df)
    total_count = len(df)

    # Sidebar filters, then apply them
    filters = create_sidebar(df)
    filtered_df = apply_filters(df, filters)

    # Metric cards (these handle an empty selection themselves)
    render_metrics(filtered_df, total_count)
    st.markdown("\n", unsafe_allow_html=True)

    if filtered_df.empty:
        # E.g. one of the multiselects was cleared: nothing to chart or list.
        st.warning("⚠️ 当前筛选条件下没有数据,请调整筛选条件。")
    else:
        # Chart area
        st.markdown('📈 可视化分析\n', unsafe_allow_html=True)

        # First chart row
        col1, col2 = st.columns(2)
        with col1:
            fig1 = plot_age_distribution(filtered_df)
            st.plotly_chart(fig1, use_container_width=True, key="age_chart")
        with col2:
            fig2 = plot_bmi_distribution(filtered_df)
            st.plotly_chart(fig2, use_container_width=True, key="bmi_pie")

        # Second chart row
        col3, col4 = st.columns(2)
        with col3:
            fig3 = plot_bmi_cardio_relation(filtered_df)
            st.plotly_chart(fig3, use_container_width=True, key="bmi_cardio")
        with col4:
            fig4 = plot_cholesterol_analysis(filtered_df)
            st.plotly_chart(fig4, use_container_width=True, key="chol_chart")

        # Third chart row: only the first of two equal-width columns is used
        col5, _ = st.columns([1, 1])
        with col5:
            fig5 = plot_blood_pressure_scatter(filtered_df)
            st.plotly_chart(fig5, use_container_width=True, key="bp_scatter")
        st.markdown("\n", unsafe_allow_html=True)

        # Data table
        render_data_table(filtered_df)

    # Footer
    st.markdown("\n", unsafe_allow_html=True)
    st.markdown(
        "❤️ CardioAI © 2024 | 心血管疾病智能分析系统\n",
        unsafe_allow_html=True
    )


if __name__ == "__main__":
    main()