""" CardioAI 模块1: 交互式仪表盘 心血管疾病数据可视化系统 - 美化版 """ import streamlit as st import pandas as pd import numpy as np import plotly.express as px import plotly.graph_objects as go from pathlib import Path # ==================== 页面配置 ==================== st.set_page_config( page_title="CardioAI 心血管疾病分析", page_icon="❤️", layout="wide", initial_sidebar_state="expanded" ) # ==================== 自定义CSS样式 ==================== st.markdown(""" """, unsafe_allow_html=True) # ==================== 常量定义 ==================== CODE_ROOT = Path(r"F:\My_Git_Project\CardioAI") DATA_PATH = CODE_ROOT / "data" / "心血管疾病.xlsx" # 配色方案 COLORS = { 'primary': ['#667eea', '#764ba2', '#f093fb', '#f5576c'], 'safe': '#2ecc71', 'risk': '#e74c3c', 'gradient': ['#667eea', '#764ba2'], 'bmi': ['#3498db', '#2ecc71', '#f39c12', '#e74c3c'] } # ==================== 数据加载函数 ==================== @st.cache_data(show_spinner=False) def load_data(file_path: Path) -> pd.DataFrame: """加载数据,支持Excel格式""" try: df = pd.read_excel(file_path, engine='openpyxl') return df except Exception as e: st.error(f"数据加载失败: {e}") return pd.DataFrame() @st.cache_data(show_spinner=False) def clean_and_engineer_features(df: pd.DataFrame) -> pd.DataFrame: """数据清洗和特征工程""" df = df.copy() # 1. 特征工程 df['age_years'] = (df['age'] / 365).round().astype(int) df['bmi'] = df['weight'] / ((df['height'] / 100) ** 2) # 2. 异常值处理 df = df[df['ap_lo'] < df['ap_hi']] df = df[(df['ap_hi'] >= 90) & (df['ap_hi'] <= 250)] df = df[(df['ap_lo'] >= 60) & (df['ap_lo'] <= 150)] # 3. 类别转换 cholesterol_map = {1: '正常', 2: '偏高', 3: '非常高'} gluc_map = {1: '正常', 2: '偏高', 3: '非常高'} df['cholesterol_cat'] = df['cholesterol'].map(cholesterol_map) df['gluc_cat'] = df['gluc'].map(gluc_map) # 4. BMI分类 def categorize_bmi(bmi): if bmi < 18.5: return '体重过低' elif bmi < 25: return '体重正常' elif bmi < 30: return '超重' else: return '肥胖' df['bmi_category'] = df['bmi'].apply(categorize_bmi) return df # ==================== UI组件 ==================== def render_header(): """渲染页面头部""" st.markdown('

❤️ CardioAI

', unsafe_allow_html=True) st.markdown('

心血管疾病智能分析系统 | 数据驱动的健康洞察

