目录
  1. 1. 一、为什么需要分布式计算
  2. 2. 二、MapReduce编程模型
  3. 3. 三、Shuffle的深入解析
  4. 4. 四、容错机制
  5. 5. 五、Spark对MapReduce的超越
  6. 6. 六、分布式文件系统的设计原则
  7. 7. 七、现代分布式计算生态
  8. 8. 八、面试常见追问
系统设计之分布式计算系统 - Map Reduce的原理与应用

一、为什么需要分布式计算

在单机计算能力达到物理极限后,横向扩展(Scale Out)成为处理海量数据的唯一选择。试想一下,如果要对一PB(1024 TB)的日志数据做一次全文检索或聚合统计,即使使用最快的NVMe SSD(顺序读取速度约3GB/s),单机扫描一遍也需要约97个小时。但如果将数据分片到1000台机器上,每台处理1TB,理论上不到2分钟就能完成(忽略调度和通信开销)。这就是分布式计算的核心思想——分而治之,将大问题分解为小问题,分发到多个计算节点并行处理,最后归并结果。

分布式计算模型需要解决几个核心挑战。第一,并行化:如何将计算任务自然地拆分为可并行的子任务。第二,数据分布:如何将数据均匀地分布到计算节点,使得计算尽可能在数据所在节点进行(数据本地性)。第三,容错:在由数千台廉价PC组成的集群中,硬件故障是常态而非例外——硬盘每天都有可能有损坏、网络偶尔会断连、内存偶尔会出错(比特翻转)。分布式计算框架必须优雅地处理这些故障。第四,负载均衡:避免某些节点成为“慢节点”(Straggler)从而拖慢整个作业。

二、MapReduce编程模型

MapReduce是Google在2004年提出的分布式计算模型,其简洁优雅的设计影响了整整一代大数据处理框架。

MapReduce将计算过程抽象为两个阶段:Map阶段Reduce阶段,中间通过Shuffle(混洗)连接。用户只需实现map()reduce()两个函数,框架负责分布式调度、数据分区、容错等底层细节。

Map函数的签名是:map(key1, value1) -> list(key2, value2)。对输入的每条记录,生成零个或多个中间键值对。例如,单词计数程序的Map函数接收文档名和文档内容,输出<单词, 1>的中间对。

Reduce函数的签名是:reduce(key2, list(value2)) -> list(key3, value3)。对Map阶段产出的每个中间键,将其所有关联的值聚合。例如,Reduce函数接收单词和该单词所有计数的列表[1, 1, 1, ...],求和得到单词的总出现次数。

以单词计数(Word Count)为例:

// Map函数
public void map(String docName, String content) {
for (String word : content.split("\\s+")) {
emit(word, 1);
}
}

// Reduce函数
public void reduce(String word, List<Integer> counts) {
int sum = 0;
for (int count : counts) {
sum += count;
}
emit(word, sum);
}

在MapReduce的物理执行层面,整个过程细分为以下阶段:

输入分片(Input Split):输入数据被切分为固定大小的Split(通常64MB到256MB,与底层文件系统的块大小对齐)。GFS的默认块大小是64MB,每个Split对应一个Map任务。

Map执行:Master将Map任务调度到尽可能靠近数据所在ChunkServer的Worker节点上执行(数据本地性优化)。Map任务读取输入Split,调用用户定义的map函数,产生的中间键值对先写入内存缓冲区,缓冲区满后按照Reduce任务数量进行分区(默认使用hash(key) % R进行分区),并写入本地磁盘。

Shuffle与排序:当Map任务完成一定比例后,Reduce任务开始通过HTTP从各个Map任务的本地磁盘拉取属于自己分区的中间数据。由于Map任务和Reduce任务可能分布在不同的物理节点上,这个阶段涉及大量的网络传输。Reduce任务接收数据后,对所有中间键进行归并排序(外排序),使得相同键的数据聚集在一起。

