MapReduce

MapReduce的基本介绍

  • MapReduce: 分布式计算框架

    • 分布式计算思想: 分而治之

      1
      2
      3
      4
      5
      6
      7
      分而治之:  
      生活中: 搬砖 图书馆数书 计算从1~100和

      整个分而治之思想主要有二大阶段:
      分(map阶段): 将一个任务拆分为多个小的任务

      合(reduce阶段): 将每个小的任务结果进行聚合汇总在一起
      1
      2
      3
      4
      5
      6
      7
      8
      MapReduce既然是一个分布式计算框架, 必然需要有输入 和 输出, 数据在map执行之前进行读取数据, 在reduce之后将数据写出去

      数据经历阶段:

      1) 数据读取阶段: 不断持续的一直读取数据, 默认一行一行的读取数据, 每读取一行 就需要执行一次map的操作 数据传递 采用 kv方式: 读取过来数据 一般称为 k1和v1
      2) map阶段: 接收k1和v1, 对数据进行处理, 形成新的键值对 k2和 v2
      3) reduce阶段: 接收k2和v2 进行聚合统计操作, 然后转换为k3和v3
      4) 数据输出阶段: 将k3和v3输出到目的地

MapReduce的编程模型

  • 整个MapReduce编写步骤, 共计为8步: 天龙八部
    • map阶段:

        1. 读取数据: 将读取数据转换为k1和v1
        1. 自定义map逻辑: 接收k1和v1 , 将k1和v1转换为k2和v2
    • shuffle阶段:

        1. 分区: 将相同k2的数据, 发往同一个reduce, 保证相同k2的数据都在一个reduce中
        1. 排序: 对k2进行排序操作, 默认升序 字典序和数字顺序
        1. 规约: MR的优化工作, 提前聚合操作 可以省略
        1. 分组: 将相同k2的对应v2数据合并在一起, 形成一个集合
    • reduce阶段:

        1. 自定义reduce逻辑: 接收k2和v2 将其转换为 k3和v3
        1. 输出数据: 将k3和v3 输出目的地

MapReduce的入门案例

  • 需求: 请统计指定单词文件中, 每个单词出现了多少次
MapReduce.assets/image-20210129101849644.png
  • 代码实现
1
2
3
第一步: 自定义 map代码
第二步: 自定义 reduce代码
第三步: 编写MR驱动类 (组装天龙八部)

  • 创建maven项目. 导入相关的依赖:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.7.5</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.13</version>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-compiler-plugin</artifactId>
<version>3.0</version>
<configuration>
<source>1.8</source>
<target>1.8</target>
<encoding>UTF-8</encoding>
<!-- <verbal>true</verbal>-->
</configuration>
</plugin>
<!--
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.3</version>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<minimizeJar>true</minimizeJar>
</configuration>
</execution>
</executions>
</plugin>
-->
</plugins>
</build>
  • 第一步: 自定义map逻辑的代码
1
2
3
1) 创建一个类, 继承mapper类
2) 重写mapper类中map的方法
3) 在map方法中 实现map的逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.mr.wordCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class WordCountMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
// map类被初始化的时候, 会执行的操作 只会执行一次
/*@Override
protected void setup(Context context) throws IOException, InterruptedException {

}*/
private IntWritable v2 = new IntWritable(1);
private Text k2 = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//1. 获取一行数据
String line = value.toString();

//2. 判断操作
if(line != null && !"".equals(line)){ // 如果能进来, 说明line中有数据的 千万不要丢掉 ! 符号

//3. 对数据执行切割操作
String[] words = line.split(" ");

//4. 遍历数组
for (String word : words) {

k2.set(word);

context.write(k2,v2);
}

}
}
// map类被关闭的时候, 会执行的操作, 只会执行一次 释放资源
/*@Override
protected void cleanup(Context context) throws IOException, InterruptedException {

}*/
}
  • 第二部: 自定义reduce逻辑代码
1
2
3
1) 创建一个类, 继承reducer类
2) 重写reducer类中reduce的方法
3) 在reduce方法中 实现reduce的逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.mr.wordCount;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class WordCountReducer extends Reducer<Text,IntWritable,Text,LongWritable> {
private LongWritable v3 = new LongWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

//1. 遍历这个迭代器, 获取每一个v2数据
long count = 0;
for (IntWritable value : values) {

int i = value.get();
count += i;
}

//2. 写出去
v3.set(count);
context.write(key,v3);
}
}
  • 第三部: 实现驱动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
package com.mr.wordCount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountDriver extends Configured implements Tool {


@Override
public int run(String[] args) throws Exception {

//1. 创建 任务对象: job对象
Job job = Job.getInstance(getConf(), "WordCountDriver");


//2. 封装任务: 封装天龙八部
//2.1: 封装读取数据的类, 读取数据的路径

job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("file:///D:\\day05_MapReduce\\资料\\wordcount\\input\\wordcount.txt"));

//2.2: 设置mapper类, 并且设置输出的k2和v2的类型
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

//2.3: 设置shuffle操作: 分区 排序 规约 分组 (默认 )

//2.7: 设置reduce类, 并且设置reduce的输出的k3和v3的类型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

//2.8: 设置输出类 并设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
// 注意: 输出路径必须不能存在,否则报错
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day05_MapReduce\\资料\\wordcount\\output"));


//3. 提交执行了
boolean flag = job.waitForCompletion(true); // 是否等待执行的状态


return flag ? 0 : 1;
}

public static void main(String[] args) throws Exception {

//1. 指定run方法
Configuration conf = new Configuration();
int i = ToolRunner.run(conf, new WordCountDriver(), args); // 返回值为执行状态 0 正常 1异常

//2. 执行程序退出
System.exit(i);

}

}

  • 简写方式:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
