目录
  1. 1. 一、Pandas 基础速览
    1. 1.1. 1.1 核心数据结构
    2. 1.2. 1.2 快速理解 Index
  2. 2. 二、MultiIndex —— 多级索引
    1. 2.1. 2.1 创建 MultiIndex
    2. 2.2. 2.2 多级索引的切片与查找
    3. 2.3. 2.3 get_level_values 和 xs
    4. 2.4. 2.4 MultiIndex 的操作与重设
    5. 2.5. 2.5 聚合中的 MultiIndex 使用
    6. 2.6. 2.6 模拟 Panel Data(三维数据)
  3. 3. 三、窗口函数 —— rolling / expanding / ewm
    1. 3.1. 3.1 rolling —— 滚动窗口
    2. 3.2. 3.2 rolling 的关键参数
    3. 3.3. 3.3 expanding —— 扩展窗口
    4. 3.4. 3.4 ewm —— 指数加权移动窗口
  4. 4. 四、数据重塑 —— pivot / melt / stack / unstack / crosstab
    1. 4.1. 4.1 pivot —— 长表转宽表
    2. 4.2. 4.2 pivot_table —— 支持聚合的数据透视表
    3. 4.3. 4.3 melt —— 宽表转长表
    4. 4.4. 4.4 stack / unstack
    5. 4.5. 4.5 crosstab —— 交叉表
  5. 5. 五、Categorical —— 分类数据类型
    1. 5.1. 5.1 创建分类数据
    2. 5.2. 5.2 分类数据的内存优势
    3. 5.3. 5.3 分类数据的操作
  6. 6. 六、方法链式编程 —— pipe / assign / query
    1. 6.1. 6.1 assign —— 动态添加列
    2. 6.2. 6.2 pipe —— 链式调用自定义函数
    3. 6.3. 6.3 query —— 用字符串表达式筛选行
    4. 6.4. 6.4 完整链式示例:数据管道
  7. 7. 七、eval / query 的高性能评估
    1. 7.1. 7.1 DataFrame.eval
    2. 7.2. 7.2 使用局部变量和 @ 前缀
    3. 7.3. 7.3 运算符对照
  8. 8. 八、数据库互操作 —— read_sql / to_sql
    1. 8.1. 8.1 SQLAlchemy 集成
    2. 8.2. 8.2 写入数据库
    3. 8.3. 8.3 分块读取大查询
  9. 9. 九、时间序列 —— resample 与时间处理
    1. 9.1. 9.1 生成日期范围
    2. 9.2. 9.2 resample —— 时间重采样
    3. 9.3. 9.3 自定义重采样规则
    4. 9.4. 9.4 时间属性与操作
    5. 9.5. 9.5 shift / diff / pct_change —— 滞后与变化
  10. 10. 十、绘图集成 —— plot
  11. 11. 十一、Styler —— DataFrame 格式化输出
    1. 11.1. 11.1 基本格式化
    2. 11.2. 11.2 条件背景色和文字色
    3. 11.3. 11.3 内置条形图
    4. 11.4. 11.4 自定义多条件着色
    5. 11.5. 11.5 导出样式
  12. 12. 十二、生产级数据流水线
    1. 12.1. 12.1 设计模式:函数式管道
    2. 12.2. 12.2 带日志的管道
    3. 12.3. 12.3 大数据的分块处理
    4. 12.4. 12.4 使用 Dask 进行大规模 Pandas 操作
    5. 12.5. 12.5 内存优化技巧
  13. 13. 十三、高级操作示例
    1. 13.1. 13.1 merge / join 的各种模式
    2. 13.2. 13.2 使用 transform 进行组内转换
    3. 13.3. 13.3 使用 cut / qcut 进行分箱
    4. 13.4. 13.4 使用 explode 展开列表列
  14. 14. 十四、总结
【Python系列】Pandas权威指南

Pandas 是 Python 数据科学生态中最核心的库之一。它以 DataFrame 和 Series 为基本数据结构,在 NumPy 基础上提供了标签化的数据操作能力,涵盖数据导入/导出、清洗、转换、聚合、窗口计算、时间序列、可视化等全流程。本文聚焦 Pandas 中高级特性:MultiIndex 多级索引、窗口函数、数据重塑、分类数据类型、方法链式编程、高性能 eval/query、数据库互操作、时间序列、绘图集成、Styler 格式化以及生产级数据处理流水线。

一、Pandas 基础速览

1.1 核心数据结构

Pandas 有两个核心结构:

  • Series:带标签的一维数组,可理解为「带索引的 ndarray」。
  • DataFrame:带标签的二维表格,每列可以是不同的 dtype。
import pandas as pd
import numpy as np

# Series
s = pd.Series([10, 20, 30, 40], index=['a', 'b', 'c', 'd'], name='value')
print(s.index) # Index(['a', 'b', 'c', 'd'])
print(s.values) # [10 20 30 40]
print(s['b']) # 20

# DataFrame
df = pd.DataFrame({
'name': ['Alice', 'Bob', 'Charlie'],
'age': [25, 30, 35],
'score': [92.5, 85.0, 88.5],
})
print(df.shape) # (3, 3)
print(df.columns) # Index(['name', 'age', 'score'])
print(df.dtypes)
# name object
# age int64
# score float64

1.2 快速理解 Index

Index 是 Pandas 中所有数据结构的标签轴。它不是普通的 NumPy 数组——它是不可变的、支持集合运算的、可哈希的:

idx1 = pd.Index(['a', 'b', 'c', 'd'])
idx2 = pd.Index(['c', 'd', 'e', 'f'])

print(idx1.intersection(idx2)) # ['c', 'd']
print(idx1.union(idx2)) # ['a', 'b', 'c', 'd', 'e', 'f']
print(idx1.difference(idx2)) # ['a', 'b']
print(idx1.symmetric_difference(idx2)) # ['a', 'b', 'e', 'f']

二、MultiIndex —— 多级索引