Reduce执行:Reduce任务对排序后的数据进行迭代,对每个唯一的中间键调用用户定义的reduce函数。Reduce的输出直接写入GFS(通常每个Reduce任务产生一个输出文件)。

Output:所有Reduce任务完成后,作业输出为R个文件(R为Reduce任务数量),存储在分布式文件系统中。

三、Shuffle的深入解析

Shuffle是MapReduce中最核心也最复杂的阶段,对数以千计的节点的计算性能有决定性影响。

Map端Shuffle的过程:Map函数产生的输出首先写入一个环形的内存缓冲区(默认100MB,可通过io.sort.mb配置)。当缓冲区达到阈值(默认80%),启动溢写(Spill)线程将数据写入本地磁盘。写入前,溢写线程根据Reduce任务数量R对数据进行分区(Partition),并在每个分区内按键进行排序(Sort)。如果用户定义了Combiner(合并器),排序后对相同键的值进行局部合并。例如,在一个Map任务内,单词”hello”可能出现了50次,Combiner将其合并为<"hello", 50>而不是发送50个<"hello", 1>,大幅减少了需要传输的数据量。这种优化在单词计数这种具有可结合(Associative)和可交换(Commutative)性质的聚合中非常有效。

Reduce端Shuffle的过程:Reduce任务从每个已完成的Map任务处拉取(Copy)属于自己分区的数据。由于Reduce可能从数百甚至上千个Map任务处拉取数据,需要多线程并发拉取(默认5个并发线程)。拉取过来的数据块先放入内存缓冲区,当内存不足时溢写到磁盘。所有数据拉取完成后,进行归并排序(Merge Sort)——将多个已排序的文件和内存中的数据归并为一个全局有序的数据流。这个阶段被称为“Sort”阶段(虽然在早期Copy阶段就已经开始溢写和预排序)。

当Map任务数量非常大(如10000个)时,Reduce端需要打开大量的HTTP连接并维持大量的归并文件句柄,可能导致文件描述符耗尽。Hadoop中通过设置io.sort.factor参数控制一次归并的最大文件数,分批次归并(多路归并的多轮执行)。

Shuffle的大量数据在网络上传输,是MapReduce作业中最容易成为瓶颈的阶段。优化的关键手段包括:合理设置Reduce任务数(太少无法充分利用并行度,太多会产生过多小文件);启用Combiner减少Map输出;使用压缩减少网络传输量(如Snappy或LZ4压缩,解压快但压缩率适中);调整网络缓冲区大小以适应高吞吐传输。

四、容错机制

MapReduce的设计哲学是“将故障视为常态”。集群由数千台廉价PC组成,每台机器的平均无故障时间(MTBF)可能只有数百天。这意味着在一个1000台的集群中,平均每天都可能有一台机器出现故障。

Worker故障:Master周期性地向每个Worker发送心跳(Heartbeat),如果一定时间内未收到响应,Master将该Worker标记为失败节点。该Worker上已完成的所有Map任务需要重新执行——因为Map任务的输出存储在Worker的本地磁盘上,Worker故障导致这些中间数据不可访问。注意,已完成的Reduce任务不需要重新执行,因为Reduce的输出存储在GFS中(GFS有副本冗余)。当一个Map任务在Worker A上失败后,Master将其重新调度到Worker B上,所有Reduce任务会感知到需要从新的Worker B拉取数据。

Master故障:Master是单点,如果Master故障,整个作业失败。Google的MapReduce实现中,Master定期保存检查点(Checkpoint),记录作业的当前进度。当Master故障恢复后,从最近的检查点恢复执行。但在实际生产环境中,Master的故障率极低(因为只有一台),且Master可以将自身状态复制到备份节点。Hadoop 1.x中JobTracker是单点,YARN将此拆分为ResourceManager(仍为单点但可HA)和ApplicationMaster(每个应用一个,可重试)。