package com.mr.wordCount;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class WordCountDriver2 {

public static void main(String[] args) throws Exception {

//1. 创建 任务对象: job对象
Job job = Job.getInstance(new Configuration(), "WordCountDriver");


//2. 封装任务: 封装天龙八部
//2.1: 封装读取数据的类, 读取数据的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("file:///D:\\day05_MapReduce\\资料\\wordcount\\input\\wordcount.txt"));

//2.2: 设置mapper类, 并且设置输出的k2和v2的类型
job.setMapperClass(WordCountMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

//2.3: 设置shuffle操作: 分区 排序 规约 分组 (默认 )

//2.7: 设置reduce类, 并且设置reduce的输出的k3和v3的类型
job.setReducerClass(WordCountReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

//2.8: 设置输出类 并设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
// 注意: 输出路径必须不能存在,否则报错
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day05_MapReduce\\资料\\wordcount\\output1"));


//3. 提交执行了
boolean flag = job.waitForCompletion(true); // 是否等待执行的状态


//4. 退出程序:
System.exit(flag ? 0 : 1);

}
}

  1. MapReduce的运行方式

本地运行

  • 直接右键run即可, 输入和输出路径可以是本地 也可以是HDFS

    1
    2
    3
    如果输入和输出都是本地路径, 那么不需要启动hadoop集群

    如果输出和输出是HDFS路径, 需要保证HDFS已经启动了

集群化运行:

  • 将MapReduce在yarn平台上执行

    1. 在MR的驱动类, 添加集群运行配置代码:
    1
    2
    job.setJarByClass(当前类的.class);
    注意: 此配置必须配, 否则在yarn上执行的时候, 会报找不到主类
    1. 在yarn上运行, 建议输入和输出路径采用HDFS路径, 不要使用本地路径
    1
    2
    TextInputFormat.addInputPath(job, new Path(args[0])); // 输入路径  args[0] 后期执行时候动态传递
    TextOutputFormat.setOutputPath(job,new Path(args[1]));// 输出路径 args[1] 后期执行时候动态传递
    1. 在pom文件中 添加打包插件
    • 打包插件作用: 将当前程序所依赖的jar包, 一并打入到当前jar包中 (fatjar)

    • 注意: 如果不加打包插件, 只会将自己编写代码进行打包, 依赖的jar包, 不会打入的

      什么时候需要加入这个打包插件?

      1. pom文件中使用的依赖包有非hadoop的jar包的时候

      2. 添加打包插件后, 如果有一些包, 不需要打入jar包中: 建议在依赖中添加 provided

    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    <plugin>
    <groupId>org.apache.maven.plugins</groupId>
    <artifactId>maven-shade-plugin</artifactId>
    <version>2.4.3</version>
    <executions>
    <execution>
    <phase>package</phase>
    <goals>
    <goal>shade</goal>
    </goals>
    <configuration>
    <minimizeJar>true</minimizeJar>
    </configuration>
    </execution>
    </executions>
    </plugin>
    1. 执行打包:
  • MapReduce.assets/image-20210129120324576.png
  • MapReduce.assets/image-20210129120357308.png
    1. 由于在wordCount案例中, 所有的依赖包都是hadoop相关的jar包, 所以此处只需要使用小的jar包即可
    1. 将这个jar包上传到Linux系统中
    1. 执行 jar包
    1
    2
    3
    4
    5
    6
    7
    8
    9
    数据准备工作
    hdfs dfs -mkdir -p /wordcount/input
    hdfs dfs -put wordcount.txt /wordcount/input

    执行操作:
    yarn jar wordCountMR.jar com.mr.wordCount.WordCountDriver hdfs://node1:8020/wordcount/input/wordcount.txt hdfs://node1:8020/wordcount/output
    格式:
    yarn jar jar包路径 驱动类包名+类名(权限类名) [args...]
    hadoop jar jar包路径 驱动类包名+类名(权限类名) [args...]

MapReduce的原理

MapReduce的并行机制

  • 并行机制: map 或者 reduce 在执行的时候, 会运行多少个map 和多少个reduce问题
  • MapTask并行度

    【在执行MapReduce的时候, mapTask会运行多少个取决于什么呢?】

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
mapTask数量取决于读取的数据量, 根据数据量不同, MapTask也会启动不同个

默认行为下,
每个mapTask处理数据量为128M , 也就是, 读取一个文件, 这个你文件块有多少个, 就会启动多少个mapTask
读取的每一个文件至少需要一个mapTask来运行

思考: 假设在HDFS中有一个文件为300M大小, 分为三个块(物理划分), 128,128,44 , 此时MapReduce会启动三个mapTask来执行, 请问MapTask在读取这个文件 是否和这几个块一一对应呢? 不会的
在整个MapTask在读取数据采用文件切片, 文件切片大小和块的大小是一致的, 但是文件切片是一个逻辑的划分

将这个文件整体读取过来, 根据文件切片大小对数据进行逻辑划分块, 划分多少块然后就启动多少MapTask
原因: 担心将一行数据划分到不同的mapTask中

切片的公式:
Math.max(minSize, Math.min(maxSize, blockSize));
说明:
minSize: 最小值默认 为 0
maxSize: 最大值默认为 Long的最大值(好大好大值)
blockSize: 块大小 128M HDFS决定
根据公式, 可以算出, 默认每一个文件切片大小128M

比如说: 一个文件 1GB , 此时按照默认的行为, 会启动多少mapTask呢? 8 个mapTask
需要: 由于服务器性能比较好, 每一个MapTask可以处理256M 也没有问题, 此时需要你解决这个问题呢?
通过调整: minSize 将其调整为 256M 大小
如果调大每个文件切片大小, 通过调整minsize方式即可
需求: 由于服务器性能不是特别好, 每一个MapTask可以处理64M , 此时需要你解决这个问题呢?
通过调整: maxSize 将其调整为 64M大小
如果调小每个文件切片大小, 通过调整maxsize方式即可
那么在代码中如何体现呢? 在驱动类中输入format设置即可
TextInputFormat.setMaxInputSplitSize();
TextInputFormat.setMinInputSplitSize();

  • reduce的并行度
1
2
3
4
在执行MapReduce的程序的时候, reduce有多少个, 取决于什么? 结果最终需要几个文件有关系, 说白了, 有几个reduce, 也就表示着有几个分区, 如果自定义分区, 那么分区的数量和reduce的相等

如何设置reduce的数量呢? 默认reduce数量为 1
job.setNumReduceTask(N);

mapTask的工作机制

MapReduce.assets/image-20210131110450060.png

reduce的工作机制

MapReduce.assets/image-20210131113702393.png

进阶使用:自定义shuffle

MapReduce中自定义分区

  • MR中默认分区方案: 将相同的k2发往同一个reduce操作

  • MR如何确保这个事情的呢?

1
2
3
4
5
6
7
8
解决方案: 采用 hash取模计算法
k2.hashcode % numPartition 余数得多少, 就将k2数据发到那个编号的分区上即可
在MR中专门有一个默认分区类: HashPartitioner
(key.hashCode() & 2147483647) % numReduceTasks
好处:
给相同的数据打上同样的分区的标记
弊端:
划分的数据比较随机
  • 需求: 将15及以上的结果以及15以下的结果进行分开成两个文件进行保存
MapReduce.assets/image-20210129150215762.png
  • 实现思路流程:
MapReduce.assets/image-20210129152650492.png
  • 代码实现:
  • mapper类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
package com.mr.partition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class LotteryMapper extends Mapper<LongWritable,Text,IntWritable,Text>{
private IntWritable k2 = new IntWritable();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//1.获取一行数据
String line = value.toString();

//2. 判断操作
if(line!=null && !"".equals(line)){

//3.从一行数据中提取开奖号
String[] fields = line.split("\t");
int lottery = Integer.parseInt(fields[5]);

//4. 写出去
k2.set(lottery);

context.write(k2,value);
}


}
}
  • 定义自定义分区类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.mr.partition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

// 自定义分区类
public class MyPartitioner extends Partitioner<IntWritable,Text> {
@Override
public int getPartition(IntWritable k2, Text v2, int numReducerTask) {
// 思考: 判断k2的数据是否是大于等于15 如果大于返回 0 否则 返回 1
int lottery = k2.get();
if(lottery>=15){
return 0;
}else {
return 1;
}

}
}

  • 定义reduce逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.mr.partition;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class LotteryReducer extends Reducer<IntWritable,Text,Text,NullWritable> {

@Override
protected void reduce(IntWritable key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

for (Text value : values) {
context.write(value,NullWritable.get());
}

}
}
  • 定义驱动类:
    • 一定要设置自己定义的分区类, 如果不设置, 默认MR会使用HashPartitioner进行分区操作,因为MR并不知道你设置了自定义分区
    • 一定要设置reduce的数量, 因为reduce数量默认为1个, 如果不设置, 即使分区编号有多个, 最终也只能落在一个reduce上
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.mr.partition;

import com.mr.wordCount.WordCountMapper;
import com.mr.wordCount.WordCountReducer;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class LotteryDriver {

public static void main(String[] args) throws Exception{

//1. 创建 job对象: 任务对象
Job job = Job.getInstance(new Configuration(), "LotteryDriver");

// 集群化运行的必备参数
job.setJarByClass(LotteryDriver.class);

//2. 封装job任务: 天龙八部:
//2.1: 封装读取数据的类, 读取数据的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("file:///D:\\day05_MapReduce\\资料\\自定义分区\\input\\partition.csv"));

//2.2: 设置mapper类, 并且设置输出的k2和v2的类型
job.setMapperClass(LotteryMapper.class);
job.setMapOutputKeyClass(IntWritable.class);
job.setMapOutputValueClass(Text.class);

//2.3: 设置shuffle操作: 分区
job.setPartitionerClass(MyPartitioner.class);

//2.4: 设置shuffle操作: 排序 规约 分组 (默认 )

//2.7: 设置reduce类, 并且设置reduce的输出的k3和v3的类型
job.setReducerClass(LotteryReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);

//2.8: 设置输出类 并设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
// 注意: 输出路径必须不能存在,否则报错
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day05_MapReduce\\资料\\自定义分区\\output1"));


//2.9:设置reduce的数量 默认为 1
job.setNumReduceTasks(2);

//3. 执行代码
boolean flag = job.waitForCompletion(true);

//4. 退出程序
System.exit(flag ? 0 : 1);


}


}

  • 右键执行测试即可:
  • MapReduce.assets/image-20210129154345718.png

MapReduce中自定义排序与序列化

  • 序列化: 将数据转换为字节的过程

  • 反序列化: 将字节内容转回到数据本身

  • 如果数据能够支持被序列化, 也就说数据可以进行保存磁盘或者进行网络的传输工作

  • 如何让一类或者让某个数据类型支持序列化呢?

1
2
3
4
5
6
7
在java中, 专门有一个序列化的接口: Serializable, 如果你想让一个类或者数据类型实现序列化的操作, 只需要让其这个类实现serializable接口即可

在hadoop中, 并没有使用默认的数据类型, 虽然这些类型已经实现了序列化, 原因如下:
因为java中提供的这种序列化方式有点太重了, java的序列化在对数据进行转换的时候, 除了携带数据本身以外, 还携带这个类的继承体系以及其他的校验数据信息,这样导致传递的内容变得更多了(变的更重了)
这样如果采用java的序列化方案进行网络传输, 就会导致传输大量没有用的数据, 从而占用大量的网络带宽, 导致传递数据的效率下降, 成本提高
hadoop为了解决这样的问题, 专门提供了一套针对于hadoop的序列化方案: writable类
writable在传输的过程中, 仅会对数据本身进行转换 并不会携带额外一些内容, 从而保证传输效率更高
  • 说明writable:
1
2
3
4
5
在writable类中, 有两个方法:  
write() : 序列化方法
readFields(): 反序列化方法
注意:
在使用的时候, 序列化的顺序和反序列化的顺序必须保持一致, 否则会出现反序列化失败
  • 排序: 默认MR的排序对k2进行排序工作, 升序排序, 排序规则: 字典序和数字序

  • 需求:
1
2
3
4
5
6
7
8
9
10
11
a   1
a 9
b 3
a 7
b 8
b 10
a 5

第一列按照字典顺序进行排列
第一列相同的时候, 第二列按照升序进行排列

  • MapReduce.assets/image-20210129164022610.png
  • 代码实现:

    1. 定义sortBean操作:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
package com.mr.sort;

import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class SortBean implements WritableComparable<SortBean> {

private String first;
private Integer second;

public SortBean() {
}

public SortBean(String first, Integer second) {
this.first = first;
this.second = second;
}

public String getFirst() {

return first;
}

public void setFirst(String first) {
this.first = first;
}

public Integer getSecond() {
return second;
}

public void setSecond(Integer second) {
this.second = second;
}

@Override
public String toString() {
return first + "\t" + second ;
}

//序列化方法
// 注意: 序列化顺序 和 反序列化的顺序必须保持一致
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(first);
out.writeInt(second);
}

// 反序列化方法
@Override
public void readFields(DataInput in) throws IOException {
this.first = in.readUTF();
this.second = in.readInt();
}

// 比较器: 只需要指定按照谁来进行比较操作
// 技巧: 如果要想实现升序排序: this.compareTo(参数里内容) 倒序排序: 参数内容.compareTo(this)
@Override
public int compareTo(SortBean o) {
// 比较逻辑: 先比较第一列的数据(降序), 如果一致, 比较第二列数据

int i = o.first.compareTo(this.first); //返回值: 整数(前面比后面大) 负数(前面比后面的小) 和 0(一般大)

if(i == 0 ){ // 第一列相等

int i1 = this.second.compareTo(o.second);
return i1;
}

return i;
}
}

    1. 定义mapper类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