MultiIndex(分层索引)是 Pandas 处理高维数据的核心机制。它允许在一个轴上拥有多个索引级别,使得你可以在二维的 DataFrame 中表达多维数据。

2.1 创建 MultiIndex

# 方式 1:from_tuples
tuples = [('2023', 'Q1'), ('2023', 'Q2'), ('2023', 'Q3'), ('2023', 'Q4'),
('2024', 'Q1'), ('2024', 'Q2')]
mi = pd.MultiIndex.from_tuples(tuples, names=['year', 'quarter'])

# 方式 2:from_product(笛卡尔积)
mi = pd.MultiIndex.from_product(
[['2023', '2024'], ['Q1', 'Q2', 'Q3', 'Q4']],
names=['year', 'quarter']
)

# 方式 3:from_arrays
mi = pd.MultiIndex.from_arrays(
[['2023']*4 + ['2024']*4, ['Q1','Q2','Q3','Q4']*2],
names=['year', 'quarter']
)

# 方式 4:from_frame(从 DataFrame 创建,pandas 1.0+)
df_idx = pd.DataFrame({'year': ['2023']*4+['2024']*4,
'quarter': ['Q1','Q2','Q3','Q4']*2})
mi = pd.MultiIndex.from_frame(df_idx)

# 创建带 MultiIndex 的 DataFrame
data = np.random.randn(8, 3)
df = pd.DataFrame(data, index=mi, columns=['revenue', 'cost', 'profit'])
print(df)

2.2 多级索引的切片与查找

# loc —— 标签索引
print(df.loc['2023']) # 2023 的所有行(单一级别)
print(df.loc[('2023', 'Q1')]) # 精确匹配
print(df.loc['2023':'2024']) # 跨年范围切片

# 多级切片的语法
print(df.loc[('2023', ['Q1', 'Q3']), :]) # 2023 年的 Q1 和 Q3
print(df.loc[(slice(None), 'Q1'), :]) # 所有年份的 Q1

# 在 MultiIndex 中使用 pd.IndexSlice(更优雅)
idx = pd.IndexSlice
print(df.loc[idx['2023', :], :]) # 等价 df.loc['2023']
print(df.loc[idx[:, 'Q1'], 'revenue']) # 所有年份 Q1 的 revenue
print(df.loc[idx['2023':'2024', ['Q1','Q4']], ['revenue', 'profit']])

2.3 get_level_values 和 xs

# get_level_values:获取某一级的所有值
print(df.index.get_level_values(0)) # ['2023','2023',...,'2024','2024']
print(df.index.get_level_values('year')) # 同上

# xs (cross-section):按某一层级的特定值提取
print(df.xs('Q2', level='quarter')) # 所有年份的 Q2
print(df.xs('2024', level='year')) # 2024 年所有季度

# xs 的 drop_level 参数
result = df.xs('Q1', level='quarter', drop_level=True) # 去掉 quarter 层级
# result 的 index 只剩 year 这一级

# xs 的 axis 参数:也可以对 columns 的 MultiIndex 操作

2.4 MultiIndex 的操作与重设

# reset_index:将 MultiIndex 转为列
df_flat = df.reset_index() # year 和 quarter 变成普通列

# set_index:将列转为 MultiIndex
df_reindexed = df_flat.set_index(['year', 'quarter'])

# swaplevel:交换层级顺序
df_swapped = df.swaplevel('year', 'quarter')
df_swapped = df.swaplevel(0, 1) # 等价,按位置

# sort_index:按 MultiIndex 排序
df_sorted = df.sort_index(level='quarter', ascending=False)

# reorder_levels:重新排列层级
df_reordered = df.reorder_levels(['quarter', 'year'])

2.5 聚合中的 MultiIndex 使用

# groupby 多列默认产生 MultiIndex
df = pd.DataFrame({
'region': ['East', 'East', 'West', 'West', 'East', 'West'],
'product': ['A', 'B', 'A', 'B', 'A', 'B'],
'sales': [100, 150, 200, 120, 180, 90],
'year': [2023, 2023, 2023, 2023, 2024, 2024],
})

# 多列 groupby → MultiIndex columns
pivot = df.groupby(['region', 'year'])['sales'].agg(['sum', 'mean', 'std'])
print(pivot.columns) # MultiIndex([('sum',), ('mean',), ('std',)])

# 展平 MultiIndex columns
pivot.columns = ['_'.join(col).strip() for col in pivot.columns.values]

2.6 模拟 Panel Data(三维数据)

MultiIndex 可以将三维或更高维的数据「折叠」进二维 DataFrame:

# 模拟(时间, 资产, 特征)的三维数据
dates = pd.date_range('2024-01-01', periods=5, freq='D')
assets = ['AAPL', 'GOOGL', 'MSFT']
features = ['open', 'high', 'low', 'close', 'volume']

# 用 MultiIndex columns 表示 (asset, feature)
mi_columns = pd.MultiIndex.from_product([assets, features],
names=['asset', 'feature'])
df_panel = pd.DataFrame(np.random.randn(5, 15),
index=dates, columns=mi_columns)

# 提取单个 asset 的单个 feature
print(df_panel.loc[:, ('AAPL', 'close')])

# 提取所有 asset 的 close(xs with axis=1)
print(df_panel.xs('close', level='feature', axis=1))

三、窗口函数 —— rolling / expanding / ewm

窗口函数是时序分析的核心工具。Pandas 提供了三种窗口机制:固定大小窗口(rolling)、累积窗口(expanding)和指数加权窗口(ewm)。

3.1 rolling —— 滚动窗口

import pandas as pd
import numpy as np

# 创建日收益率序列
np.random.seed(42)
dates = pd.date_range('2024-01-01', periods=252, freq='B') # 一年交易日
returns = pd.Series(np.random.randn(252) * 0.01, index=dates, name='daily_return')

# 基础滚动操作
ma20 = returns.rolling(window=20).mean() # 20 日移动平均
vol20 = returns.rolling(window=20).std() # 20 日滚动标准差(波动率)