', unsafe_allow_html=True) st.markdown("
", unsafe_allow_html=True) def create_sidebar(df: pd.DataFrame) -> dict: """创建美观的侧边栏""" with st.sidebar: st.markdown("### 🎛️ 数据筛选器") st.markdown("---") # 年龄范围 with st.container(): st.markdown("**📅 年龄范围**") age_range = st.slider( "", min_value=int(df['age_years'].min()), max_value=int(df['age_years'].max()), value=(int(df['age_years'].min()), int(df['age_years'].max())), key="age_slider" ) st.markdown("
", unsafe_allow_html=True) # 性别选择 with st.container(): st.markdown("**👤 性别**") gender_options = st.multiselect( "", options=[1, 2], default=[1, 2], format_func=lambda x: "👩 女性" if x == 1 else "👨 男性", key="gender_select" ) st.markdown("
", unsafe_allow_html=True) # 心血管疾病状态 with st.container(): st.markdown("**🏥 心血管健康状态**") cardio_options = st.multiselect( "", options=[0, 1], default=[0, 1], format_func=lambda x: "✅ 健康" if x == 0 else "⚠️ 有风险", key="cardio_select" ) st.markdown("---") # 数据统计 st.markdown("### 📊 数据概览") st.metric("总记录数", f"{len(df):,}") st.metric("平均BMI", f"{df['bmi'].mean():.1f}") st.metric("平均年龄", f"{df['age_years'].mean():.1f} 岁") return { 'age_range': age_range, 'gender': gender_options, 'cardio': cardio_options } def apply_filters(df: pd.DataFrame, filters: dict) -> pd.DataFrame: """应用筛选条件""" return df[ (df['age_years'] >= filters['age_range'][0]) & (df['age_years'] <= filters['age_range'][1]) & (df['gender'].isin(filters['gender'])) & (df['cardio'].isin(filters['cardio'])) ] def render_metrics(filtered_df: pd.DataFrame, total_count: int): """渲染指标卡片""" st.markdown('
📊 关键指标
', unsafe_allow_html=True) col1, col2, col3, col4 = st.columns(4) with col1: st.markdown(f"""
📋 筛选记录数
{len(filtered_df):,}
占比 {(len(filtered_df)/total_count*100):.1f}%
""", unsafe_allow_html=True) with col2: risk_rate = (filtered_df['cardio'].sum() / len(filtered_df) * 100) if len(filtered_df) > 0 else 0 st.markdown(f"""
⚠️ 风险率
{risk_rate:.1f}%
心血管疾病患者占比
""", unsafe_allow_html=True) with col3: avg_age = filtered_df['age_years'].mean() if len(filtered_df) > 0 else 0 st.markdown(f"""
📅 平均年龄
{avg_age:.1f}
""", unsafe_allow_html=True) with col4: avg_bmi = filtered_df['bmi'].mean() if len(filtered_df) > 0 else 0 st.markdown(f"""
⚖️ 平均BMI
{avg_bmi:.1f}
{get_bmi_status(avg_bmi)}
""", unsafe_allow_html=True) def get_bmi_status(bmi: float) -> str: """获取BMI状态""" if bmi < 18.5: return "体重过低" elif bmi < 25: return "体重正常" elif bmi < 30: return "超重" return "肥胖" def plot_age_distribution(df: pd.DataFrame): """年龄分布图 - 美化版""" fig = px.histogram( df, x='age_years', color='cardio', nbins=30, title="年龄分布趋势", labels={'age_years': '年龄', 'count': '人数'}, color_discrete_map={0: '#2ecc71', 1: '#e74c3c'}, barmode='overlay', opacity=0.8 ) fig.update_layout( template='plotly_white', title_font_size=18, title_x=0.5, legend_title_text="", legend=dict( orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1 ), plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)', font=dict(family="Arial, sans-serif", size=12), margin=dict(t=60, b=40, l=40, r=40) ) fig.for_each_trace(lambda t: t.update( name="✅ 健康" if t.name == "0" else "⚠️ 有风险", marker_line_width=1, marker_line_color='white' )) return fig def plot_bmi_distribution(df: pd.DataFrame): """BMI分布饼图""" bmi_counts = df['bmi_category'].value_counts().reindex(['体重过低', '体重正常', '超重', '肥胖']) fig = go.Figure(data=[go.Pie( labels=bmi_counts.index, values=bmi_counts.values, hole=0.5, marker=dict( colors=COLORS['bmi'], line=dict(color='white', width=2) ), textinfo='label+percent', textposition='outside', textfont=dict(size=12) )]) fig.update_layout( title=dict(text="BMI分布", font=dict(size=18), x=0.5), template='plotly_white', showlegend=False, plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)', margin=dict(t=60, b=40, l=40, r=40), annotations=[dict(text='BMI', x=0.5, y=0.5, font_size=20, showarrow=False)] ) return fig def plot_bmi_cardio_relation(df: pd.DataFrame): """BMI与心血管疾病关系 - 堆叠柱状图""" bmi_cardio = df.groupby(['bmi_category', 'cardio']).size().unstack(fill_value=0) bmi_order = ['体重过低', '体重正常', '超重', '肥胖'] bmi_cardio = bmi_cardio.reindex(bmi_order) fig = go.Figure() fig.add_trace(go.Bar( name='✅ 健康', x=bmi_cardio.index, y=bmi_cardio[0], marker_color='#2ecc71', marker_line=dict(color='white', width=1) )) fig.add_trace(go.Bar( name='⚠️ 有风险', x=bmi_cardio.index, y=bmi_cardio[1], marker_color='#e74c3c', marker_line=dict(color='white', width=1) )) fig.update_layout( title=dict(text="BMI与心血管疾病关联分析", font=dict(size=18), x=0.5), xaxis_title="BMI类别", yaxis_title="人数", barmode='stack', template='plotly_white', legend=dict( orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1 ), plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)', margin=dict(t=60, b=40, l=40, r=40) ) return fig def plot_blood_pressure_scatter(df: pd.DataFrame): """血压散点图""" sample_df = df.sample(min(2000, len(df))) # 采样避免过多点 fig = px.scatter( sample_df, x='ap_lo', y='ap_hi', color='cardio', color_discrete_map={0: '#2ecc71', 1: '#e74c3c'}, opacity=0.6, title="血压分布散点图", labels={'ap_lo': '舒张压 (mmHg)', 'ap_hi': '收缩压 (mmHg)'} ) fig.update_layout( template='plotly_white', title_font_size=18, title_x=0.5, legend_title_text="", legend=dict( orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1 ), plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)', margin=dict(t=60, b=40, l=40, r=40) ) fig.for_each_trace(lambda t: t.update( name="✅ 健康" if t.name == "0" else "⚠️ 有风险" )) return fig def plot_cholesterol_analysis(df: pd.DataFrame): """胆固醇与心血管疾病关系""" chol_cardio = df.groupby(['cholesterol_cat', 'cardio']).size().unstack(fill_value=0) chol_order = ['正常', '偏高', '非常高'] chol_cardio = chol_cardio.reindex(chol_order) fig = go.Figure() fig.add_trace(go.Bar( name='✅ 健康', x=chol_cardio.index, y=chol_cardio[0] if 0 in chol_cardio.columns else [0, 0, 0], marker_color='#2ecc71' )) fig.add_trace(go.Bar( name='⚠️ 有风险', x=chol_cardio.index, y=chol_cardio[1] if 1 in chol_cardio.columns else [0, 0, 0], marker_color='#e74c3c' )) fig.update_layout( title=dict(text="胆固醇水平与心血管疾病", font=dict(size=18), x=0.5), xaxis_title="胆固醇水平", yaxis_title="人数", barmode='group', template='plotly_white', legend=dict( orientation="h", yanchor="bottom", y=1.02, xanchor="right", x=1 ), plot_bgcolor='rgba(0,0,0,0)', paper_bgcolor='rgba(0,0,0,0)', margin=dict(t=60, b=40, l=40, r=40) ) return fig def render_data_table(df: pd.DataFrame): """渲染数据表格""" st.markdown('
📋 数据明细
', unsafe_allow_html=True) display_cols = ['id', 'age_years', 'gender', 'height', 'weight', 'bmi', 'bmi_category', 'ap_hi', 'ap_lo', 'cholesterol_cat', 'gluc_cat', 'cardio'] display_df = df[display_cols].copy() display_df['gender'] = display_df['gender'].map({1: '女性', 2: '男性'}) display_df['cardio'] = display_df['cardio'].map({0: '✅ 健康', 1: '⚠️ 有风险'}) display_df.columns = ['ID', '年龄', '性别', '身高(cm)', '体重(kg)', 'BMI', 'BMI分类', '收缩压', '舒张压', '胆固醇', '血糖', '心血管状态'] st.dataframe( display_df, use_container_width=True, height=400, column_config={ "ID": st.column_config.NumberColumn(width="small"), "年龄": st.column_config.NumberColumn(width="small"), "心血管状态": st.column_config.TextColumn(width="medium") } ) def main(): """主函数""" render_header() # 加载数据 with st.spinner("正在加载数据..."): raw_df = load_data(DATA_PATH) if raw_df.empty: st.error("❌ 数据加载失败,请检查数据文件路径!") return # 数据处理 with st.spinner("正在进行数据清洗..."): df = clean_and_engineer_features(raw_df) total_count = len(df) # 侧边栏筛选 filters = create_sidebar(df) # 应用筛选 filtered_df = apply_filters(df, filters) # 指标卡片 render_metrics(filtered_df, total_count) st.markdown("
", unsafe_allow_html=True) # 图表区域 st.markdown('
📈 可视化分析
', unsafe_allow_html=True) # 第一行图表 col1, col2 = st.columns(2) with col1: fig1 = plot_age_distribution(filtered_df) st.plotly_chart(fig1, use_container_width=True, key="age_chart") with col2: fig2 = plot_bmi_distribution(filtered_df) st.plotly_chart(fig2, use_container_width=True, key="bmi_pie") # 第二行图表 col3, col4 = st.columns(2) with col3: fig3 = plot_bmi_cardio_relation(filtered_df) st.plotly_chart(fig3, use_container_width=True, key="bmi_cardio") with col4: fig4 = plot_cholesterol_analysis(filtered_df) st.plotly_chart(fig4, use_container_width=True, key="chol_chart") # 第三行图表 col5, _ = st.columns([1, 1]) with col5: fig5 = plot_blood_pressure_scatter(filtered_df) st.plotly_chart(fig5, use_container_width=True, key="bp_scatter") st.markdown("
", unsafe_allow_html=True) # 数据表格 render_data_table(filtered_df) # 页脚 st.markdown("
", unsafe_allow_html=True) st.markdown( "

❤️ CardioAI © 2024 | 心血管疾病智能分析系统

", unsafe_allow_html=True ) if __name__ == "__main__": main()