package com.mr.sort;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class SortMapper extends Mapper<LongWritable,Text,SortBean,NullWritable> {
private SortBean k2 = new SortBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//1. 获取一行数据
String line = value.toString();

//2. 判断操作
if(line != null && !"".equals(line)){

//3. 封装 SortBean 对象
String[] fields = line.split("\t");


k2.setFirst(fields[0]);
k2.setSecond(Integer.parseInt(fields[1]));

//4. 写出去

context.write(k2,NullWritable.get());
}

}
}

    1. 定义reduce类:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
package com.mr.sort;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class SortReducer extends Reducer<SortBean,NullWritable,SortBean,NullWritable> {

@Override
protected void reduce(SortBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {

for (NullWritable value : values) {
context.write(key,value);
}

// context.write(key,NullWritable.get());
}
}

    1. 创建驱动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
package com.mr.sort;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class SortDriver {

public static void main(String[] args) throws Exception{

//1. 创建 job任务对象
Job job = Job.getInstance(new Configuration(), "SortDriver");

// 设置集群运行配置参数
job.setJarByClass(SortDriver.class);

//2. 封装任务对象: 天龙八部
//2.1: 封装读取数据的类, 读取数据的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path("file:///D:\\day05_MapReduce\\资料\\排序\\input\\sort.txt"));

//2.2: 设置mapper类, 并且设置输出的k2和v2的类型
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(SortBean.class);
job.setMapOutputValueClass(NullWritable.class);

//2.3: 设置shuffle操作: 分区 排序 规约 分组 (默认 )


//2.7: 设置reduce类, 并且设置reduce的输出的k3和v3的类型
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(SortBean.class);
job.setOutputValueClass(NullWritable.class);

//2.8: 设置输出类 并设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
// 注意: 输出路径必须不能存在,否则报错
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day05_MapReduce\\资料\\排序\\output1"));



//3. 执行代码
boolean flag = job.waitForCompletion(true);

//4. 退出程序
System.exit(flag ? 0 : 1);


}
}

MapReduce的规约操作

  • 规约: 是MR中优化步骤, 主要目的是为了实现提前聚合操作, 减少从map 到reduce之间数据传输量

  • 从对wordCount理解规约的过程, 说白了 对每一个map阶段数据, 进行提前聚合操作, 整个聚合操作基本与reduce聚合逻辑是一致的, 只不过规约是针对每一个map 而reduce是可以针对所有map的结果进行聚合

  • 如何实现规约呢? 规约的逻辑和reduce的逻辑是一致

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
说明: 
规约的实现步骤与reduce是一模一样的, reduce如何实现, 规约就如何实现即可, 只不过最后在驱动类 需要将reduce程序设置到规约的选项中



并不是所有的业务都可以进行规约操作,在进行规约之前, 一定要做可行性分析: 要求规约操作不能够影响最终结果,如果有 不要使用规约

比如说求平均数业务:
假设: 1,2,3,4,5 求平均数 3

比如有两个map
第一个map ; 1,2,3 ==> 2

第二个map: 4,5 ==> 4.5


reduce: 统计进行平均数 ==>3.25

  • 需求: 有 三个书架 ,每个书架上都有5本书, 要求 统计出 每种分类的下有几本书??
1
2
3
4
5
6
7
8
1号书架                 2号书架                         3号书架
<<java入门宝典>> <<Python入门宝典>> <<spark入门宝典>>
<<UI入门宝典>> <<乾坤大挪移>> <<hive入门宝典>>
<<天龙八部>> <<凌波微步>> <<葵花点穴手>>
<<史记>> <<PHP入门宝典>> <<铁砂掌>>
<<葵花宝典>> <<hadoop入门宝典>> <<论清王朝的腐败>>

分为三大类: 计算机类 武林秘籍 历史
  • 思考如何做: 相同k2的数据会被发往同一个reduce 相同k2的value数据合并为一个集合
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
map阶段: k1:行偏移量  v1: 一本书  -->  k2(类别)      v2 (1)
假设有二个map程序
第一个map程序:
输出结果:
计算机 1
计算机 1
计算机 1
计算机 1
武林秘籍 1
计算机 1
武林秘籍 1
武林秘籍 1

第一个map规约方案:
计算机 5
武林秘籍 3


武林秘籍 1
历史 1
计算机 1
武林秘籍 1
武林秘籍 1
计算机 1
历史 1

第二个map规约方案:
武林秘籍 3
历史 2
计算机 2


没有规约之前, reduce需要接收到 15次键值对
规约之后, reduce可以接收到 5次键值对

接收到: 分组后数据
计算机 [1,1,1,1,1,1,1]
武林秘籍: [1,1,1,1,1,1]
历史: [1,1]

reduce阶段: k2:类别 v2: 数值1集合 ---> k3: 类别 v3 结果数量

结果:
计算机 7
武林秘籍 6
历史 2

  • 代码实现:
    1. 实现map逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.mr.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class BookCombinerMapper extends Mapper<LongWritable,Text,Text,IntWritable> {

private Text k2 = new Text();
private IntWritable v2 = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//1. 读取一行数据
String book = value.toString();

//2. 执行判断操作
if(book != null && !"".equals(book)){
String category = "";
//3:判断 书是属于什么类别:
if(book.contains("入门宝典")){// 如果能进来, 说明书为 计算机类别
category= "计算机";
}else if (book.contains("史记") || book.contains("论清王朝的腐败") ){ // 如果能进来, 说明书为 历史类别
category= "历史";
}else { // 否则 就是武林秘籍
category= "武林秘籍";
}

//4. 写出去

k2.set(category);

context.write(k2,v2);
}


}
}

  • reduce代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