# 滚动窗口的自定义函数
rolling_max_drawdown = returns.rolling(window=60).apply(
lambda x: (x.cummax() - x).max()
)

# 滚动相关系数
df = pd.DataFrame({
'a': np.random.randn(252),
'b': np.random.randn(252),
})
rolling_corr = df['a'].rolling(60).corr(df['b'])

# 滚动聚合:多统计量一次计算
result = returns.rolling(60).agg(['mean', 'std', 'skew', 'kurtosis'])

3.2 rolling 的关键参数

# min_periods:需要的最小观测数,默认为 window
ma = returns.rolling(window=20, min_periods=10).mean() # 有无 10 个数据点就开始计算

# center:窗口居中(标签在窗口中央)
ma_center = returns.rolling(window=20, center=True).mean()

# win_type:加权窗口类型
ma_triang = returns.rolling(window=20, win_type='triang').mean() # 三角加权
ma_gauss = returns.rolling(window=20, win_type='gaussian').mean(std=5)

# closed:区间闭合方式
# 'right'(默认):窗口为 (t-window, t]
# 'left':窗口为 [t, t+window)
# 'both':窗口为 [t-window, t]
# 'neither':窗口为 (t-window, t)

3.3 expanding —— 扩展窗口

expanding 窗口从第一个观测值开始,不断增大窗口直到包含所有数据。对于计算「到当前时刻为止」的累积统计非常有用:

# 累积均值(从一开始到当前的均值)
cum_mean = returns.expanding().mean()

# 累积极值
cum_max = returns.expanding().max()
cum_min = returns.expanding().min()

# 累积求和
cum_sum = returns.expanding().sum()

# 自定义累积函数
cum_sharpe = returns.expanding().apply(
lambda x: np.sqrt(252) * x.mean() / x.std() if x.std() > 0 else 0
)

3.4 ewm —— 指数加权移动窗口

ewm 对近期数据赋予更高的权重,权重按指数衰减。这是金融时间序列中计算波动率等指标的标准工具:

# 指数加权移动平均(EWMA)
ewma = returns.ewm(span=20).mean()

# 参数选择:span, halflife, alpha 和 com 均等效
# span=20 → alpha = 2/(span+1) = 2/21 ≈ 0.095
# halflife=14 → alpha ≈ 0.048
# com=9.5 → alpha = 1/(com+1) ≈ 0.095
ewma_span = returns.ewm(span=20).mean()
ewma_halflife = returns.ewm(halflife=14).mean()

# 指数加权标准差
ewm_vol = returns.ewm(span=20).std()

# 指数加权协方差 / 相关系数
ewm_corr = df['a'].ewm(span=60).corr(df['b'])

四、数据重塑 —— pivot / melt / stack / unstack / crosstab

4.1 pivot —— 长表转宽表

pivot 通过指定 index、columns 和 values 来重塑数据:

df = pd.DataFrame({
'date': ['2024-01-01', '2024-01-01', '2024-01-02', '2024-01-02'],
'city': ['Beijing', 'Shanghai', 'Beijing', 'Shanghai'],
'temperature': [1.5, 8.2, 2.1, 9.5],
'humidity': [30, 65, 28, 70],
})

# 将 city 从行变成列
pivoted = df.pivot(index='date', columns='city', values='temperature')
print(pivoted)
# city Beijing Shanghai
# date
# 2024-01-01 1.5 8.2
# 2024-01-02 2.1 9.5

# 当没有 values 时,剩余所有列都会成为 value 列
pivoted_all = df.pivot(index='date', columns='city')
# 产生 MultiIndex columns: (temperature, Beijing), (temperature, Shanghai), ...

4.2 pivot_table —— 支持聚合的数据透视表

当 index 和 columns 的组合不唯一时,pivot_table 通过 aggregation 来处理重复:

df = pd.DataFrame({
'date': ['2024-01-01', '2024-01-01', '2024-01-01', '2024-01-02'],
'city': ['Beijing', 'Shanghai', 'Beijing', 'Beijing'],
'temperature': [1.5, 8.2, 2.1, 3.5],
'humidity': [30, 65, 28, 25],
})

# pivot 会报错(Beijing 在 2024-01-01 出现了两次)
# 使用 pivot_table 并指定 aggfunc
pt = pd.pivot_table(df,
index='date',
columns='city',
values='temperature',
aggfunc='mean',
fill_value=0)
print(pt)

# 多值聚合
pt_multi = pd.pivot_table(df,
index='date',
columns='city',
values=['temperature', 'humidity'],
aggfunc={'temperature': ['mean', 'min', 'max'],
'humidity': 'mean'},
margins=True, # 添加汇总行/列
margins_name='Total')

4.3 melt —— 宽表转长表

meltpivot 的逆操作,将多个列「熔」成两列:变量名和值:

df_wide = pd.DataFrame({
'name': ['Alice', 'Bob', 'Charlie'],
'math': [85, 90, 78],
'physics': [92, 85, 88],
'chemistry': [88, 82, 91],
})

# melt:将所有学科列转为行
df_long = pd.melt(df_wide,
id_vars=['name'], # 保持不变的列
value_vars=['math', 'physics', 'chemistry'],
var_name='subject', # 变量名列的新列名
value_name='score') # 值列的新列名
print(df_long)
# name subject score
# 0 Alice math 85
# 1 Bob math 90
# ...
# 8 Charlie chemistry 91

4.4 stack / unstack

这两个操作在内层/外层的轴层级上进行旋转:

# 创建带 MultiIndex 的 DataFrame
mi = pd.MultiIndex.from_product([['Alice','Bob'], ['Math','Physics']],
names=['student', 'subject'])
df = pd.DataFrame({'score': [85, 92, 90, 85]}, index=mi)
print(df)
# score
# student subject
# Alice Math 85
# Physics 92
# Bob Math 90
# Physics 85

