学习目标
理解排序的原理并使用MapReduce编写程序
知识点
1、查看数据并启动Hadoop服务
2、程序编写
知识回顾
在MapReduce操作时,传递的< key,value >会按照key的大小进行排序,最后输出的结果是按照key排过序的。在key排序的基础上,对value也进行排序。这种需求就是二次排序。二次排序是在框架在对key2排序后再对reduce输出结果的结果value3进行二次排序的需求。
在map阶段,使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本例子中使用的是TextInputFormat,他提供的RecordReader会将文本的字节偏移量作为key,这一行的文本作为value。
核心总结:
1、map最后阶段进行partition分区,一般使用job.setPartitionerClass设置的类,如果没有自定义Key的hashCode()方法进行排序。
2、(第一次排序)每个分区内部调用job.setSortComparatorClass设置的key的比较函数类进行排序,如果没有则使用Key的实现的compareTo方法。
3、(第二次排序)当reduce接收到所有map传输过来的数据之后,调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序,如果没有则使用Key的实现的compareTo方法。
4、紧接着使用job.setGroupingComparatorClass设置的分组函数类,进行分组,同一个Key的value放在一个迭代器里面。
环境资源
硬件:Ubuntu16.04
软件:JDK-1.8、Hadoop-3.3、Eclipse
数据存放路径:/data/dataset
tar包路径:/data/software
tar包压缩路径:/data/bigdata
软件安装路径:/opt
实验设计创建文件:/data/resource
操作步骤
启动Hadoop集群
注意:需要在配置文件/etc/profile文件中打开hadoop3的相关环境变量设置。
1、在终端窗口中,执行如下命令,启动HDFS集群。
$ cd /opt/hadoop-3.3.0/sbin/
$ ./start-all.sh
2、在终端窗口中,执行如下命令,查看HDFS服务启动情况:
$ jps
准备数据文件
1、查看源数据文件内容。在终端窗口中,执行如下命令:
$ cat /data/dataset/SecondarySort.txt
可以看到,文件内容如下:
20 21
50 51
50 52
50 53
50 54
60 51
60 53
60 52
60 56
60 57
70 58
60 61
70 54
70 55
70 56
70 57
70 58
1 2
3 4
5 6
7 82
203 21
50 512
50 522
50 53
530 54
40 511
20 53
20 522
60 56
60 57
740 58
63 61
730 54
71 55
71 56
73 57
74 58
12 211
31 42
50 62
7 8
2、将数据源文件上传至HDFS的根目录下。在终端窗口中,执行如下命令:
$ hdfs dfs -put /data/dataset/SecondarySort.txt /
创建Map/Reduce项目
1、启动开发工具Eclipse。
2、打开eclipse开发工具后,在菜单栏中,选择【File】->【New】->【Project】->【Java project】创建Java项目并命名为【Hadoop3Demo】,点击【Finish】完成创建,如下图所示:
3、导入hadoop相关的jar包,首先右击项目选择【New】—>【Folder】创建一个【lib】文件夹并把指定位置中(/data/software/hadoop3_lib/)的包放入该文件中。如下图所示:
编写MapReduce程序
1、在项目【src】目录下,右键点击,选择”New”创建一个类文件名称为”com.simple.IntPair”,该类是对给定数据的两列值的封装,并作为mapper的输出键对象 。实现代码如下:
package com.simple;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class IntPair implements WritableComparable<IntPair> {
private int first;
private int second;
public IntPair() {
super();
}
public IntPair(int first, int second) {
super();
this.first = first;
this.second = second;
}
public int getFirst() {
return first;
}
public void setFirst(int first) {
this.first = first;
}
public int getSecond() {
return second;
}
public void setSecond(int second) {
this.second = second;
}
@Override
public int hashCode() {
final int prime = 31;
int result = 1;
result = prime * result + first;
result = prime * result + second;
return result;
}
@Override
public boolean equals(Object obj) {
if (this == obj)
return true;
if (obj == null)
return false;
if (getClass() != obj.getClass())
return false;
IntPair other = (IntPair) obj;
if (first != other.first)
return false;
if (second != other.second)
return false;
return true;
}
@Override
public String toString() {
return "IntPair [first=" + first + ", second=" + second + "]";
}
@Override
public int compareTo(IntPair intPair) {
//首先比较第一个数,当第一个数不一样时,对第一个数进行比较,设置排序规则
if(first-intPair.getFirst()!=0) {
return first>intPair.first?1:-1;
}else {
//当第一个数一样时,比较第二个数,并设置排序规则
return second>intPair.second?1:-1;
}
}
@Override
//readFiedls方法用于序列化过程中的数据读取
public void readFields(DataInput in) throws IOException {
this.first=in.readInt();
this.second=in.readInt();
}
@Override
//write方法用于序列化过程中的数据写出
public void write(DataOutput out) throws IOException {
// TODO Auto-generated method stub
out.writeInt(first);
out.writeInt(second);
}
}
2、在项目【src】目录下,右键点击创建一个类文件名称为”com.simple.FirstPartitioner”,该类是对数据处理后的结果进行分区设置 。代码实现如下:
package com.simple;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;
/*
* 分区函数类
实现其自定义分区功能
*/
public class FirstPartitioner extends Partitioner<IntPair, Text> {
@Override
public int getPartition(IntPair key, Text value, int numPartitions) {
//这里取key的hashcode值*127,然后取其绝对值,对numPartitions取模,这里numPartitions与ReduceTask数保持一致
return Math.abs(key.getFirst() * 127) % numPartitions;
}
}
3、在项目src目录下,右键点击创建一个类文件名称为”com.simple.GroupingComparator”,该类是对处理的数据进行分组设置 。实现代码如下:
package com.simple;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
/*
* 分组函数类
*/
public class GroupingComparator extends WritableComparator {
// 必须要有这个构造器,构造器中必须要实现这个
protected GroupingComparator() {
super(IntPair.class, true);
}
// 重载 compare:对组合键按第一个自然键排序分组
@SuppressWarnings("rawtypes")
public int compare(WritableComparable w1, WritableComparable w2) {
IntPair ip1 = (IntPair) w1;
IntPair ip2 = (IntPair) w2;
return ip1.compareTo(ip2);
}
}
4、在项目【src】目录下,右键点击创建一个类文件名称为”com.simple.SecondarySortMapper”,继承类Mapper同时指定需要的参数类型,根据业务逻辑修改map类的内容如下:
package com.simple;
import java.io.IOException;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class SecondarySortMapper extends Mapper<LongWritable, Text, IntPair, Text> {
private final IntPair keyPair = new IntPair();
String[] lineArr = null;
public void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//获取行的内容并以一个空格进行分割,然后将切割后的第一个字段赋值给keyPair的first,
// 第二个字段赋值给keyPair的second,并以keyPair作为k,value作为v,写出
String line = value.toString();
lineArr = line.split(" ", -1);
keyPair.setFirst(Integer.parseInt(lineArr[0]));
keyPair.setSecond(Integer.parseInt(lineArr[1]));
context.write(keyPair, value);
}
}
5、在项目【src】目录下右键点击,新建一个类名为”com.simple.SecondarySortReducer”并继承Reducer类,然后添加该类中的代码内容如下所示:
package com.simple;
import java.io.IOException;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
public class SecondarySortReducer extends Reducer<IntPair, Text, Text, Text> {
private static final Text SEPARATOR = new Text("---------------------");
public void reduce(IntPair key, Iterable<Text> values, Context context)
throws IOException, InterruptedException {
//对每一个IntPair输出一个"-------"划分观察
context.write(SEPARATOR, null);
//迭代输出
for (Text val : values) {
context.write(null, val);
}
}
}
6、在项目【src】目录下右键点击,新建一个测试主类名为”com.simple.SecondarySortJob”,并指定main主方法。测试代码如下所示:
package com.simple;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
public class SecondarySortJob {
public static void main(String[] args) throws Exception {
// 获取作业对象
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://localhost:9000");
Job job = Job.getInstance(conf);
// 设置主类
job.setJarByClass(SecondarySortJob.class);
// 设置job参数
job.setMapperClass(SecondarySortMapper.class);
job.setReducerClass(SecondarySortReducer.class);
job.setMapOutputKeyClass(IntPair.class);
job.setMapOutputValueClass(Text.class);
// 设置分区
job.setPartitionerClass(FirstPartitioner.class);
// 设置分组
job.setGroupingComparatorClass(GroupingComparator.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(Text.class);
job.setInputFormatClass(TextInputFormat.class);
job.setOutputFormatClass(TextOutputFormat.class);
// 设置job输入输出
FileInputFormat.setInputPaths(job, new Path("/SecondarySort.txt"));
FileOutputFormat.setOutputPath(job, new Path("/simple/output"));
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
程序测试及运行
1、在”SecondarySortJob”类文件的任意空白处,单击右键,在弹出的环境菜单中,选择”【Run As】->【Java Application】”菜单项,运行程序。操作如下图所示:
2、查看控制台显示内容查看是否正确执行。如下图所示:
3、程序执行完毕之后,查看对数据处理后产生的结果。如下图所示:
hadoop不是大数据内容吗?