package com.mr.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class BookConbinerReducer extends Reducer<Text,IntWritable,Text,LongWritable> {

private LongWritable v3 = new LongWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

//1. 遍历v2的数据, 进行累加即可
long count = 0 ;
for (IntWritable value : values) {

count += value.get();

}

//2. 写出去
v3.set(count);
context.write(key,v3);



}
}

  • combiner类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
package com.mr.combiner;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class BookConbiner extends Reducer<Text,IntWritable,Text,IntWritable> {

private IntWritable v3 = new IntWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

//1. 遍历v2的数据, 进行累加即可
int count = 0 ;
for (IntWritable value : values) {

count += value.get();
}
//2. 写出去
v3.set(count);
context.write(key,v3);

}
}

  • 驱动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
package com.mr.combiner;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class BookCombinerDriver extends Configured implements Tool {


@Override
public int run(String[] args) throws Exception {

//1. 创建 job任务对象
Job job = Job.getInstance(super.getConf(), "BookCombinerDriver");

// 设置集群运行的必备参数
job.setJarByClass(BookCombinerDriver.class);

//2. 封装 job的任务: 天龙八部
//2.1: 封装 读取数据输入类 及其输入的路径
job.setInputFormatClass(TextInputFormat.class);

TextInputFormat.addInputPath(job,new Path("file:///D:\\day06_MapReduce\\资料\\combinner\\input\\combinner.txt"));

//2.2: 设置mapper类 及其mapper输出k2 和 v2的类型
job.setMapperClass(BookCombinerMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);

//2.3: shuffle阶段: 分区 排序 规约 分组操作
job.setCombinerClass(BookConbiner.class);

//2.7: 设置reduce类, 及其输出 k3 和 v3的类型
job.setReducerClass(BookConbinerReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

//2.8: 设置 输出类 , 及其输出的路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day06_MapReduce\\资料\\combinner\\output1"));

//3. 提交任务
boolean flag = job.waitForCompletion(true);

return flag ? 0 : 1;
}

public static void main(String[] args) throws Exception {

int i = ToolRunner.run(new Configuration(), new BookCombinerDriver(), args);

System.exit(i);
}
}

  • 效果演示:
MapReduce.assets/image-20210131094521098.png
  1. MapReduce中分组操作
  • 分组: 将相同k2的value数据合并为一个集合操作

  • 需求: 现在需要求出每一个订单中成交金额最大的一笔交易

  • 需求分析流程:

MapReduce.assets/image-20210131152314963.png

如何自定义分组

1
2
3
4
5
6
1) 创建一个类, 继承 WritableComparator
2) 编写空参构造方法, 在构造方法中, 调用父类构造
super( k2的类型 ,true);
3) 重写父类的compare方法, 在方法上有两个参数, 这两个参数其实就是需要进行比较的参数
4) 在compare方法, 编写比较规则: 根据业务来判断, 具体需要将随放置在一组, 就按照谁来进行比较即可
5) 告知给MR, 我自定义分组操作