# unstack:将行索引层「旋转」为列
df_unstacked = df.unstack(level='subject')
print(df_unstacked)
# score
# subject Math Physics
# student
# Alice 85 92
# Bob 90 85

# unstack 可以指定 fill_value
df_unstacked = df.unstack(level='subject', fill_value=0)

# stack:将列「压」回行索引
df_stacked = df_unstacked.stack(level='subject')
# 结果回到了初始的 MultiIndex 形式

# 指定 level
df_unstacked = df.unstack(level=1) # 按位置指定
df_stacked = df_unstacked.stack(level=1)

4.5 crosstab —— 交叉表

crosstab 是专门用于计算分组频率/比例的便捷函数:

# 模拟调查数据
df = pd.DataFrame({
'gender': ['M', 'F', 'M', 'F', 'M', 'F', 'M', 'F', 'M', 'M'],
'age_group': ['18-25', '18-25', '26-35', '26-35',
'26-35', '36+', '36+', '18-25', '36+', '26-35'],
'purchased': [1, 1, 0, 1, 1, 0, 0, 1, 1, 0],
})

# 频率计数
ct = pd.crosstab(df['gender'], df['age_group'])
print(ct)

# 带百分比
ct_pct = pd.crosstab(df['gender'], df['age_group'], normalize='index')
print(ct_pct)

# 带聚合值(margins=True 加总)
ct_agg = pd.crosstab(df['gender'], df['age_group'],
values=df['purchased'],
aggfunc='mean',
margins=True)
print(ct_agg)

五、Categorical —— 分类数据类型

对于取值有限且重复率高的列(如性别、城市、等级等),使用 category dtype 可大幅降低内存占用并加速某些操作。

5.1 创建分类数据

# 显式转换
df['gender'] = df['gender'].astype('category')

# 通过 pd.Categorical 构造
s = pd.Series(pd.Categorical(['low', 'medium', 'high', 'medium', 'low'],
categories=['low', 'medium', 'high'],
ordered=True))

# 查看类别信息
print(s.cat.categories) # ['low', 'medium', 'high']
print(s.cat.ordered) # True
print(s.cat.codes) # [0, 1, 2, 1, 0] 底层的整数编码

5.2 分类数据的内存优势

import numpy as np

# 1000 万个字符串 vs 分类
n = 10_000_000
labels = np.random.choice(['cat', 'dog', 'bird', 'fish', 'hamster'], n)

# 字符串存储
s_str = pd.Series(labels)
print(f"object dtype: {s_str.memory_usage(deep=True) / 1024**2:.1f} MB")

# 分类存储
s_cat = s_str.astype('category')
print(f"category dtype: {s_cat.memory_usage(deep=True) / 1024**2:.1f} MB")
# 典型结果:
# object dtype: ~600 MB
# category dtype: ~10 MB(减少 98%)

5.3 分类数据的操作

# 重新排序类别
s_cat = s_cat.cat.reorder_categories(['hamster', 'bird', 'cat', 'dog', 'fish'])

# 添加新类别
s_cat = s_cat.cat.add_categories(['rabbit'])

# 移除未使用的类别
s_cat = s_cat.cat.remove_unused_categories()

# 重命名类别
s_cat = s_cat.cat.rename_categories({'cat': 'feline', 'dog': 'canine'})

# 设置/取消有序性
s_cat = s_cat.cat.as_ordered()
s_cat = s_cat.cat.as_unordered()

# groupby 可以保留所有类别(即使某些没有数据)
df = pd.DataFrame({'grade': pd.Categorical(['A', 'B', 'A', 'A'],
categories=['A','B','C','D']),
'score': [95, 80, 92, 88]})
print(df.groupby('grade', observed=False)['score'].mean())
# A 91.666667
# B 80.000000
# C NaN ← C 类没有数据但被保留
# D NaN

六、方法链式编程 —— pipe / assign / query

Pandas 的方法链(method chaining)风格避免中间变量污染,使数据处理流水线清晰可读。核心工具包括 pipeassignquerywhere/mask

6.1 assign —— 动态添加列

import pandas as pd
import numpy as np

df = pd.DataFrame({'x': np.random.randn(100), 'y': np.random.randn(100)})

# 链式添加多列(每列可以依赖刚创建的前一列)
result = (df
.assign(z=lambda d: d['x'] + d['y']) # z = x + y
.assign(magnitude=lambda d: np.sqrt(d['x']**2 + d['y']**2))
.assign(quadrant=lambda d: np.select(
[(d['x']>=0) & (d['y']>=0), (d['x']<0) & (d['y']>=0),
(d['x']<0) & (d['y']<0), (d['x']>=0) & (d['y']<0)],
['Q1', 'Q2', 'Q3', 'Q4'], default='axis'))
)

6.2 pipe —— 链式调用自定义函数

当内置方法不够用,需要插入自定义函数时,pipe 允许将任意函数注入方法链:

def standardize(df, columns):
"""将指定列标准化为 z-score"""
df = df.copy()
for col in columns:
df[col] = (df[col] - df[col].mean()) / df[col].std()
return df

def remove_outliers(df, columns, n_std=3):
"""移除超过 n 倍标准差的异常值"""
mask = pd.Series(True, index=df.index)
for col in columns:
mask &= (np.abs(df[col]) <= n_std)
return df[mask]

def add_interaction_terms(df, col1, col2):
"""添加交互项"""
return df.assign(**{f'{col1}_{col2}_interact': df[col1] * df[col2]})

# 将所有步骤链接起来
processed = (pd.DataFrame({'a': np.random.randn(1000),
'b': np.random.randn(1000),
'c': np.random.randn(1000)})
.pipe(standardize, ['a', 'b', 'c'])
.pipe(remove_outliers, ['a', 'b', 'c'], n_std=3)
.pipe(add_interaction_terms, 'a', 'b')
.assign(result=lambda d: d['a'] + d['b'] - d['a_b_interact'])
)

