MapReduce
MapReduce
KNOWUMapReduce的基本介绍
MapReduce: 分布式计算框架
分布式计算思想: 分而治之
1
2
3
4
5
6
7分而治之:
生活中: 搬砖 图书馆数书 计算从1~100和
整个分而治之思想主要有二大阶段:
分(map阶段): 将一个任务拆分为多个小的任务
合(reduce阶段): 将每个小的任务结果进行聚合汇总在一起1
2
3
4
5
6
7
8MapReduce既然是一个分布式计算框架, 必然需要有输入 和 输出, 数据在map执行之前进行读取数据, 在reduce之后将数据写出去
数据经历阶段:
1) 数据读取阶段: 不断持续的一直读取数据, 默认一行一行的读取数据, 每读取一行 就需要执行一次map的操作 数据传递 采用 kv方式: 读取过来数据 一般称为 k1和v1
2) map阶段: 接收k1和v1, 对数据进行处理, 形成新的键值对 k2和 v2
3) reduce阶段: 接收k2和v2 进行聚合统计操作, 然后转换为k3和v3
4) 数据输出阶段: 将k3和v3输出到目的地
MapReduce的编程模型
- 整个MapReduce编写步骤, 共计为8步: 天龙八部
map阶段:
- 读取数据: 将读取数据转换为k1和v1
- 自定义map逻辑: 接收k1和v1 , 将k1和v1转换为k2和v2
shuffle阶段:
- 分区: 将相同k2的数据, 发往同一个reduce, 保证相同k2的数据都在一个reduce中
- 排序: 对k2进行排序操作, 默认升序 字典序和数字顺序
- 规约: MR的优化工作, 提前聚合操作 可以省略
- 分组: 将相同k2的对应v2数据合并在一起, 形成一个集合
reduce阶段:
- 自定义reduce逻辑: 接收k2和v2 将其转换为 k3和v3
- 输出数据: 将k3和v3 输出目的地
MapReduce的入门案例
- 需求: 请统计指定单词文件中, 每个单词出现了多少次
- 代码实现
1 | 第一步: 自定义 map代码 |
- 创建maven项目. 导入相关的依赖:
1 | <dependencies> |
- 第一步: 自定义map逻辑的代码
1 | 1) 创建一个类, 继承mapper类 |
1 | package com.mr.wordCount; |
- 第二部: 自定义reduce逻辑代码
1 | 1) 创建一个类, 继承reducer类 |
1 | package com.mr.wordCount; |
- 第三部: 实现驱动类
1 | package com.mr.wordCount; |
- 简写方式:
1 | package com.mr.wordCount; |
- MapReduce的运行方式
本地运行
直接右键run即可, 输入和输出路径可以是本地 也可以是HDFS
1
2
3如果输入和输出都是本地路径, 那么不需要启动hadoop集群
如果输出和输出是HDFS路径, 需要保证HDFS已经启动了
集群化运行:
将MapReduce在yarn平台上执行
- 在MR的驱动类, 添加集群运行配置代码:
1
2job.setJarByClass(当前类的.class);
注意: 此配置必须配, 否则在yarn上执行的时候, 会报找不到主类- 在yarn上运行, 建议输入和输出路径采用HDFS路径, 不要使用本地路径
1
2TextInputFormat.addInputPath(job, new Path(args[0])); // 输入路径 args[0] 后期执行时候动态传递
TextOutputFormat.setOutputPath(job,new Path(args[1]));// 输出路径 args[1] 后期执行时候动态传递- 在pom文件中 添加打包插件
打包插件作用: 将当前程序所依赖的jar包, 一并打入到当前jar包中 (fatjar)
注意: 如果不加打包插件, 只会将自己编写代码进行打包, 依赖的jar包, 不会打入的
什么时候需要加入这个打包插件?
pom文件中使用的依赖包有非hadoop的jar包的时候
添加打包插件后, 如果有一些包, 不需要打入jar包中: 建议在依赖中添加
provided
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>- 执行打包:
- 由于在wordCount案例中, 所有的依赖包都是hadoop相关的jar包, 所以此处只需要使用小的jar包即可
- 将这个jar包上传到Linux系统中
- 执行 jar包
1
2
3
4
5
6
7
8
9数据准备工作
hdfs dfs -mkdir -p /wordcount/input
hdfs dfs -put wordcount.txt /wordcount/input
执行操作:
yarn jar wordCountMR.jar com.mr.wordCount.WordCountDriver hdfs://node1:8020/wordcount/input/wordcount.txt hdfs://node1:8020/wordcount/output
格式:
yarn jar jar包路径 驱动类包名+类名(权限类名) [args...]
hadoop jar jar包路径 驱动类包名+类名(权限类名) [args...]
MapReduce的原理
MapReduce的并行机制
- 并行机制: map 或者 reduce 在执行的时候, 会运行多少个map 和多少个reduce问题
MapTask并行度
【在执行MapReduce的时候, mapTask会运行多少个取决于什么呢?】
1 | mapTask数量取决于读取的数据量, 根据数据量不同, MapTask也会启动不同个 |
- reduce的并行度
1 | 在执行MapReduce的程序的时候, reduce有多少个, 取决于什么? 结果最终需要几个文件有关系, 说白了, 有几个reduce, 也就表示着有几个分区, 如果自定义分区, 那么分区的数量和reduce的相等 |
mapTask的工作机制
reduce的工作机制
进阶使用:自定义shuffle
MapReduce中自定义分区
MR中默认分区方案: 将相同的k2发往同一个reduce操作
MR如何确保这个事情的呢?
1 | 解决方案: 采用 hash取模计算法 |
- 需求: 将15及以上的结果以及15以下的结果进行分开成两个文件进行保存
- 实现思路流程:
- 代码实现:
- mapper类:
1 | package com.mr.partition; |
- 定义自定义分区类
1 | package com.mr.partition; |
- 定义reduce逻辑
1 | package com.mr.partition; |
- 定义驱动类:
- 一定要设置自己定义的分区类, 如果不设置, 默认MR会使用HashPartitioner进行分区操作,因为MR并不知道你设置了自定义分区
- 一定要设置reduce的数量, 因为reduce数量默认为1个, 如果不设置, 即使分区编号有多个, 最终也只能落在一个reduce上
1 | package com.mr.partition; |
- 右键执行测试即可:
MapReduce中自定义排序与序列化
序列化: 将数据转换为字节的过程
反序列化: 将字节内容转回到数据本身
如果数据能够支持被序列化, 也就说数据可以进行保存磁盘或者进行网络的传输工作
如何让一类或者让某个数据类型支持序列化呢?
1 | 在java中, 专门有一个序列化的接口: Serializable, 如果你想让一个类或者数据类型实现序列化的操作, 只需要让其这个类实现serializable接口即可 |
- 说明writable:
1 | 在writable类中, 有两个方法: |
- 排序: 默认MR的排序对k2进行排序工作, 升序排序, 排序规则: 字典序和数字序
- 需求:
1 | a 1 |
代码实现:
- 定义sortBean操作:
1 | package com.mr.sort; |
- 定义mapper类:
1 | package com.mr.sort; |
- 定义reduce类:
1 | package com.mr.sort; |
- 创建驱动类
1 | package com.mr.sort; |
MapReduce的规约操作
规约: 是MR中优化步骤, 主要目的是为了实现提前聚合操作, 减少从map 到reduce之间数据传输量
从对wordCount理解规约的过程, 说白了 对每一个map阶段数据, 进行提前聚合操作, 整个聚合操作基本与reduce聚合逻辑是一致的, 只不过规约是针对每一个map 而reduce是可以针对所有map的结果进行聚合
如何实现规约呢? 规约的逻辑和reduce的逻辑是一致
1 | 说明: |
- 需求: 有 三个书架 ,每个书架上都有5本书, 要求 统计出 每种分类的下有几本书??
1 | 1号书架 2号书架 3号书架 |
- 思考如何做: 相同k2的数据会被发往同一个reduce 相同k2的value数据合并为一个集合
1 | map阶段: k1:行偏移量 v1: 一本书 --> k2(类别) v2 (1) |
- 代码实现:
- 实现map逻辑
1 | package com.mr.combiner; |
- reduce代码:
1 | package com.mr.combiner; |
- combiner类
1 | package com.mr.combiner; |
- 驱动类
1 | package com.mr.combiner; |
- 效果演示:
- MapReduce中分组操作
分组: 将相同k2的value数据合并为一个集合操作
需求: 现在需要求出每一个订单中成交金额最大的一笔交易
需求分析流程:
如何自定义分组
1 | 1) 创建一个类, 继承 WritableComparator |
代码实现:
- 自定义数据类型
1 | package com.mr.group; |
- 自定义map
1 | package com.mr.group; |
- 自定义分区操作
1 | package com.mr.group; |
- 自定义reduce操作
1 | package com.mr.group; |
- 自定义分组操作
1 | package com.mr.group; |
- 驱动类
1 | package com.mr.group; |
- 综合案例_倒排索引
需求: 求每个单词在各个文件出现了多少次
案例流程分析:
代码实现:
如何获取文件名
1 | //4. 获取文件名 |
- 定义map程序
1 | package com.mr.index; |
- 自定义reduce程序
1 | package com.mr.index; |
- 自定义Driver类
1 | package com.mr.index; |
1 .MapReduce的综合练习
上网流量的统计_需求一
需求: 统计每个手机号的上行流量总和,下行流量总和,上行总流量之和,下行总流量之和
数据样例:
数据说明:
流程分析:
代码实现:
- 自定义数据类型:
1 | package com.mr.anli.flow; |
- mapper阶段代码
1 | package com.mr.anli.flow; |
- reduce代码:
1 | package com.mr.anli.flow; |
- 驱动类
1 | package com.mr.anli.flow; |
上网流量的统计_需求二
- 需求: 对需求一的结果中上行流量和倒序排序(递减排序)
代码实现:
- 自定义数据类型:
1 | package com.mr.anli.flow2; |
- 自定义map逻辑
1 | package com.mr.anli.flow2; |
- 自定义reduce逻辑
1 | package com.mr.anli.flow2; |
- 驱动类
1 | package com.mr.anli.flow2; |
社交粉丝案例
需求: 求出哪些人两两之间有共同好友,及他俩的共同好友都有谁? 好友关系都是单向的
1 | A:B,C,D,F,E,O |
实现思路:
1 | 样例数据: |
代码实现: 第一个需求, 求用户在那些人的好友列表中
- mapper类实现
1 | package com.mr.anli.friend; |
- reduce类的实现
1 | package com.mr.anli.friend; |
- 驱动类
1 | package com.mr.anli.friend; |
代码实现: 求共同好友
- mapper类的编写
1 | package com.mr.anli.friend; |
- recuceTask代码实现
1 | package com.mr.anli.friend; |
- driver类实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55package com.mr.anli.friend;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class FriendDriver2 {
public static void main(String[] args) throws Exception {
//1. 创建 job任务对象
Job job = Job.getInstance(new Configuration(), "FriendDriver2");
// 集群运行的必备参数
job.setJarByClass(FriendDriver2.class);
//2. 封装 job的任务 : 天龙八部
//2.1: 设置输入类 和输入的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\共同好友\\output"));
//2.2: 设置map类, 及其输出的k2和v2的类型
job.setMapperClass(FriendMapper2.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);
//2.3: 设置shuffle: 分区 排序 规约 分组 默认方案
//2.7: 设置reduce逻辑, reduce的输出k3和v3的类型
job.setReducerClass(FriendReducer2.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
//2.8: 设置输出类, 及其输出的路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\共同好友\\output1"));
//3. 提交任务
boolean flag = job.waitForCompletion(true);
//4. 退出程序
System.exit(flag ?0 :1);
}
}