代码实现:

  • 自定义数据类型
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
package com.mr.group;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {

private String order_id;

private String product_id;

private Double price;


// 序列化方法
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(order_id);
out.writeUTF(product_id);
out.writeDouble(price);

}
// 反序列化方法
@Override
public void readFields(DataInput in) throws IOException {

this.order_id = in.readUTF();
this.product_id = in.readUTF();
this.price = in.readDouble();

}
// 排序比较的方法
@Override
public int compareTo(OrderBean o) {
// 比较逻辑: 首先先比较订单id是否一致(升序还是降序呢? 无所谓), 如果一致, 比较金额(降序)

int i = this.order_id.compareTo(o.order_id);

if(i == 0){ // 订单id是相等的

int i1 = o.price.compareTo(this.price);
return i1;

}

return i;
}


public String getOrder_id() {
return order_id;
}

public void setOrder_id(String order_id) {
this.order_id = order_id;
}

public String getProduct_id() {
return product_id;
}

public void setProduct_id(String product_id) {
this.product_id = product_id;
}

public Double getPrice() {
return price;
}

public void setPrice(Double price) {
this.price = price;
}

@Override
public String toString() {
return order_id + "\t" + product_id + "\t" + price ;
}
}

  • 自定义map
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
package com.mr.group;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class OrderMapper extends Mapper<LongWritable,Text,OrderBean,NullWritable> {
private OrderBean k2 = new OrderBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//1. 获取一行数据
String line = value.toString(); //


//2. 判断
if( line != null && !"".equals(line) ){

//3. 对数据进行切割操作:
String[] fields = line.split("\t");

//4.封装到 OrderBean

k2.setOrder_id(fields[0]);
k2.setProduct_id(fields[1]);
k2.setPrice(Double.parseDouble(fields[2]));

//5. 写出去
context.write(k2,NullWritable.get());


}

}
}

  • 自定义分区操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
package com.mr.group;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class OrderPartition extends Partitioner<OrderBean,NullWritable> {
@Override
public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numReduceTasks) {
// 需求: 将相同订单ID的数据 划分同一个reduce中去
String order_id = orderBean.getOrder_id();

return (order_id.hashCode() & 2147483647) % numReduceTasks;
}
}

  • 自定义reduce操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.mr.group;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class OrderReducer extends Reducer<OrderBean,NullWritable,OrderBean,NullWritable> {

@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
int top = 2;
int len = 0;
for (NullWritable value : values) {

if(len >= top){
break;
}

context.write(key,value);

len ++;

}
}
}

  • 自定义分组操作
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
package com.mr.group;

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
// 自定义分组的是实现
public class MyOrderGroup extends WritableComparator{

public MyOrderGroup() {
// 参数1: 告知给分组类, k2的类型是什么 参数2: 是否创建k2类型 (true)
super( OrderBean.class ,true);
}

// 比较方法, 方法中两个参数, 本质上就是k2的相邻的两个元素
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean elem1 = (OrderBean)a;
OrderBean elem2 = (OrderBean)b;

return elem1.getOrder_id().compareTo(elem2.getOrder_id());
}
}

  • 驱动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