pipe 的第一个参数 func 接收 DataFrame,其后的 *args**kwargs 传递给 func。通过 (callable, *args, **kwargs) 元组语法还可以在 pipe 中串联多个函数:

result = (df
.pipe((standardize, 'data'), columns=['a', 'b']) # 名字为元组时的特殊语法
.pipe(remove_outliers, ['a', 'b'])
)

6.3 query —— 用字符串表达式筛选行

query 允许使用字符串表达式进行行过滤,语法近似 SQL 的 WHERE 子句,在某些场景下比布尔索引更易读:

df = pd.DataFrame({
'name': ['Alice', 'Bob', 'Charlie', 'Diana', 'Eve'],
'age': [25, 17, 35, 28, 16],
'score': [92, 78, 88, 95, 82],
'grade': ['A', 'B', 'B', 'A', 'C'],
})

# 简单查询
adults = df.query('age >= 18')

# 复合条件
result = df.query('age >= 18 and score > 85')

# 引用外部变量用 @
min_age = 21
min_score = 80
result = df.query('age >= @min_age and score >= @min_score')

# 字符串匹配
result = df.query('name.str.startswith("A")')

# in 操作
result = df.query('grade in ["A", "B"]')

# 索引上的查询(index 级别用反引号引用)
df_indexed = df.set_index('name')
result = df_indexed.query('age > 20') # 列名直接引用

6.4 完整链式示例:数据管道

def load_and_clean(filepath):
"""生产级数据加载与清洗管道"""
return (pd.read_csv(filepath, parse_dates=['date'])
.rename(columns=str.lower) # 列名小写化
.dropna(subset=['date', 'amount']) # 去除关键列缺失的行
.assign(
amount=lambda d: pd.to_numeric(d['amount'], errors='coerce'),
year=lambda d: d['date'].dt.year,
month=lambda d: d['date'].dt.month,
)
.query('amount > 0')
.drop_duplicates(subset=['date', 'account_id'])
.astype({'account_id': 'category', 'type': 'category'})
.sort_values('date')
.reset_index(drop=True)
)

# 使用
cleaned = load_and_clean('transactions.csv')

七、eval / query 的高性能评估

7.1 DataFrame.eval

DataFrame.eval 使用 numexpr 库在底层对表达式求值,可以避免中间数组分配,尤其是在计算涉及多个列时:

df = pd.DataFrame(np.random.randn(1_000_000, 4), columns=['a', 'b', 'c', 'd'])

# 普通方式:每一步都创建中间数组
%timeit df['a'] + df['b'] * df['c'] / (df['d'] + 1)

# eval 方式:按表达式一次性计算
%timeit df.eval('a + b * c / (d + 1)')
# 典型加速:1.5x - 3x(数据集越大,优势越明显)

# eval 支持赋值
df.eval('ratio = a / (b + 0.001)', inplace=True)
# 现在 df 中多了 'ratio' 列

# 多表达式用分号分隔
df.eval('''
x = a + b
y = c - d
z = x / (y + 1)
''', inplace=True)

7.2 使用局部变量和 @ 前缀

threshold = 0.5
df.eval('a > @threshold') # 局部变量加 @
df.eval('flag = a > @threshold', inplace=True)

7.3 运算符对照

运算符 等效写法
+ - * / ** % 算术,同 Python
== != < <= > >= 比较
& | ~ 逻辑与、或、非
and or not 等效的单词形式
sin cos sqrt abs log exp 数学函数
str.contains 字符串方法

八、数据库互操作 —— read_sql / to_sql

8.1 SQLAlchemy 集成

Pandas 通过 SQLAlchemy 的 Engine 对象连接各种数据库:

from sqlalchemy import create_engine

# 创建数据库引擎
engine = create_engine('postgresql://user:password@localhost:5432/mydb')
# engine = create_engine('mysql+pymysql://user:pass@localhost/mydb')
# engine = create_engine('sqlite:///mydb.db')

# 读取整个表
df = pd.read_sql_table('users', engine)

# 读取 SQL 查询结果
df = pd.read_sql_query('''
SELECT department, AVG(salary) as avg_salary
FROM employees
WHERE year = 2024
GROUP BY department
HAVING AVG(salary) > 50000
ORDER BY avg_salary DESC
''', engine)

# read_sql 会自动判断表名还是 SQL 语句
df = pd.read_sql('SELECT * FROM users WHERE active = 1', engine)

8.2 写入数据库

# 写入(新表或替换)
df.to_sql('processed_data', engine,
if_exists='replace', # 'fail', 'replace', 'append'
index=False, # 不写入 DataFrame 的索引
chunksize=1000, # 分批写入(大表)
method='multi') # 多行 INSERT(某些 DB 更快)

# 使用 dtype 参数控制列类型
from sqlalchemy.types import Integer, String, Float, DateTime
df.to_sql('table_name', engine,
dtype={'id': Integer, 'name': String(100), 'amount': Float(precision=2)},
if_exists='append',
index=False)

8.3 分块读取大查询

# 使用 chunksize 逐块处理(避免一次占用太多内存)
chunks = []
for chunk in pd.read_sql_query(
'SELECT * FROM huge_table WHERE date >= "2024-01-01"',
engine,
chunksize=10000
):
# 逐块处理
processed_chunk = chunk.groupby('category').agg({'amount': 'sum'})
chunks.append(processed_chunk)

final_result = pd.concat(chunks).groupby('category').sum()

九、时间序列 —— resample 与时间处理

9.1 生成日期范围

# 日频
dates = pd.date_range('2024-01-01', '2024-12-31', freq='D')

# 交易日(B = business day)
biz_dates = pd.date_range('2024-01-01', periods=252, freq='B')

# 小时级
hours = pd.date_range('2024-01-01', periods=24*7, freq='H')

# 使用 offset 别名
# 'D'=日, 'B'=交易日, 'W'=周, 'M'=月尾, 'MS'=月初
# 'Q'=季尾, 'QS'=季初, 'A'=年尾, 'AS'=年初
# 'H'=时, 'T'或'min'=分, 'S'=秒

