NumPy(Numerical Python)是 Python 生态中科学计算与数据分析的基石库。它提供了高性能的多维数组对象 ndarray,以及丰富的数学函数、线性代数、随机数生成等工具。NumPy 底层以 C 和 Fortran 实现,对数组的操作在编译层面展开循环,因此比纯 Python 循环快 1-2 个数量级。本文从数组创建、数据类型、广播机制、索引切片、通用函数、线性代数、随机数、形状变换、einsum 和张量收缩、内存布局等角度全面讲解 NumPy,并给出 PCA 从零实现、图像卷积、蒙特卡洛求圆周率等完整示例。
一、ndarray —— NumPy 的核心数据结构 1.1 什么是 ndarray ndarray(N-dimensional array)是一个固定类型的多维容器,所有元素必须是同构的(homogeneous),即具有相同的 dtype。这一约束是 NumPy 高性能的根本原因:同构元素在内存中连续排布,无需指针间接寻址,CPU 缓存命中率极高。
与 Python list 的对比:
特性
Python list
ndarray
元素类型
异构(存指针)
同构(存值)
内存布局
指针数组 + 分散的对象
连续内存块
运算方式
解释循环
编译层向量化
内存占用
大(每个元素多一个 PyObject 头部)
小(纯数据,无对象开销)
创建一个 ndarray 最简单的方式是从 list 或 tuple 转换:
import numpy as npa = np.array([1 , 2 , 3 , 4 , 5 ]) print (a.shape) print (a.ndim) print (a.dtype) b = np.array([[1 , 2 , 3 ], [4 , 5 , 6 ]]) print (b.shape) print (b.ndim) c = np.array([[[1 , 2 ], [3 , 4 ]], [[5 , 6 ], [7 , 8 ]]]) print (c.shape)
1.2 创建数组的常用函数 NumPy 提供了丰富的工厂函数来创建数组,覆盖从零初始化到特定分布的各类场景:
np.zeros(10 ) np.zeros((3 , 4 ), dtype=np.int32) np.ones((2 , 5 )) np.full((3 , 5 ), 520 ) np.full_like(a, 7 ) np.arange(0 , 20 , 3 ) np.arange(0.0 , 1.0 , 0.1 ) np.linspace(0 , 10 , 5 ) np.linspace(0 , 1 , 11 ) np.logspace(0 , 4 , 5 ) np.geomspace(1 , 16 , 5 ) np.eye(4 ) np.eye(4 , dtype=np.int32) np.identity(4 ) np.diag([1 , 2 , 3 ]) np.diag(np.arange(1 , 5 )) np.random.seed(42 ) np.random.random((2 , 4 )) np.random.randint(4 , 9 , size=(3 , 5 )) np.random.randn(3 , 4 ) np.random.normal(loc=5 , scale=2 , size=(100 ,)) np.empty((3 , 4 )) np.empty_like(a) np.zeros_like(a) np.ones_like(a)
二、dtype —— 数据类型系统 2.1 dtype 的本质 dtype(data type)不仅描述数组中存的「是什么类型」,还精确规定了元素在内存中的字节数 、字节序 (endianness)、以及对于结构化数组而言的字段结构 。NumPy 支持的类型远多于 Python 内置类型:
类型
描述
字节数
简写
np.bool_
布尔值(True/False)
1
'?'
np.int8
有符号 8 位整数
1
'i1'
np.int16
有符号 16 位整数
2
'i2'
np.int32
有符号 32 位整数
4
'i4'
np.int64
有符号 64 位整数
8
'i8'
np.uint8
无符号 8 位(常用于图像)
1
'u1'
np.uint16
无符号 16 位
2
'u2'
np.float16
半精度浮点
2
'f2'
np.float32
单精度浮点
4
'f4'
np.float64
双精度浮点(默认)
8
'f8'
np.complex64
复数(32 位实部 + 32 位虚部)
8
'c8'
np.complex128
复数(64 位实部 + 64 位虚部)
16
'c16'
np.str_
定长 Unicode 字符串
可变
'U<n>'
np.bytes_
定长字节串
可变
'S<n>'
np.object_
Python 对象指针
8/平台
'O'
2.2 dtype 的指定与转换 a = np.array([1 , 2 , 3 ], dtype=np.float32) a = np.array([1.5 , 2.3 , 3.1 ], dtype=np.int32) a = np.zeros((3 , 4 ), dtype='i4' ) a = np.array([1 , 2 , 3 ], dtype='complex128' ) x = np.array([1 , 2 , 3 ], dtype=np.float64) y = x.astype(np.int32) z = x.astype(np.float32, copy=False ) print (a.dtype) print (a.itemsize) print (a.nbytes) print (a.dtype.name) print (a.dtype.kind)
2.3 结构化数组 当需要存储异构数据(类似 C 的 struct 或数据库的行)时,可以使用结构化数组:
dtype = np.dtype([ ('name' , 'U20' ), ('age' , 'i4' ), ('score' , 'f8' ), ]) students = np.array([ ('Alice' , 22 , 92.5 ), ('Bob' , 23 , 85.0 ), ('Charlie' , 21 , 88.5 ), ], dtype=dtype) print (students['name' ]) print (students['score' ].mean()) print (students[students['age' ] > 21 ]) print (students[0 ]['name' ]) print (students[0 ][0 ])
结构化数组虽然灵活,但在性能敏感场景下更推荐使用 Pandas。它的主要优势在于可与 C/Fortran 代码无缝交互,以及零拷贝内存映射。
当计算涉及不同 dtype 时,NumPy 会自动将结果提升为能安全容纳两者的类型:
a = np.array([1 , 2 , 3 ], dtype=np.int32) b = np.array([1.5 , 2.5 , 3.5 ], dtype=np.float64) c = a + b x = np.array([1 ], dtype=np.int8) y = np.array([1000 ], dtype=np.int16) z = x + y result = np.add(a, b, dtype=np.float32)
三、强大的索引系统 NumPy 的索引远比 Python list 丰富,支持基本索引 (返回视图)、高级索引 (返回副本)、布尔索引 以及 np.where / np.take 等实用函数。
3.1 基本索引与切片 基本索引使用整数、切片或 ...(Ellipsis),返回原数组的视图(view) ,即修改视图也会影响原数组:
a = np.arange(24 ).reshape(2 , 3 , 4 ) print (a)a[0 , 1 , 2 ] a[:, 0 , :] a[:, :, ::2 ] a[::-1 , ...] a[..., 2 ] view = a[0 :2 , 1 :3 , :] view[0 , 0 , 0 ] = 999 print (a[0 , 1 , 0 ])
3.2 花式索引(Fancy Indexing) 使用整数数组或列表进行索引,返回副本 而不是视图:
a = np.arange(10 ) ** 2 indices = [2 , 5 , 8 ] print (a[indices]) print (a[[1 , 1 , 3 , 3 , 7 , 7 ]]) b = np.arange(24 ).reshape(6 , 4 ) row_idx = np.array([0 , 2 , 4 ]) col_idx = np.array([1 , 2 , 3 ]) print (b[row_idx, col_idx]) rows = np.array([[0 , 0 ], [3 , 3 ]]) cols = np.array([[0 , 3 ], [0 , 3 ]]) print (b[rows, cols])rows, cols = np.ix_([0 , 2 , 4 ], [1 , 3 ]) print (b[rows, cols])
3.3 布尔索引(Boolean Masking) 布尔索引是数据筛选的利器,返回副本:
a = np.random.randn(100 ) mask = a > 0 print (a[mask]) print (a[a > 0 ]) mask = (a > -0.5 ) & (a < 0.5 ) print (a[mask])a[a < 0 ] = 0 a[a > 2 ] = 2 result = np.where(a > 0 , a, -a)
3.4 np.where / np.nonzero / np.take / np.choose a = np.arange(12 ).reshape(3 , 4 ) indices = np.where(a > 5 ) for row, col in zip (*indices): print (f"a[{row} , {col} ] = {a[row, col]} " ) a = np.arange(10 ) print (np.take(a, [0 , 2 , 4 , 6 , 8 ])) print (a.take([0 , 2 , 4 , 6 , 8 ])) np.put(a, [0 , 2 ], [-44 , -55 ]) print (a)
四、广播机制(Broadcasting)深度剖析 广播是 NumPy 最强大的特性之一,它允许不同形状的数组在算术运算中自动对齐。理解广播规则是写出高效向量化代码的关键。
4.1 广播的三条规则 NumPy 从右向左比较两个数组的 shape,当满足以下条件时触发广播:
规则一 :如果两个数组的维数不同,在维数较少的数组的形状左侧补 1。
规则二 :如果两个数组的某个维度大小不一致,但其中一个为 1,则将该维度拉伸为与对方相同。
规则三 :如果两个数组在某个维度上大小均不为 1 且不相等,则抛出 ValueError。
a = np.array([[1 ], [2 ], [3 ]]) b = np.array([[10 , 20 , 30 , 40 ]]) c = a + b print (c)
4.2 广播的经典应用场景 a = np.arange(10 ) b = a * 3 data = np.random.randn(100 , 5 ) centered = data - data.mean(axis=0 ) zscored = (data - data.mean(axis=0 )) / data.std(axis=0 ) x = np.array([1 , 2 , 3 ]) y = np.array([10 , 20 ]) outer = x[:, np.newaxis] * y[np.newaxis, :] print (outer)A = np.random.randn(4 , 5 ) row_bias = np.random.randn(4 ) result = A + row_bias[:, np.newaxis]
4.3 np.newaxis 与维度扩展 np.newaxis(等价于 None)用于在指定位置插入一个长度为 1 的新维度,是手工控制广播的得力工具:
a = np.array([1 , 2 , 3 ]) print (a[:, np.newaxis].shape) print (a[np.newaxis, :].shape) print (a[:, np.newaxis, np.newaxis].shape) print (np.expand_dims(a, axis=0 ).shape) print (np.expand_dims(a, axis=1 ).shape) print (np.expand_dims(a, axis=-1 ).shape)
五、通用函数(ufuncs) 5.1 什么是 ufunc ufunc(universal function)是对 ndarray 逐元素操作的向量化包装器。它们由 C 实现,性能远超 Python 循环。NumPy 内置了 60+ 个 ufunc,覆盖算术、三角函数、比较、逻辑等。
a = np.arange(10 ) np.sqrt(a) np.exp(a) np.log(a + 1 ) np.sin(a) np.abs (a) np.sign(a) b = np.arange(10 , 20 ) np.add(a, b) np.subtract(a, b) np.multiply(a, b) np.divide(a, b) np.power(a, 2 ) np.maximum(a, b) np.minimum(a, b) np.mod(a, b) np.greater(a, b)
5.2 ufunc 的 out 参数 out 参数允许将结果写入预先分配的内存,避免额外分配:
a = np.random.randn(1000000 ) b = np.random.randn(1000000 ) result = np.empty_like(a) np.add(a, b, out=result) temp = np.empty_like(a) final = np.sqrt(np.add(a, b, out=temp), out=result)
5.3 ufunc 的 reduce / accumulate / reduceat / outer 方法 每个 ufunc 都附带了四种额外方法,提供了灵活的操作维度:
a = np.arange(1 , 6 ) np.add.reduce(a) np.multiply.reduce(a) np.add.accumulate(a) np.multiply.accumulate(a) idx = [0 , 2 , 4 ] np.add.reduceat(a, idx) np.multiply.outer(a, a) np.add.outer(a, a)
5.4 自定义 ufunc(frompyfunc / vectorize) def step_function (x, threshold ): return 1 if x >= threshold else 0 vstep = np.vectorize(step_function, excluded=['threshold' ]) a = np.random.randn(10 ) print (vstep(a, threshold=0.5 ))ufunc_step = np.frompyfunc(lambda x: 1 if x >= 0 else 0 , 1 , 1 ) print (ufunc_step(np.arange(-3 , 4 )))
六、线性代数(numpy.linalg) numpy.linalg 模块提供了标准的线性代数运算,底层一般链接到 BLAS/LAPACK 实现。
6.1 矩阵乘法 A = np.random.randn(3 , 4 ) B = np.random.randn(4 , 5 ) C1 = np.dot(A, B) C2 = A.dot(B) C3 = A @ B x = np.array([1 , 2 , 3 ]) y = np.array([4 , 5 , 6 ]) np.dot(x, y) A.dot(x)
6.2 矩阵分解与求逆 A = np.array([[1 , 2 ], [3 , 4 ]]) A_inv = np.linalg.inv(A) print (A_inv)print (A @ A_inv) det = np.linalg.det(A) rank = np.linalg.matrix_rank(A) trace = np.trace(A) trace2 = np.linalg.trace(A) A = np.array([[3 , 1 ], [1 , 2 ]]) b = np.array([9 , 8 ]) x = np.linalg.solve(A, b) print (A @ x) A_over = np.array([[1 , 1 ], [1 , 2 ], [1 , 3 ]]) b_over = np.array([2 , 2.5 , 3.5 ]) x_lsq, residuals, rank, sv = np.linalg.lstsq(A_over, b_over, rcond=None )
6.3 特征值与特征向量 A = np.array([[1 , 2 , 3 ], [4 , 5 , 6 ], [7 , 8 , 9 ]]) eigenvalues, eigenvectors = np.linalg.eig(A) print ("特征值:" , eigenvalues)print ("特征向量(每列一个):\n" , eigenvectors)for i in range (3 ): v_i = eigenvectors[:, i] lambda_i = eigenvalues[i] left = A @ v_i right = lambda_i * v_i print (f"λ_{i} : 残差 = {np.linalg.norm(left - right):.2 e} " ) S = np.array([[4 , 1 ], [1 , 3 ]]) w, v = np.linalg.eigh(S) print ("特征值:" , w) print ("特征向量正交:" , np.abs (v[:,0 ] @ v[:,1 ]) < 1e-10 )
6.4 SVD(奇异值分解) SVD 是数据科学中最重要的矩阵分解,广泛应用于 PCA、推荐系统、图像压缩等:
A = np.random.randn(5 , 3 ) U, S, Vt = np.linalg.svd(A, full_matrices=False ) print ("U.shape:" , U.shape) print ("S:" , S) print ("Vt.shape:" , Vt.shape) Sigma = np.diag(S) A_reconstructed = U @ Sigma @ Vt print ("重构误差:" , np.linalg.norm(A - A_reconstructed))k = 2 A_approx = U[:, :k] @ np.diag(S[:k]) @ Vt[:k, :] print (f"秩-{k} 近似,相对误差: {np.linalg.norm(A - A_approx) / np.linalg.norm(A):.4 f} " )
6.5 其他 linalg 函数 np.linalg.norm(A, ord ='fro' ) np.linalg.norm(A, ord =2 ) np.linalg.norm(A, ord =np.inf) np.linalg.cond(A) L = np.linalg.cholesky(S) Q, R = np.linalg.qr(A) print ("Q 正交:" , np.allclose(Q.T @ Q, np.eye(Q.shape[1 ])))np.linalg.matrix_power(A, 3 )
七、随机数生成 —— Generator API 自 NumPy 1.17 起,推荐使用 np.random.default_rng() 获得的 Generator 对象,替代旧的全局 np.random.* 函数。Generator API 底层使用更先进的算法(如 PCG64,SFC64,Philox),提供更好的统计属性和性能。
7.1 创建 Generator rng = np.random.default_rng(42 ) from numpy.random import PCG64, Generatorrng = Generator(PCG64(42 )) from numpy.random import SeedSequencess = SeedSequence(12345 ) child_seeds = ss.spawn(4 ) generators = [np.random.default_rng(s) for s in child_seeds]
7.2 主要分布 rng = np.random.default_rng(42 ) rng.random((3 , 4 )) rng.uniform(low=-1 , high=2 , size=(10 ,)) rng.integers(low=0 , high=10 , size=5 ) rng.integers(low=5 , high=20 , size=(3 , 4 )) rng.standard_normal((100 ,)) rng.normal(loc=5 , scale=2 , size=(50 , 2 )) rng.exponential(scale=1.0 , size=100 ) rng.poisson(lam=3 , size=100 ) rng.binomial(n=10 , p=0.5 , size=100 ) rng.gamma(shape=2 , scale=1 , size=100 ) rng.beta(a=2 , b=5 , size=100 ) rng.chisquare(df=3 , size=100 ) a = np.arange(10 ) rng.shuffle(a) permuted = rng.permutation(a) samples = rng.choice(a, size=5 , replace=False ) samples = rng.choice(a, size=5 , replace=True , p=weights)
八、形状变换与数组拼接 8.1 reshape, flatten, ravel a = np.arange(24 ) b = a.reshape(2 , 3 , 4 ) c = a.reshape(3 , 8 ) d = a.reshape(-1 , 6 ) e = a.reshape(2 , -1 , 2 ) orig = np.arange(6 ).reshape(2 , 3 ) print (orig.flags['C_CONTIGUOUS' ]) view = orig.reshape(3 , 2 ) print (view.flags['C_CONTIGUOUS' ]) view[0 , 0 ] = 999 print (orig[0 , 0 ]) flat_copy = a.reshape(2 , 3 , 4 ).flatten() flat_copy[0 ] = 999 ravel_view = a.reshape(2 , 3 , 4 ).ravel() ravel_view[0 ] = 999 auto = a.reshape(2 , -1 )
8.2 transpose 与 swapaxes a = np.arange(24 ).reshape(2 , 3 , 4 ) b = a.transpose() b = a.T c = a.transpose(1 , 0 , 2 ) d = a.transpose(2 , 0 , 1 ) e = a.swapaxes(0 , 2 ) f = np.moveaxis(a, source=0 , destination=-1 )
8.3 拼接与堆叠 a = np.array([[1 , 2 ], [3 , 4 ]]) b = np.array([[5 , 6 ], [7 , 8 ]]) np.concatenate([a, b], axis=0 ) np.concatenate([a, b], axis=1 ) np.vstack([a, b]) np.hstack([a, b]) np.stack([a, b], axis=0 ) np.stack([a, b], axis=-1 ) np.dstack([a, b]) c = np.arange(12 ).reshape(3 , 4 ) np.split(c, 3 , axis=0 ) np.split(c, 2 , axis=1 ) first, second = np.split(c, [2 ], axis=1 ) np.hsplit(c, 2 ) np.vsplit(c, 3 ) np.tile([0 , 1 ], 3 ) np.tile([0 , 1 ], (2 , 2 )) np.repeat([0 , 1 ], 3 ) np.repeat([[1 ,2 ], [3 ,4 ]], 2 , axis=0 ) np.repeat([[1 ,2 ], [3 ,4 ]], 2 , axis=1 )
九、einsum —— 爱因斯坦求和约定 np.einsum 是 NumPy 中最强大也最优雅的函数之一。它使用爱因斯坦求和约定的记号,可以在一次调用中完成各种张量操作。虽然初看语法晦涩,但掌握后能大幅简化代码。
9.1 einsum 基本语法 基本形式:np.einsum(subscripts, *operands)
其中 subscripts 是一个字符串,用逗号分隔输入的下标,用 -> 分隔输出下标:
a = np.array([1 , 2 , 3 ]) b = np.array([4 , 5 , 6 ]) np.einsum('i,i->' , a, b) A = np.random.randn(3 , 4 ) x = np.random.randn(4 ) np.einsum('ij,j->i' , A, x) B = np.random.randn(4 , 5 ) np.einsum('ij,jk->ik' , A, B) np.einsum('i,j->ij' , a, b)
9.2 常用 einsum 模式总结 np.einsum('ij->ji' , A) np.einsum('ii->i' , A) np.einsum('ii->' , square_matrix) np.einsum('ij,ij->ij' , A, B) np.einsum('ij->i' , A) np.einsum('ij->j' , A) np.einsum('ij->' , A) np.einsum('bik,bkj->bij' , A_batch, B_batch) np.einsum('i,ij,j->' , x, A, y)
9.3 einsum 高级:张量收缩 T1 = np.random.randn(3 , 4 , 5 ) T2 = np.random.randn(5 , 6 , 4 ) result = np.einsum('ijk,klj->il' , T1, T2) print (result.shape) tensor = np.random.randn(2 , 3 , 4 ) matrix = np.random.randn(4 , 5 ) result = np.einsum('ijk,kl->ijl' , tensor, matrix) print (result.shape)
9.4 einsum 的 optimize 参数 result = np.einsum('abcd,cdef,efgh->abgh' , T1, T2, T3, optimize=True ) result = np.einsum('abcd,cdef,efgh->abgh' , T1, T2, T3, optimize='optimal' )
十、内存布局:C order vs Fortran order 10.1 行优先与列优先 NumPy 支持两种内存布局:
C order(行优先,row-major) :最后一维的变化最快。默认布局。
Fortran order(列优先,column-major) :第一维的变化最快。
a_c = np.array([[1 , 2 , 3 ], [4 , 5 , 6 ]], order='C' ) a_f = np.array([[1 , 2 , 3 ], [4 , 5 , 6 ]], order='F' ) print (a_c) print (a_f) print (a_c.flags['C_CONTIGUOUS' ]) print (a_f.flags['F_CONTIGUOUS' ])
10.2 布局对性能的影响 沿连续维度遍历通常快得多:
import timesize = 5000 a_c = np.random.randn(size, size) a_f = np.asfortranarray(np.random.randn(size, size)) start = time.perf_counter() s = 0 for i in range (size): for j in range (size): s += a_c[i, j] t_c = time.perf_counter() - start start = time.perf_counter() s = 0 for i in range (size): for j in range (size): s += a_f[i, j] t_f = time.perf_counter() - start print (f"C 连续按行遍历: {t_c:.4 f} s" )print (f"F 连续按行遍历: {t_f:.4 f} s" )
10.3 控制布局 a = np.ones((3 , 4 ), order='F' ) a_c = np.ascontiguousarray(a) a_f = np.asfortranarray(a) b = a.reshape(2 , 6 , order='C' ) b = a.reshape(2 , 6 , order='F' ) b = a.reshape(2 , 6 , order='A' ) print (a.flags.c_contiguous) print (a.flags.f_contiguous) print (a.flags.owndata)
10.4 跨库互操作时的布局 当 NumPy 与 OpenCV、PyTorch、TensorFlow 等库交互时,内存布局至关重要:
import cv2img = cv2.imread('photo.jpg' ) img_rgb = cv2.cvtColor(img, cv2.COLOR_BGR2RGB) img_float = np.ascontiguousarray(img_rgb.astype(np.float32))
十一、向量化 vs 循环 —— 性能实证 11.1 基准测试框架 import timeimport numpy as npdef benchmark (func, *args, n_runs=10 , **kwargs ): """运行 n_runs 次,取中位数时间""" times = [] for _ in range (n_runs): start = time.perf_counter() func(*args, **kwargs) times.append(time.perf_counter() - start) return np.median(times) def python_loop_sum_of_squares (n ): s = 0 for i in range (n): s += i ** 2 return s def numpy_vectorized_sum_of_squares (n ): return np.sum (np.arange(n) ** 2 )
11.2 不同规模下的性能对比 sizes = [1000 , 10000 , 100000 , 1000000 ] results = [] for n in sizes: t_loop = benchmark(lambda : sum (i**2 for i in range (n)), n_runs=3 ) t_numpy = benchmark(lambda : np.sum (np.arange(n)**2 ), n_runs=10 ) results.append((n, t_loop, t_numpy)) print (f"{'Size' :>12 } {'Loop (ms)' :>12 } {'NumPy (ms)' :>12 } {'Speedup' :>10 } " )for n, tl, tn in results: print (f"{n:>12 } {tl*1000 :>12.3 f} {tn*1000 :>12.3 f} {tl/tn:>10.1 f} x" )
11.3 避免 Python 循环的常见模式 result = [] for x in data: if x > 0 : result.append(x) result = data[data > 0 ] for i in range (len (data)): if data[i] < 0 : data[i] = 0 data[data < 0 ] = 0 data = np.where(data < 0 , 0 , data) result = np.empty_like(data) for i in range (len (data)): if data[i] < 0 : result[i] = 0 elif data[i] > 1 : result[i] = 1 else : result[i] = data[i] result = np.clip(data, 0 , 1 ) result = np.where(data < 0 , 0 , np.where(data > 1 , 1 , data)) cumulative = [0 ] for x in data: cumulative.append(cumulative[-1 ] + x) cumulative = np.cumsum(data) from numba import jit@jit(nopython=True ) def compute_with_dependency (data ): result = np.empty_like(data) result[0 ] = data[0 ] for i in range (1 , len (data)): result[i] = result[i-1 ] * 0.9 + data[i] return result
十二、完整示例一:PCA 从零实现 主成分分析(PCA)是数据降维的核心方法。以下用 NumPy 完整实现,展示 linalg.eig、矩阵运算、广播等核心 API 的实际运用:
import numpy as npclass PCA : """从零实现的 PCA,使用特征分解""" def __init__ (self, n_components ): self .n_components = n_components self .components_ = None self .mean_ = None self .explained_variance_ = None self .explained_variance_ratio_ = None def fit (self, X ): """ 拟合 PCA X: shape (n_samples, n_features) """ n_samples, n_features = X.shape self .mean_ = X.mean(axis=0 ) X_centered = X - self .mean_ covariance = (X_centered.T @ X_centered) / (n_samples - 1 ) eigenvalues, eigenvectors = np.linalg.eigh(covariance) eigenvalues = eigenvalues[::-1 ] eigenvectors = eigenvectors[:, ::-1 ] self .components_ = eigenvectors[:, :self .n_components].T self .explained_variance_ = eigenvalues[:self .n_components] self .explained_variance_ratio_ = ( self .explained_variance_ / eigenvalues.sum () ) return self def transform (self, X ): """将数据投影到低维空间""" X_centered = X - self .mean_ return X_centered @ self .components_.T def fit_transform (self, X ): """一步完成拟合 + 转换""" self .fit(X) return self .transform(X) def inverse_transform (self, Z ): """从低维表示重构原始数据""" return Z @ self .components_ + self .mean_ from sklearn.datasets import load_irisiris = load_iris() X = iris.data y = iris.target pca = PCA(n_components=2 ) Z = pca.fit_transform(X) print (f"原始形状: {X.shape} " ) print (f"降维后形状: {Z.shape} " ) print (f"方差解释率: {pca.explained_variance_ratio_} " ) print (f"累积方差解释率: {pca.explained_variance_ratio_.sum ():.4 f} " ) from sklearn.decomposition import PCA as SklearnPCApca_sk = SklearnPCA(n_components=2 ) Z_sk = pca_sk.fit_transform(X) print (f"与 sklearn 投影结果差异(绝对值最大): {np.abs (Z_sk - Z).max ():.6 e} " )
12.1 PCA 的 SVD 实现版本 上面的实现使用协方差矩阵的特征分解。更稳定的做法是直接对数据中心化后的矩阵做 SVD:
class PCA_SVD : """基于 SVD 的 PCA 实现""" def __init__ (self, n_components ): self .n_components = n_components self .components_ = None self .mean_ = None self .explained_variance_ = None self .explained_variance_ratio_ = None def fit (self, X ): self .mean_ = X.mean(axis=0 ) X_centered = X - self .mean_ U, S, Vt = np.linalg.svd(X_centered, full_matrices=False ) n_samples = X.shape[0 ] eigenvalues = S**2 / (n_samples - 1 ) self .components_ = Vt[:self .n_components] self .explained_variance_ = eigenvalues[:self .n_components] self .explained_variance_ratio_ = ( self .explained_variance_ / eigenvalues.sum () ) return self def transform (self, X ): X_centered = X - self .mean_ return X_centered @ self .components_.T def fit_transform (self, X ): self .fit(X) return self .transform(X) def inverse_transform (self, Z ): return Z @ self .components_ + self .mean_
十三、完整示例二:图像卷积 使用 NumPy 实现二维卷积,展示 strides(步幅)和窗口滑动操作。这个例子也揭示了 NumPy 对局部运算的支持方式:
import numpy as npdef conv2d_vectorized (image, kernel ): """ 向量化实现的二维卷积(valid 模式,无 padding) 参数: image: shape (H, W) kernel: shape (kH, kW) 返回: conv: shape (H - kH + 1, W - kW + 1) """ H, W = image.shape kH, kW = kernel.shape out_H = H - kH + 1 out_W = W - kW + 1 from numpy.lib.stride_tricks import as_strided item_size = image.itemsize shape = (out_H, out_W, kH, kW) strides = (image.strides[0 ], image.strides[1 ], image.strides[0 ], image.strides[1 ]) windows = as_strided(image, shape=shape, strides=strides) conv = np.sum (windows * kernel, axis=(2 , 3 )) return conv image = np.array([ [10 , 10 , 10 , 10 , 10 ], [10 , 50 , 50 , 50 , 10 ], [10 , 50 , 100 , 50 , 10 ], [10 , 50 , 50 , 50 , 10 ], [10 , 10 , 10 , 10 , 10 ], ], dtype=np.float64) sobel_x = np.array([[-1 , 0 , 1 ], [-2 , 0 , 2 ], [-1 , 0 , 1 ]], dtype=np.float64) gaussian = np.array([[1 , 2 , 1 ], [2 , 4 , 2 ], [1 , 2 , 1 ]], dtype=np.float64) / 16.0 result_sobel = conv2d_vectorized(image, sobel_x) result_gauss = conv2d_vectorized(image, gaussian) print ("原始图像:\n" , image)print ("\nSobel 水平边缘:\n" , result_sobel)print ("\n高斯模糊:\n" , result_gauss)
as_strided 创建的窗口视图仅是对原始数据的重新解释(零拷贝),因此即使图像很大,内存开销也极低。但使用 as_strided 时必须小心:如果 strides 设置不当,可能访问到越界内存。
13.1 使用 scipy 的卷积进行验证 from scipy.signal import convolve2dresult_scipy = convolve2d(image, sobel_x, mode='valid' ) print ("与 scipy 的差异:" , np.abs (result_sobel - result_scipy).max ())
十四、完整示例三:蒙特卡洛求圆周率 蒙特卡洛方法通过随机采样估计确定性量。求圆周率是经典的入门示例,但我们将展示如何用 NumPy 的批量运算将其效率推向极致:
import numpy as npimport timedef monte_carlo_pi (n_points, rng=None ): """ 蒙特卡洛法估计 π 思路:在 [-1, 1] x [-1, 1] 正方形内均匀撒点, 落在单位圆内的点的比例应趋近于 π/4 """ if rng is None : rng = np.random.default_rng() x = rng.uniform(-1 , 1 , n_points) y = rng.uniform(-1 , 1 , n_points) inside = x**2 + y**2 <= 1.0 n_inside = np.sum (inside) pi_estimate = 4.0 * n_inside / n_points return pi_estimate, n_inside, inside def monte_carlo_pi_batched (total_points, batch_size=10_000_000 , rng=None ): """ 分批计算,避免一次分配过多内存 """ if rng is None : rng = np.random.default_rng() n_inside_total = 0 remaining = total_points while remaining > 0 : n_this_batch = min (remaining, batch_size) _, n_inside, _ = monte_carlo_pi(n_this_batch, rng) n_inside_total += n_inside remaining -= n_this_batch return 4.0 * n_inside_total / total_points for n in [1_000 , 10_000 , 100_000 , 1_000_000 , 10_000_000 ]: start = time.perf_counter() pi_est, n_inside, _ = monte_carlo_pi(n) elapsed = time.perf_counter() - start error = abs (pi_est - np.pi) print (f"n={n:>12 ,} 估计值={pi_est:.8 f} 误差={error:.8 f} 耗时={elapsed:.4 f} s" )
这个例子完美展示了 NumPy 的优势:10,000,000 个点的运算在约 0.07 秒内完成(含随机数生成)。纯 Python 循环实现同样的逻辑将需要数十秒。
十五、实用函数与技巧 15.1 统计函数 a = np.random.randn(1000 ) np.mean(a) np.median(a) np.std(a) np.std(a, ddof=1 ) np.var(a) np.percentile(a, 50 ) np.percentile(a, [25 , 50 , 75 ]) np.quantile(a, [0.25 , 0.5 , 0.75 ]) b = np.random.randn(5 , 10 ) b.mean(axis=0 ) b.mean(axis=1 ) b.mean(axis=0 , keepdims=True ) np.cumsum(a) np.cumprod(a) np.min (a), np.max (a) np.argmin(a), np.argmax(a) np.ptp(a)
15.2 排序与搜索 a = np.array([3 , 1 , 2 , 5 , 4 ]) np.sort(a) a.sort() sorted_indices = np.argsort(a) data = np.array([[2 , 7 ], [1 , 3 ], [4 , 5 ]]) idx = np.argsort(data[:, 0 ]) sorted_data = data[idx] k = 3 top_k_values = np.partition(a, -k)[-k:] top_k_indices = np.argpartition(a, -k)[-k:] np.searchsorted([1 ,2 ,3 ,4 ,5 ], 3.5 ) np.searchsorted([1 ,2 ,3 ,4 ,5 ], [0 , 2.5 , 6 ])
15.3 集合运算 a = np.array([1 , 2 , 3 , 4 , 5 ]) b = np.array([3 , 4 , 5 , 6 , 7 ]) np.intersect1d(a, b) np.union1d(a, b) np.setdiff1d(a, b) np.setxor1d(a, b) np.in1d(a, b) np.isin(a, b) np.unique(a) np.unique(a, return_counts=True ) np.unique(a, return_index=True ) np.unique(a, return_inverse=True )
15.4 保存与加载 np.save('array.npy' , a) a_loaded = np.load('array.npy' ) np.savez('arrays.npz' , x=x_data, y=y_data, params=params) data = np.load('arrays.npz' ) print (data['x' ], data['y' ], data['params' ])np.savez_compressed('arrays_comp.npz' , x=x_data, y=y_data) np.savetxt('data.csv' , data, delimiter=',' , header='col1,col2,col3' ) loaded = np.loadtxt('data.csv' , delimiter=',' , skiprows=1 )
15.5 内存映射(memmap) 对于无法完整加载到内存的超大数组,可以使用 np.memmap 将磁盘上的二进制文件映射到虚拟内存:
mmap = np.memmap('large_array.dat' , dtype='float64' , mode='w+' , shape=(100000 , 1000 )) mmap[0 , :] = np.arange(1000 ) mmap.flush() mmap_ro = np.memmap('large_array.dat' , dtype='float64' , mode='r' , shape=(100000 , 1000 )) print (mmap_ro[0 , :5 ]) del mmap
十六、常见陷阱与最佳实践 16.1 视图 vs 副本 a = np.array([1 , 2 , 3 , 4 ]) b = a[1 :3 ] b[0 ] = 99 print (a) b = a[1 :3 ].copy() b[0 ] = 99 print (a) c = a[[1 , 2 ]] c[0 ] = 99 print (a)
16.2 布尔索引中的括号 a = np.array([1 , 2 , 3 , 4 , 5 ]) mask = (a > 2 ) & (a < 5 )
16.3 浮点比较 a = np.array([0.1 + 0.2 ]) print (a == 0.3 ) print (np.isclose(a, 0.3 )) print (np.isclose(a, 0.3 , atol=1e-8 , rtol=1e-5 ))print (np.allclose(a, 0.3 ))
16.4 axis 参数的理解 axis 指定了操作沿着哪个轴(维度)进行。关键记忆法:「axis=i 意味着沿着第 i 个轴做归约,该维度被消除」。
a = np.arange(24 ).reshape(2 , 3 , 4 ) a.sum (axis=0 ) a.sum (axis=1 ) a.sum (axis=2 ) a.sum (axis=(0 , 2 ))
16.5 不要滥用 np.append np.append 每次调用都会分配新的数组并复制数据,在循环中使用会导致 O(n²) 的时间复杂度:
result = np.array([]) for x in data: result = np.append(result, x) result = [] for x in data: result.append(x) result = np.array(result)
十七、总结 NumPy 是 Python 科学计算生态的基石。其核心价值在于:
ndarray :同构、多维、内存连续的数据容器。
广播(broadcasting) :无需显式循环即可对不同形状数组做逐元素运算。
通用函数(ufuncs) :编译层实现的向量化运算,性能比 Python 循环高 1-2 个数量级。
线性代数(linalg) :基于 BLAS/LAPACK 的标准矩阵运算,支持特征分解、SVD、求解线性方程组等。
einsum :用爱因斯坦求和约定优雅地表达各种张量运算。
灵活的索引 :基本索引(视图)、花式索引(副本)、布尔索引(筛选),满足各种数据访问需求。
掌握 NumPy 意味着你能够在 Python 中写出既简洁又高性能的数值计算代码。当 NumPy 的向量化不足以表达某些复杂算法(如前后依赖的循环)时,需要考虑 Numba / Cython 作为补充方案,但绝大多数数据处理任务都可以在 NumPy 内高效完成。
参考资料 :