package com.mr.group;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class OrderDriver {

public static void main(String[] args) throws Exception {

//1. 创建 job对象
Job job = Job.getInstance(new Configuration(), "OrderDriver");

//设置 集群运行必备参数
job.setJarByClass(OrderDriver.class);


//2. 设置job的任务: 天龙八部

//2.1: 设置输入类, 及其输入的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\day06_MapReduce\\资料\\自定义groupingComparator\\input\\orders.txt"));

//2.2: 设置mapper类, 及其mapper输出的k2和v2的类型
job.setMapperClass(OrderMapper.class);

job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);


//2.3: 设置shuffle阶段: 分区
job.setPartitionerClass(OrderPartition.class);

//2.4: 设置shuffle阶段: 排序 数据类型中以及定义完成

//2.5: 设置shuffle阶段: 规约, 暂时没有

//2.6: 设置shuffle阶段: 分组操作
job.setGroupingComparatorClass(MyOrderGroup.class);

//2.7: 设置reduce类, 及其输出k3和v3的类型
job.setReducerClass(OrderReducer.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);

//2.8: 设置输出路径 及其输出 k3和v3的类型
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day06_MapReduce\\资料\\自定义groupingComparator\\output1"));

//2.9: 设置reduceTask数量
job.setNumReduceTasks(2);

//3. 提交任务
boolean flag = job.waitForCompletion(true);

//4. 退出程序
System.exit(flag ? 0 : 1);
}



}

  1. 综合案例_倒排索引
  • 需求: 求每个单词在各个文件出现了多少次

  • 案例流程分析:

  • MapReduce.assets/image-20210131172702327.png
  • 代码实现:

  • 如何获取文件名

1
2
3
//4. 获取文件名
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String fileName = fileSplit.getPath().getName();
    1. 定义map程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
package com.mr.index;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

public class IndexMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
private Text k2 = new Text();
private IntWritable v2 = new IntWritable(1);
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 读取一行数据
String line = value.toString();

//2. 判断
if( line!= null && !"".equals(line) ){

//3. 对数据执行切割操作
String[] words = line.split(" ");

//4. 获取文件名
FileSplit fileSplit = (FileSplit)context.getInputSplit();
String fileName = fileSplit.getPath().getName();

//5. 组合k2操作:
for (String word : words) {
k2.set(fileName+"_"+word);

//6. 写出去
context.write(k2,v2);
}



}

}
}

  • 自定义reduce程序
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.mr.index;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class IndexReducer extends Reducer<Text,IntWritable,Text,LongWritable> {
private LongWritable v3 = new LongWritable();
@Override
protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {

//1. 遍历
long count = 0 ;
for (IntWritable value : values) {
count+= value.get();
}

//2. 写出去
v3.set(count);
context.write(key,v3);

}
}

  • 自定义Driver类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
package com.mr.index;

import com.mr.group.*;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class IndexDriver {

public static void main(String[] args) throws Exception {

//1. 创建 job对象
Job job = Job.getInstance(new Configuration(), "IndexDriver");

//设置 集群运行必备参数
job.setJarByClass(IndexDriver.class);


//2. 设置job的任务: 天龙八部

//2.1: 设置输入类, 及其输入的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\day06_MapReduce\\资料\\倒排索引\\input"));

//2.2: 设置mapper类, 及其mapper输出的k2和v2的类型
job.setMapperClass(IndexMapper.class);

job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(IntWritable.class);


//2.3: 设置shuffle阶段: 分区

//2.4: 设置shuffle阶段: 排序 数据类型中以及定义完成

//2.5: 设置shuffle阶段: 规约, 暂时没有

//2.6: 设置shuffle阶段: 分组操作


//2.7: 设置reduce类, 及其输出k3和v3的类型
job.setReducerClass(IndexReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(LongWritable.class);

//2.8: 设置输出路径 及其输出 k3和v3的类型
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day06_MapReduce\\资料\\倒排索引\\output"));


//3. 提交任务
boolean flag = job.waitForCompletion(true);

//4. 退出程序
System.exit(flag ? 0 : 1);
}

}

1 .MapReduce的综合练习

上网流量的统计_需求一

  • 需求: 统计每个手机号的上行流量总和,下行流量总和,上行总流量之和,下行总流量之和

    数据样例:

    MapReduce.assets/image-20210201085126705.png

    数据说明:

    MapReduce.assets/image-20210201085141830.png

    流程分析:

    MapReduce.assets/image-20210201090137275.png

代码实现:

  • 自定义数据类型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
package com.mr.anli.flow;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean implements Writable{

private Long upFlow ;
private Long downFlow;
private Long upTotalFlow;
private Long downTotalFlow;

public Long getUpFlow() {
return upFlow;
}

public void setUpFlow(Long upFlow) {
this.upFlow = upFlow;
}

public Long getDownFlow() {
return downFlow;
}

public void setDownFlow(Long downFlow) {
this.downFlow = downFlow;
}

public Long getUpTotalFlow() {
return upTotalFlow;
}

public void setUpTotalFlow(Long upTotalFlow) {
this.upTotalFlow = upTotalFlow;
}

public Long getDownTotalFlow() {
return downTotalFlow;
}

public void setDownTotalFlow(Long downTotalFlow) {
this.downTotalFlow = downTotalFlow;
}

@Override
public String toString() {
return upFlow +"\t"+downFlow +"\t"+upTotalFlow +"\t"+downTotalFlow;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(upTotalFlow);
out.writeLong(downTotalFlow);


}

@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.upTotalFlow = in.readLong();
this.downTotalFlow = in.readLong();
}
}

  • mapper阶段代码
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
package com.mr.anli.flow;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Flow1Mapper extends Mapper<LongWritable,Text,Text,FlowBean> {
private Text k2 = new Text();
private FlowBean v2 = new FlowBean();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//1. 读取一行数据
String line = value.toString();

//2. 判断
if( line != null && !"".equals(line) ){

//3.对数据进行切割处理
String[] fields = line.split("\t");

//4. 获取 k2和v2的数据
String phone = fields[1]; // 封装 k2

// 封装 v2
v2.setUpFlow(Long.parseLong(fields[6]));
v2.setDownFlow(Long.parseLong(fields[7]));
v2.setUpTotalFlow(Long.parseLong(fields[8]));
v2.setDownTotalFlow(Long.parseLong(fields[9]));


//5. 写出去:
k2.set(phone);
context.write(k2,v2);
}


}
}

  • reduce代码:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