# 自定义频率
custom_freq = pd.date_range('2024-01-01', periods=10, freq='2W-WED') # 每隔两周的周三

9.2 resample —— 时间重采样

resample 是时间序列聚合的核心工具,将时间序列从一个频率转换为另一个频率:

# 生成分钟级数据
idx = pd.date_range('2024-01-01', periods=10000, freq='T')
ts = pd.Series(np.random.randn(10000).cumsum(), index=idx, name='value')

# 降采样:分钟 → 小时
hourly = ts.resample('H').agg(['mean', 'min', 'max', 'std', 'first', 'last'])

# 升采样:分钟 → 秒,需要插值
seconds = ts.resample('30S').interpolate(method='linear')

# OHLC 专用方法(金融数据)
ohlc = ts.resample('H').ohlc() # 返回 open, high, low, close

9.3 自定义重采样规则

# 加权时区感知
ts_utc = ts.tz_localize('UTC')

# 使用标签方向
ts.resample('H', label='right') # 用区间右侧时间作为标签
ts.resample('H', label='left') # 用区间左侧时间作为标签(默认)

# closed:区间闭合侧
ts.resample('H', closed='right') # (t-H, t]
ts.resample('H', closed='left') # [t, t+H) 默认

# 非均等聚合:通过自定义函数
def weighted_mean(x):
return (x * np.linspace(1, 2, len(x))).mean() # 后段权重大

ts.resample('H').apply(weighted_mean)

9.4 时间属性与操作

# 提取日期成分
df = pd.DataFrame({'date': pd.date_range('2024-01-01', periods=100)})
df['year'] = df['date'].dt.year
df['month'] = df['date'].dt.month
df['day'] = df['date'].dt.day
df['weekday'] = df['date'].dt.weekday # 0=周一
df['day_name'] = df['date'].dt.day_name() # 'Monday'
df['quarter'] = df['date'].dt.quarter
df['is_quarter_end'] = df['date'].dt.is_quarter_end
df['days_in_month'] = df['date'].dt.days_in_month

# 时间偏移
df['next_month'] = df['date'] + pd.DateOffset(months=1)
df['prev_biz_day'] = df['date'] - pd.offsets.BDay(1)

# 日期差
df['days_since'] = (df['date'] - pd.Timestamp('2024-01-01')).dt.days

9.5 shift / diff / pct_change —— 滞后与变化

ts = pd.Series(np.random.randn(100).cumsum() + 100)

# shift:滞后值
ts_lag1 = ts.shift(1) # 前一天的
ts_lag5 = ts.shift(5) # 5 天前的
ts_lead1 = ts.shift(-1) # 后一天

# diff:一阶差分
ts_diff1 = ts.diff(1) # 日变化
ts_diff5 = ts.diff(5) # 5 日变化

# pct_change:百分比变化
ts_pct = ts.pct_change() # 日收益率
ts_pct5 = ts.pct_change(5) # 5 日收益率

十、绘图集成 —— plot

Pandas 内置了基于 Matplotlib 的绘图接口,通过 DataFrame.plotSeries.plot 方法可以快速生成各种图表:

import matplotlib.pyplot as plt

# 生成数据
df = pd.DataFrame({
'sales': np.random.randint(100, 300, 100),
'cost': np.random.randint(50, 150, 100),
'marketing_spend': np.random.randint(10, 50, 100),
})

# ---- 折线图 ----
df.plot(figsize=(12, 6), title='Sales & Cost Over Time')
plt.show()

# 子图
df.plot(subplots=True, layout=(3, 1), figsize=(10, 8), sharex=True)

# ---- 柱状图 ----
df.iloc[:10].plot.bar(figsize=(12, 5), title='First 10 Days')
df.iloc[:10].plot.barh(figsize=(12, 5)) # 水平柱状图
df.iloc[:10].plot.bar(stacked=True) # 堆叠柱状图

# ---- 直方图 ----
df.plot.hist(alpha=0.5, bins=20, figsize=(10, 6))

# ---- 箱线图 ----
df.plot.box(figsize=(8, 6))

# ---- 密度图 ----
df.plot.density(figsize=(10, 6))

# ---- 散点图 ----
df.plot.scatter(x='sales', y='cost', c='marketing_spend',
colormap='viridis', figsize=(8, 6),
title='Cost vs Sales')

# ---- 面积图 ----
df.plot.area(figsize=(10, 6), alpha=0.6, stacked=False)

# ---- 饼图 ----
df_agg = df.sum() # Series
df_agg.plot.pie(autopct='%1.1f%%', figsize=(8, 8))

配合时间序列索引:

ts = pd.Series(np.random.randn(252).cumsum(), 
index=pd.date_range('2024-01-01', periods=252, freq='B'))

ts.plot(figsize=(14, 5), title='Cumulative Return')
# 自动使用索引作为 x 轴标签

十一、Styler —— DataFrame 格式化输出

Styler 类允许对 DataFrame 的 HTML 输出(Jupyter Notebook / Web)进行丰富的格式化,包括条件颜色、渐变、条形图和工具提示等。

11.1 基本格式化

df = pd.DataFrame({
'product': ['Widget A', 'Widget B', 'Widget C', 'Widget D'],
'revenue': [1250000, 890000, 2150000, 560000],
'growth_pct': [0.125, -0.032, 0.215, 0.058],
'margin': [0.45, 0.38, 0.52, 0.41],
})

styled = (df.style
.format({
'revenue': '¥{:,.0f}',
'growth_pct': '{:+.2%}',
'margin': '{:.1%}',
})
.hide(axis='index') # 隐藏索引
.set_caption('2024 Q2 产品销售报告') # 表格标题
)
# 在 Jupyter 中直接显示 styled 对象即可

11.2 条件背景色和文字色

import seaborn as sns