Straggler(拖后腿者)处理:在大型集群中,由于硬件问题(如磁盘坏道导致读取慢)、资源竞争(同机器上其他作业占用资源)或网络瓶颈,少数任务的执行速度远慢于平均。这些Straggler会拖慢整个作业,因为Reduce阶段需要等待所有Map任务完成。MapReduce使用推测执行(Speculative Execution)来解决这个问题:当作业接近完成时(如95%的Map任务已完成),Master检测到某些任务的进度显著慢于其他任务,则在另一个Worker上启动该任务的备份副本。无论原任务还是备份任务先完成,结果都被采纳,另一个任务被杀死。Hadoop中推测执行默认开启,但在生产环境中常对Reduce阶段关闭(因为Reduce的Output写入GFS,多个任务同时写入会产生冲突)。

五、Spark对MapReduce的超越

MapReduce模型虽然有开创性意义,但其批处理范式的缺陷也很明显。每次MapReduce作业都从磁盘读取输入、将中间结果写磁盘、最终输出写磁盘——大量的磁盘I/O使得MapReduce不适合迭代计算(如机器学习中的梯度下降需要多次迭代)。此外,MapReduce的编程模型过于受限——很多算法难以用map和reduce两步直接表达。

Spark通过弹性分布式数据集(Resilient Distributed Dataset,RDD)解决了这些问题。RDD是一个分区化的、不可变的分布式数据集合,存储在内存中(也可溢写到磁盘)。RDD通过血统(Lineage)信息实现容错:每个RDD都记录了它是如何从父RDD转换而来的(转换操作的DAG图),当某个分区的数据丢失时,根据Lineage从原始数据重新计算,而不需要像MapReduce那样依赖副本或检查点。

Spark的DAG执行引擎将一系列转换(map、filter、join、groupByKey等)优化为执行阶段(Stage),最大化流水线执行和内存复用。宽依赖(如groupByKey)会触发Shuffle,将Job拆分为不同的Stage;窄依赖(如map、filter)可以在同一Stage内流水线执行,中间结果保留在内存中,无需写入磁盘。

以下面这个典型的日志分析任务为例:

val logs = spark.textFile("hdfs://logs/")
val errors = logs.filter(_.contains("ERROR"))
val counts = errors.map(line => (parseDate(line), 1))
.reduceByKey(_ + _)
counts.saveAsTextFile("hdfs://output/")

Spark会将textFile → filter → map合并为一个Stage(窄依赖),reduceByKey作为第二个Stage(Shuffle后聚合),saveAsTextFile作为第三个Stage。MapReduce需要三个Job才能完成这个任务,而Spark在一个Job内完成,中间结果在Stage间通过内存交换,比MapReduce快10到100倍(尤其是在迭代计算中)。

六、分布式文件系统的设计原则

分布式存储是分布式计算的基础。谷歌文件系统GFS和Hadoop的HDFS都遵循以下设计原则:

大文件优先:典型的文件大小在数百MB到数GB。块大小(如GFS的64MB、HDFS的128MB)远大于传统文件系统(如Linux EXT4的4KB),减少了元数据的规模和查找开销。

追加写为主:文件主要被追加写入(Append-Only),很少被随机修改。这使得一致性模型可以简化,写入流水线可以高效设计。随机写入虽然支持,但性能不佳且不建议。

顺序读取优化:大文件的典型访问模式是顺序扫描(如MapReduce的输入读取),因此数据布局和预读策略都针对顺序I/O优化,而非随机I/O。

冗余存储:每块数据默认存储三份副本(HDFS的默认配置),分布在不同的机架(Rack)上,保证在任何单节点甚至整个机架故障时数据仍然可用。

计算向数据移动:相比于将数据移动到计算节点,将计算调度到数据所在节点成本更低(因为数据量大、网络带宽相对稀缺)。这是MapReduce进行数据本地性调度的基础。