package com.mr.anli.flow;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Flow1Reducer extends Reducer<Text,FlowBean,Text,FlowBean> {
private FlowBean v3 = new FlowBean();
@Override
protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {


//1. 遍历 v2的数据, 封装v3数据
Long upFlow = 0L;
Long downFlow = 0L;
Long upTotalFlow = 0L;
Long downTotalFlow = 0L;

for (FlowBean value : values) {
upFlow += value.getUpFlow();
downFlow += value.getDownFlow();
upTotalFlow += value.getUpTotalFlow();
downTotalFlow += value.getDownTotalFlow();
}

v3.setUpFlow(upFlow);
v3.setDownFlow(downFlow);
v3.setUpTotalFlow(upTotalFlow);
v3.setDownTotalFlow(downTotalFlow);


//2. 写出去
context.write(key,v3);




}
}

  • 驱动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
package com.mr.anli.flow;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Flow1Driver {

public static void main(String[] args) throws Exception {

//1. 创建 job任务对象
Job job = Job.getInstance(new Configuration(), "Flow1Driver");

// 集群运行的必备参数
job.setJarByClass(Flow1Driver.class);

//2. 封装 job的任务 : 天龙八部

//2.1: 设置输入类 和输入的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\流量统计\\input"));

//2.2: 设置map类, 及其输出的k2和v2的类型
job.setMapperClass(Flow1Mapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(FlowBean.class);


//2.3: 设置shuffle: 分区 排序 规约 分组 默认方案


//2.7: 设置reduce逻辑, reduce的输出k3和v3的类型

job.setReducerClass(Flow1Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean.class);

//2.8: 设置输出类, 及其输出的路径

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\流量统计\\output"));

//3. 提交任务
boolean flag = job.waitForCompletion(true);

//4. 退出程序
System.exit(flag ?0 :1);


}
}

上网流量的统计_需求二

  • 需求: 对需求一的结果中上行流量和倒序排序(递减排序)

代码实现:

  • 自定义数据类型:
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
package com.mr.anli.flow2;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class FlowBean2 implements WritableComparable<FlowBean2> {

private Long upFlow ;
private Long downFlow;
private Long upTotalFlow;
private Long downTotalFlow;


public FlowBean2() {
}

public FlowBean2(Long upFlow, Long downFlow, Long upTotalFlow, Long downTotalFlow) {
this.upFlow = upFlow;
this.downFlow = downFlow;
this.upTotalFlow = upTotalFlow;
this.downTotalFlow = downTotalFlow;
}

@Override
public int compareTo(FlowBean2 o) {
return o.upFlow.compareTo(this.upFlow) ;
}

@Override
public void write(DataOutput out) throws IOException {
out.writeLong(upFlow);
out.writeLong(downFlow);
out.writeLong(upTotalFlow);
out.writeLong(downTotalFlow);


}

@Override
public void readFields(DataInput in) throws IOException {
this.upFlow = in.readLong();
this.downFlow = in.readLong();
this.upTotalFlow = in.readLong();
this.downTotalFlow = in.readLong();
}


public Long getUpFlow() {
return upFlow;
}

public void setUpFlow(Long upFlow) {
this.upFlow = upFlow;
}

public Long getDownFlow() {
return downFlow;
}

public void setDownFlow(Long downFlow) {
this.downFlow = downFlow;
}

public Long getUpTotalFlow() {
return upTotalFlow;
}

public void setUpTotalFlow(Long upTotalFlow) {
this.upTotalFlow = upTotalFlow;
}

public Long getDownTotalFlow() {
return downTotalFlow;
}

public void setDownTotalFlow(Long downTotalFlow) {
this.downTotalFlow = downTotalFlow;
}

@Override
public String toString() {
return upFlow +"\t"+downFlow +"\t"+upTotalFlow +"\t"+downTotalFlow;
}


}

  • 自定义map逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
package com.mr.anli.flow2;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class Flow2Mapper extends Mapper<LongWritable,Text,FlowBean2,Text> {
private Text v2 = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//1. 读取一行数据
String line = value.toString();

//2. 判断
if( line != null && !"".equals(line)){

//3. 对数据执行分割操作
String[] fields = line.split("\t");

//4. 封装 k2和v2
FlowBean2 k2 = new FlowBean2(Long.parseLong(fields[1]),Long.parseLong(fields[2]),Long.parseLong(fields[3]),Long.parseLong(fields[4]));

v2.set(fields[0]);


//5. 写出去
context.write(k2,v2);

}


}
}

  • 自定义reduce逻辑
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
package com.mr.anli.flow2;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Flow2Reducer extends Reducer<FlowBean2,Text,Text,FlowBean2> {

@Override
protected void reduce(FlowBean2 key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

for (Text value : values) {

context.write(value,key);
}


}
}

  • 驱动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
package com.mr.anli.flow2;