# 基于值着色
styled = df.style.background_gradient(subset=['revenue'], cmap='Blues')
styled = df.style.background_gradient(subset=['growth_pct'],
cmap=sns.diverging_palette(10, 240, n=20),
vmin=-0.2, vmax=0.3)

# 文字颜色
styled = df.style.text_gradient(subset=['margin'], cmap='RdYlGn')

# 离散条件
def color_negative_red(val):
color = 'red' if val < 0 else 'green'
return f'color: {color}'

styled = df.style.applymap(color_negative_red, subset=['growth_pct'])

# 行级样式
def highlight_max(s):
is_max = s == s.max()
return ['background-color: yellow' if v else '' for v in is_max]

styled = df.style.apply(highlight_max, subset=['revenue', 'growth_pct', 'margin'])

11.3 内置条形图

styled = df.style.bar(subset=['revenue'], color='#5fba7d')
styled = df.style.bar(subset=['growth_pct'],
color=['#d65f5f', '#5fba7d'], # [负值色, 正值色]
align='mid')

11.4 自定义多条件着色

def traffic_light(val):
"""三色灯:红 / 黄 / 绿"""
if val > 0.15:
color = '#4CAF50' # 绿色
elif val > 0:
color = '#FFC107' # 黄色
else:
color = '#F44336' # 红色
return f'background-color: {color}; color: white'

styled = df.style.applymap(traffic_light, subset=['growth_pct'])

11.5 导出样式

# 导出为 HTML
html = styled.to_html()

# 导出为 Excel 并保持格式(需要 openpyxl)
styled.to_excel('report.xlsx', engine='openpyxl')

十二、生产级数据流水线

实际项目中的数据处理往往涉及多个步骤:加载、清洗、转换、特征工程、聚合、输出。良好的管道设计应满足:可读性、可测试性、可复用性和性能。

12.1 设计模式:函数式管道

from typing import List, Callable
import pandas as pd

Step = Callable[[pd.DataFrame], pd.DataFrame]

def pipeline(steps: List[Step]) -> Step:
"""将多个处理步骤组合成一个管道函数"""
def run(df: pd.DataFrame) -> pd.DataFrame:
for step in steps:
df = step(df)
return df
return run

# 定义各个步骤
def parse_dates(df: pd.DataFrame) -> pd.DataFrame:
df['date'] = pd.to_datetime(df['date'], errors='coerce')
return df

def validate_amount(df: pd.DataFrame) -> pd.DataFrame:
df['amount'] = pd.to_numeric(df['amount'], errors='coerce')
return df[df['amount'] > 0]

def add_features(df: pd.DataFrame) -> pd.DataFrame:
return df.assign(
year=df['date'].dt.year,
month=df['date'].dt.month,
day_of_week=df['date'].dt.dayofweek,
amount_log=np.log(df['amount'] + 1),
)

def clean_strings(df: pd.DataFrame) -> pd.DataFrame:
string_cols = df.select_dtypes(include='object').columns
for col in string_cols:
df[col] = df[col].str.strip().str.lower()
return df

# 组装管道
processing_pipeline = pipeline([
parse_dates,
validate_amount,
add_features,
clean_strings,
])

# 运行
df_raw = pd.read_csv('raw_data.csv')
df_clean = processing_pipeline(df_raw)

12.2 带日志的管道

import logging
from functools import wraps

logger = logging.getLogger(__name__)

def log_step(func):
"""装饰器:记录处理前后 DataFrame 的形状"""
@wraps(func)
def wrapper(df: pd.DataFrame, *args, **kwargs) -> pd.DataFrame:
logger.info(f"[{func.__name__}] 输入: {df.shape}")
result = func(df, *args, **kwargs)
logger.info(f"[{func.__name__}] 输出: {result.shape}")
return result
return wrapper

@log_step
def remove_duplicates(df):
return df.drop_duplicates(subset=['date', 'user_id'])

@log_step
def filter_recent(df, days=90):
cutoff = pd.Timestamp.now() - pd.Timedelta(days=days)
return df[df['date'] >= cutoff]

12.3 大数据的分块处理

当数据量超过内存时,使用 chunksize 参数分块读取和处理:

def process_in_chunks(filepath: str, chunksize: int = 100_000):
"""分块处理超大 CSV 文件"""
chunk_results = []

for chunk in pd.read_csv(filepath, chunksize=chunksize,
parse_dates=['date']):
# 每块的管道处理
processed = (chunk
.pipe(validate_amount)
.pipe(add_features)
.pipe(clean_strings)
)

# 逐块聚合(仅保留聚合结果以节省内存)
summary = processed.groupby(['year', 'month', 'category']) \
.agg({'amount': ['sum', 'count']})
chunk_results.append(summary)

# 合并所有块的聚合结果
final = pd.concat(chunk_results).groupby(level=[0,1,2]).sum()
return final

12.4 使用 Dask 进行大规模 Pandas 操作

当数据量超过单机内存(例如 10GB+ 的 CSV)且分块处理不够方便时,可以使用 Dask DataFrame,它提供了与 Pandas 几乎相同的 API,但延迟计算,可分布式执行:

import dask.dataframe as dd

# 读取(延迟加载,不立即执行)
ddf = dd.read_csv('huge_dataset/*.csv', parse_dates=['timestamp'])

# 操作与 Pandas 几乎相同
ddf['year'] = ddf['timestamp'].dt.year
ddf_clean = ddf[ddf['amount'] > 0]
grouped = ddf_clean.groupby(['year', 'category'])['amount'].agg(['sum', 'mean'])

# 触发计算(此时才开始执行全部 DAG)
result = grouped.compute() # 返回 Pandas DataFrame

12.5 内存优化技巧

def optimize_dtypes(df):
"""自动优化 DataFrame 的 dtype 以减少内存占用"""
for col in df.columns:
col_type = df[col].dtype

