![Hadoop大数据技术开发实战](https://wfqqreader-1252317822.image.myqcloud.com/cover/392/27563392/b_27563392.jpg)
5.6 案例分析:二次排序
MapReduce在传递<key,value>对时默认按照key进行排序,而有时候除了key以外,还需要根据value或value中的某一个字段进行排序,基于这种需求进行的自定义排序称为“二次排序”。
例如有以下数据:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P102_12842.jpg?sign=1739370003-71ql4kb8gcXNKxu4t7Lvn4UJONQzDzkP-0-786e0340eb6b083d9c8d4b50509ead40)
现需要对上述数据先按照第一字段进行升序排列,若第一字段相同,则按照第二字段进行降序排列,期望的输出结果如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P103_12963.jpg?sign=1739370003-fd75GaiM3Ke1Jy9zYL0no8OsmHaZHIIG-0-c5fed78f01de8630fb068f5f3696deb1)
1. 设计思路
由于MapReduce中主要是对key的比较和排序,因此可以将需要排序的两个字段组合成一个复合key,而value值不变,则组合后的<key,value>对形如<(key,value),value>。
在编程时可以自定义一个类MyKeyPair,该类中包含要排序的两个字段,然后将该类作为<key,value>对中的key(Hadoop中的任何类型都可以作为key),形如<MyKeyPair,value>,相当于自定义key的类型。由于所有的key是可序列化并且可比较的,因此自定义的key需要实现接口WritableComparable。
与按照一个字段排序相比,本次二次排序需要自定义的地方如下:
- 自定义组合key类,需要实现WritableComparable接口。
- 自定义分区类,按照第一个字段进行分区,需要继承Partitioner类。
- 自定义分组类,按照第一个字段进行分组,需要继承WritableComparator类。
2. 编写程序
(1)自定义组合key类。
新建自定义组合key类MyKeyPair.java,该类需要实现Hadoop提供的org.apache.hadoop.io.WritableComparable接口,该接口继承了org.apache.hadoop.io.Writable接口和java.lang.Comparable接口,定义源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P103_12964.jpg?sign=1739370003-V1B3UUrIOcXaxxlxnU3GKS6aEL791aQi-0-7c5ae12f680162f83802534f21b532c1)
然后需要实现WritableComparable接口中的序列化方法write()、反序列化方法readFields()、比较方法compareTo()。write()方法用于将数据写入输出流;readFields()方法用于从输入流读取数据;compareTo()方法用于将两个对象进行比较,以便能够进行排序。
自定义组合key类MyKeyPair.java的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P103_12901.jpg?sign=1739370003-Qp08jyPVrFzF1JiDpqTwmzME03z7Agl5-0-8fa231ae8e87b22dd6bd5336c0960225)
(2)自定义分区类。
新建自定义分区类MyPartitioner.java,该类需要继承Hadoop提供的org.apache.hadoop.mapreduce.Partitioner类,并实现其中的抽象方法getPartition()。Partitioner类是一个抽象泛型类,用于控制对Map任务输出结果的分区,泛型的两个参数分别表示<key,value>对中key的类型和value的类型。Partitioner类的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P105_13463.jpg?sign=1739370003-1LJLzgX9KGyFchlUWIEH7ScvHTfhiVJg-0-815fe44bfb7992d10be6da8f72439db6)
关于MapReduce的分区规则可参考本章5.1.3节的MapReduce工作原理,此处不再赘述。
自定义分区类MyPartitioner.java的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P105_13464.jpg?sign=1739370003-dI1FWEw0TlZiubXQOB1TsMCDsLo2BwuY-0-0091172694a6e44aae17a02cee5676d3)
上述代码继承Partitioner类的同时指定了<key,value>对中key的类型为MyKeyPair,value的类型为IntWritable。
(3)自定义分组类。
新建自定义分组类MyGroupComparator.java,该类需要继承Hadoop提供的org.apache.hadoop.io.WritableComparator类,并重写其中的compare()方法,以实现按照指定的字段进行分组。
自定义分组类MyGroupComparator.java的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P106_13683.jpg?sign=1739370003-oKzG1i0fV0YeW7gLFz98NhLY0nMEHr5u-0-98010c3f5e5409602070a2ee960c6549)
上述代码首先通过构造方法指定了<key,value>对中key的类型为MyKeyPair,由于MapReduce默认以<key,value>对中的key值进行分组,因此接下来重写了compare()方法,实现了按照MyKeyPair对象中的first字段进行对比,若值相等则会将当前<key,value>对分为一组。
(4)定义Mapper类。
新建Mapper类MyMapper.java,实现将输入的数据封装为<MyKeyPair, IntWritable>形式的<key,value>对进行输出,即输出的key的类型为MyKeyPair,输出的value的类型为IntWritable。
Mapper类MyMapper.java的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P106_13684.jpg?sign=1739370003-K1rkWgLb3JlC2eI8YqDC09L6z32myoOp-0-780bd15818d7833f0e0fa092fcb90932)
(5)定义Reducer类。
新建Reducer类MyReducer.java,将接收到的分组后的<key,value-list>对循环进行输出。
Reducer类MyReducer.java的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P107_13944.jpg?sign=1739370003-wuOn95ttbAqnrKYLB8fxSm2pzy6SJK7h-0-ed2a1946296355afa3d9914d4f26ee73)
上述代码将MyKeyPair类型的key中的first字段值作为输出的key,输出的value从集合values中进行遍历。
(6)定义应用程序主类。
新建应用程序主类MySecondSortApp.java,在该类中需要指定自定义的分区类和分组类,同时需要显式设置Map任务输出的key和value的类型。
应用程序主类MySecondSortApp.java的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P108_14198.jpg?sign=1739370003-fEDjyFl3uMb3cmCgRUDlYEhSc2topBA5-0-8abed130c154ab172e3a8e45beb041c4)
上述代码解析如下:
❶ 设置map()方法输出的key和value的类型。若将此省略,则默认采用❷中设置的输出类型。也就是说,若map()方法和reduce()方法的输出类型一致,可以省略对map()方法输出类型的设置。若map()方法和reduce()方法实际的输出类型与此处的设置不匹配,则程序运行过程中将会报错。
在MapReduce程序运行的过程中会通过JobConf类获取map()方法的输出类型,获取map()方法输出key的类型的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P109_14351.jpg?sign=1739370003-q2HyyyISFAVjHN4qCx0HQ5iPVYIE1wuz-0-393a7316086e5ec9f4b1b7a7c8e1f524)
从上述源码可以看出,当没有设置map()方法的输出类型时,会调用getOutputKeyClass()方法使用reduce()方法的输出类型。
❸ 在执行MapReduce程序时,会首先从HDFS中读取数据块,然后按行拆分成<key,value>对,这个过程则是由TextInputFormat类完成的。TextInputFormat类继承了抽象类FileInputFormat<K,V>,而FileInputFormat<K, V>又继承了抽象类InputFormat<K, V>,抽象类InputFormat<K, V>中定义了两个方法:getSplits()和createRecordReader()。getSplits()方法负责将HDFS数据解析为InputSplit集合,createRecordReader()方法负责将一个InputSplit解析为一个<key,value>对记录。抽象类InputFormat<K, V>的源码如下:
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P109_14352.jpg?sign=1739370003-7epGoVhIQqwKgFFF8OqagLL57IJLVUs4-0-562c05fe08965c17f17c75eb0701f962)
3. 程序运行
程序的打包和执行参考前面的“单词计数”和“数据去重”案例,此处不再赘述。
执行完成后,查看执行结果,如图5-11所示。
![](https://epubservercos.yuewen.com/B7D09D/15825993105224906/epubprivate/OEBPS/Images/Figure-P110_14356.jpg?sign=1739370003-gs51g9Zieo2VUE5YDdHVCPHVHSF90Y2y-0-59fdb537ae2899ded9e50866a42d8e90)
图5-11 查看二次排序程序执行结果