import com.mr.anli.flow.Flow1Mapper;
import com.mr.anli.flow.Flow1Reducer;
import com.mr.anli.flow.FlowBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class Flow2Driver {

public static void main(String[] args) throws Exception {

//1. 创建 job任务对象
Job job = Job.getInstance(new Configuration(), "Flow2Driver");

// 集群运行的必备参数
job.setJarByClass(Flow2Driver.class);

//2. 封装 job的任务 : 天龙八部

//2.1: 设置输入类 和输入的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\流量统计\\output"));

//2.2: 设置map类, 及其输出的k2和v2的类型
job.setMapperClass(Flow2Mapper.class);
job.setMapOutputKeyClass(FlowBean2.class);
job.setMapOutputValueClass(Text.class);


//2.3: 设置shuffle: 分区 排序 规约 分组 默认方案


//2.7: 设置reduce逻辑, reduce的输出k3和v3的类型

job.setReducerClass(Flow2Reducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(FlowBean2.class);

//2.8: 设置输出类, 及其输出的路径

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\流量统计\\output1"));

//3. 提交任务
boolean flag = job.waitForCompletion(true);

//4. 退出程序
System.exit(flag ?0 :1);


}
}

社交粉丝案例

需求: 求出哪些人两两之间有共同好友,及他俩的共同好友都有谁? 好友关系都是单向的

1
2
3
4
5
6
7
8
9
10
11
12
13
14
A:B,C,D,F,E,O
B:A,C,E,K
C:F,A,D,I
D:A,E,F,L
E:B,C,D,M,L
F:A,B,C,D,E,O,M
G:A,C,D,E,F
H:A,C,D,E,O
I:A,O
J:B,O
K:A,C,D
L:D,E,F
M:E,F,G
O:A,H,I,J

实现思路:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
样例数据: 
A: B C E F
B: A E
C: A D
D: E B
E: F C B
F: C D

核心一句话: 相同key数据发往同一个reduce,相同k2的value数据合并为一个集合
需求一: 用户在那些人的好友列表中

map阶段: k2 v2
B A
C A
E A
F A
A B
E B
A C
D C
E D
B D
F E
C E
B E
C F
D F
分组后结果:
B : [A D E]
C : [A E F]
E : [A B D]
F : [A E ]
A : [B C]
D : [F C]

reduce阶段: K3 V3
A B-C
B A-D-E
C A-E-F
D F-C
E A-B-D
F A-E

求共同好友呢?

map阶段: k2 v2
B-C A
A-D B
A-E B
D-E B
A-E C
A-F C
E-F C
F-C D
A-B E
A-D E
B-D E
A-E F
说明 map阶段在进行两两拼接的时候, 保证 小的在前, 大的在后
分组:
A-B : [E]
A-D : [B E]
A-E : [ B C F]
A-F : [C]
B-C : [A]
B-D : [E]
D-E : [B]
E-F : [C]
F-C : [D]

reduce阶段: k3 和 v3

结果:
A-B: E-
A-D: B-E-
A-E: C-B-F-
A-F: C
B-C: A
B-D: E
C-F: D
D-E: B
E-F: C

代码实现: 第一个需求, 求用户在那些人的好友列表中

  • mapper类实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
package com.mr.anli.friend;

import com.sun.org.apache.regexp.internal.RE;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class FriendMapper1 extends Mapper<LongWritable,Text,Text,Text> {
private Text k2 = new Text();
private Text v2 = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {

//1. 获取一行数据
String line = value.toString();

//2. 判断
if( line!=null && !"".equals(line) ){

//3. 对数据进行切割处理
String[] fields = line.split(":");

String v2Str = fields[0];

String[] friends = fields[1].split(",");

//4. 遍历用户好友列表, 将每一个好友当做k2
for (String friend : friends) {
k2.set(friend);
v2.set(v2Str);

context.write(k2,v2);
}

}





}
}

  • reduce类的实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
package com.mr.anli.friend;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FriendReducer1 extends Reducer<Text,Text,Text,Text> {

private Text v3 = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

//1. 遍历v2的数据
StringBuffer v3Str = new StringBuffer();
for (Text value : values) {

v3Str.append( value.toString()+"-");
}

//2. 写出去
v3.set(v3Str.toString());
context.write(key,v3);

}
}

  • 驱动类
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
package com.mr.anli.friend;

import com.mr.anli.flow.Flow1Mapper;
import com.mr.anli.flow.Flow1Reducer;
import com.mr.anli.flow.FlowBean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

public class FriendDriver1 {

public static void main(String[] args) throws Exception {

//1. 创建 job任务对象
Job job = Job.getInstance(new Configuration(), "FriendDriver1");

// 集群运行的必备参数
job.setJarByClass(FriendDriver1.class);

//2. 封装 job的任务 : 天龙八部

//2.1: 设置输入类 和输入的路径
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\共同好友\\input"));

//2.2: 设置map类, 及其输出的k2和v2的类型
job.setMapperClass(FriendMapper1.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(Text.class);


//2.3: 设置shuffle: 分区 排序 规约 分组 默认方案


//2.7: 设置reduce逻辑, reduce的输出k3和v3的类型

job.setReducerClass(FriendReducer1.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);

//2.8: 设置输出类, 及其输出的路径

job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\共同好友\\output"));

//3. 提交任务
boolean flag = job.waitForCompletion(true);

//4. 退出程序
System.exit(flag ?0 :1);

}
}


代码实现: 求共同好友

  • mapper类的编写
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
package com.mr.anli.friend;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.beans.FeatureDescriptor;
import java.io.IOException;
import java.util.Arrays;

public class FriendMapper2 extends Mapper<LongWritable,Text,Text,Text> {
private Text v2 = new Text();
private Text k2 = new Text();
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
//1. 读取一行数据
String line = value.toString();

//2. 判断
if( line != null && !"".equals(line)){

//3. 对数据进行切割操作

String[] fields = line.split("\t");
v2.set(fields[0]);

String[] friends = fields[1].split("-");

//4. 对 数据进行从小到大的排序
Arrays.sort(friends);

//5. 对数组数据进行两两比较:
// a b c d:
for( int i = 0 ; i<friends.length-1; i++){

for( int j = i+1; j < friends.length ; j++ ){

String k2Str = friends[i] +"-" + friends[j];

//6. 写出去:

k2.set(k2Str);
context.write(k2,v2);
}

}

}


}
}

  • recuceTask代码实现
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
package com.mr.anli.friend;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class FriendReducer2 extends Reducer<Text,Text,Text,Text> {
private Text v3 = new Text();
@Override
protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {

//1. 遍历value的数据
StringBuffer v3str =new StringBuffer();
for (Text value : values) {

v3str.append(value+"-");

}

//2. 写出去
v3.set(v3str.toString());
context.write(key,v3);
}
}

  • driver类实现
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    package com.mr.anli.friend;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class FriendDriver2 {

    public static void main(String[] args) throws Exception {

    //1. 创建 job任务对象
    Job job = Job.getInstance(new Configuration(), "FriendDriver2");

    // 集群运行的必备参数
    job.setJarByClass(FriendDriver2.class);

    //2. 封装 job的任务 : 天龙八部

    //2.1: 设置输入类 和输入的路径
    job.setInputFormatClass(TextInputFormat.class);
    TextInputFormat.addInputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\共同好友\\output"));

    //2.2: 设置map类, 及其输出的k2和v2的类型
    job.setMapperClass(FriendMapper2.class);
    job.setMapOutputKeyClass(Text.class);
    job.setMapOutputValueClass(Text.class);


    //2.3: 设置shuffle: 分区 排序 规约 分组 默认方案


    //2.7: 设置reduce逻辑, reduce的输出k3和v3的类型

    job.setReducerClass(FriendReducer2.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    //2.8: 设置输出类, 及其输出的路径

    job.setOutputFormatClass(TextOutputFormat.class);
    TextOutputFormat.setOutputPath(job,new Path("file:///D:\\day07_MapReduce与yarn\\资料\\共同好友\\output1"));

    //3. 提交任务
    boolean flag = job.waitForCompletion(true);

    //4. 退出程序
    System.exit(flag ?0 :1);


    }
    }