单一Master + 多ChunkServer架构:Master管理所有元数据(文件命名空间、文件到块的映射、块到ChunkServer的映射),ChunkServer存储实际数据块。客户端与Master交互只获取元数据,所有的数据读写直接与ChunkServer交互,避免了Master成为数据吞吐的瓶颈。

七、现代分布式计算生态

从MapReduce出发,大数据计算生态已经进化出丰富的系统:

批处理:Apache Spark(基于内存的DAG引擎)、Apache Flink(支持批流一体,批处理视为有界流处理)、Presto/Trino(交互式SQL查询引擎,MPP架构,适合秒级的分析查询)。

流处理:Apache Flink(事件时间处理,精确一次语义(Exactly-Once),有状态算子)、Kafka Streams(嵌入应用内的轻量级流处理库)、Apache Storm(早期的流处理框架,逐条记录处理)。

资源管理:YARN(Hadoop生态的资源管理器)、Kubernetes(云原生时代的资源编排平台,越来越多的计算框架直接运行在K8s上)、Mesos(Apache的通用资源调度平台,已被K8s超越)。

SQL on Hadoop:Hive(MapReduce/Tez/Spark执行引擎上的SQL层,事实上的数据仓库标准)、Spark SQL(DataFrame/Dataset API,Catalyst优化器)、ClickHouse(面向OLAP的列式数据库,极高查询性能)。

统一引擎:Databricks的Lakehouse架构(基于Delta Lake,在数据湖上实现ACID事务和版本管理)、Apache Iceberg(数据湖表格式标准)、Snowflake(云原生数据仓库)。

八、面试常见追问

问题一:MapReduce的Combiner和Reducer有什么区别?

Combiner在Map端(数据刚从内存溢出至磁盘时)运行,做局部聚合;Reducer在Reduce端运行,做全局聚合。Combiner必须满足结合律和交换律(因为中间数据可能在多个Map任务中多次运行Combiner,也可能不运行——Combiner的调用次数是未定义的)。典型适用场景如求和、计数、最大值/最小值;不适用场景如求平均值(Combiner合并后丢失了计数信息,虽然可以通过<sum, count>对来变通)。Combiner的签名与Reducer完全相同,实际上通常直接复用Reducer的实现。

问题二:为什么Spark的迭代计算比MapReduce快很多?

核心原因是数据在内存中的驻留。MapReduce每次迭代都从HDFS读取输入、将输出写入HDFS,每次迭代都产生大量的磁盘I/O和序列化/反序列化开销。十次迭代意味着十次全量磁盘读写。Spark将RDD缓存(cache()persist())在内存中,十次迭代中只有第一次从HDFS读取,最后一次写入HDFS,中间八次直接在内存中转换。在逻辑回归训练的典型场景中,Spark比MapReduce快两个数量级。此外,Spark的DAG执行引擎通过流水线化窄依赖Stage,进一步减少了中间结果的物化开销。

问题三:推测执行在什么情况下不应该开启?

推测执行不适合具有副作用的操作。如果Reduce任务写入外部系统(如将结果写入MySQL或发送邮件),备份任务的执行会导致重复写入。Hadoop中推测执行默认对Map开启但对Reduce关闭(Hadoop 1.x)。另外,在集群资源紧张时,推测执行会消耗额外的计算资源,可能反而延长整体作业时间。对于资源紧张的环境,关闭推测执行、转而优化数据倾斜问题通常是更好的策略。

文章作者: Leo·Cheung
文章链接: http://tufusi.com/2024/04/05/%E7%B3%BB%E7%BB%9F%E8%AE%BE%E8%AE%A1%E4%B9%8B%E5%88%86%E5%B8%83%E5%BC%8F%E8%AE%A1%E7%AE%97%E7%B3%BB%E7%BB%9F/
版权声明: 本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自 ONE·PIECE
打赏
  • 微信
  • 支付宝

评论