if col_type == 'object':
# 尝试转为 category
num_unique = df[col].nunique()
num_total = len(df[col])
if num_unique / num_total < 0.5: # 重复率 > 50%
df[col] = df[col].astype('category')
elif 'int' in str(col_type):
# 降级整数类型
c_min, c_max = df[col].min(), df[col].max()
if c_min >= 0:
if c_max < 255:
df[col] = df[col].astype('uint8')
elif c_max < 65535:
df[col] = df[col].astype('uint16')
elif c_max < 4294967295:
df[col] = df[col].astype('uint32')
else:
if c_min > -128 and c_max < 127:
df[col] = df[col].astype('int8')
elif c_min > -32768 and c_max < 32767:
df[col] = df[col].astype('int16')
elif c_min > -2147483648 and c_max < 2147483647:
df[col] = df[col].astype('int32')
elif 'float' in str(col_type):
# 降级浮点类型
df[col] = pd.to_numeric(df[col], downcast='float')

return df

# 查看优化效果
df = pd.DataFrame({'id': range(1000000), 'value': np.random.random(1000000)})
print(f"优化前: {df.memory_usage(deep=True).sum() / 1024**2:.1f} MB")
df_optimized = optimize_dtypes(df)
print(f"优化后: {df_optimized.memory_usage(deep=True).sum() / 1024**2:.1f} MB")
# 典型结果: 优化前 ~15 MB, 优化后 ~8 MB

十三、高级操作示例

13.1 merge / join 的各种模式

# 模拟订单和用户表
orders = pd.DataFrame({
'order_id': [1, 2, 3, 4],
'user_id': [1, 2, 1, 5],
'amount': [100, 200, 150, 300],
})
users = pd.DataFrame({
'user_id': [1, 2, 3],
'name': ['Alice', 'Bob', 'Charlie'],
'vip': [True, False, True],
})

# inner join (默认)
merged = pd.merge(orders, users, on='user_id', how='inner')

# left join
merged = pd.merge(orders, users, on='user_id', how='left')
# user_id=5 的订单被保留,name 和 vip 为 NaN

# 多键合并
pd.merge(left, right, on=['key1', 'key2'], how='outer')

# 索引 join(使用 left_index / right_index)
orders_indexed = orders.set_index('user_id')
users_indexed = users.set_index('user_id')
result = orders_indexed.join(users_indexed, how='inner')

# validate 参数:验证合并键的唯一性
pd.merge(orders, users, on='user_id', validate='m:1') # 多对一

13.2 使用 transform 进行组内转换

transformagg 不同——agg 返回缩小形状的聚合结果,transform 返回与输入相同形状的广播结果:

df = pd.DataFrame({
'department': ['HR', 'HR', 'Tech', 'Tech', 'Tech', 'Sales', 'Sales'],
'employee': ['A','B','C','D','E','F','G'],
'salary': [5000, 6000, 8000, 9000, 10000, 5500, 6500],
})

# 添加部门均值列
df['dept_avg'] = df.groupby('department')['salary'].transform('mean')

# 计算 Z-score(组内标准化)
df['salary_zscore'] = df.groupby('department')['salary'].transform(
lambda x: (x - x.mean()) / x.std()
)

# rank:组内排名
df['dept_rank'] = df.groupby('department')['salary'].transform('rank', ascending=False)

13.3 使用 cut / qcut 进行分箱

ages = np.random.randint(0, 90, 100)

# cut:等宽分箱
bins = [0, 18, 35, 50, 65, 100]
labels = ['未成年', '青年', '中年', '中老年', '老年']
age_cat = pd.cut(ages, bins=bins, labels=labels)

# qcut:等频分箱(按分位数)
age_quartile = pd.qcut(ages, q=4, labels=['Q1', 'Q2', 'Q3', 'Q4'])

# 统计各箱的数量
print(pd.value_counts(age_cat))

13.4 使用 explode 展开列表列

df = pd.DataFrame({
'user': ['Alice', 'Bob'],
'tags': [['python', 'ml'], ['java', 'spring', 'k8s']],
})

# 每行的一个 tag 展开为单独一行
exploded = df.explode('tags')
print(exploded)
# user tags
# 0 Alice python
# 0 Alice ml
# 1 Bob java
# 1 Bob spring
# 1 Bob k8s

十四、总结

Pandas 远不止读 CSV + groupby。经过本文的梳理,你应该已经掌握了以下专家级工具:

  1. MultiIndex:通过多级索引在二维 DataFrame 中表示高维数据,配合 xsIndexSlice 灵活切片。
  2. 窗口函数rolling(固定窗口)、expanding(累积窗口)、ewm(指数加权窗口),是时序特征工程的核心。
  3. 数据重塑pivot/pivot_table 宽化、melt 长化、stack/unstack 翻层、crosstab 交叉统计。
  4. 分类数据类型category dtype 大幅降低字符型列的内存占用(可达 98%),且支持有序类别和图统计。
  5. 方法链式编程pipe + assign + query 的组合,使数据处理流水线声明式、无中间变量。
  6. eval / query:利用 numexpr 加速表达式求值,避免中间数组分配。
  7. 数据库互操作:通过 SQLAlchemy + read_sql / to_sql 无缝连接各种数据库。
  8. 时间序列resample 重采样、shift/diff/pct_change 滞后特征、丰富的日期属性提取。
  9. 绘图DataFrame.plot 一键生成折线、柱状、散点、直方图、密度图等。
  10. Styler:HTML 表格的精细化格式化,条件着色、条形图、渐变背景。
  11. 生产管道:函数式管道设计、分块处理、dtype 优化、Dask 扩展。

掌握这些工具后,你可以将大部分数据清洗和特征工程任务表示为声明式的 Pandas 管道,而非零散的探索性代码,从而显著提高代码的可维护性和执行效率。

参考资料

文章作者: Leo·Cheung
文章链接: http://tufusi.com/2021/09/10/%E3%80%90Python%E7%B3%BB%E5%88%97%E3%80%91Pandas%E6%9D%83%E5%A8%81%E6%8C%87%E5%8D%97/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 ONE·PIECE
打赏
  • 微信
  • 支付宝

评论