欢迎关注大数据技术架构与案例微信公众号:过往记忆大数据
过往记忆博客公众号iteblog_hadoop
欢迎关注微信公众号:
过往记忆大数据

我在前面的文章介绍了MapReduce中两种全排序的方法及其实现。但是上面的两种方法都是有很大的局限性:

  • 方法一在数据量很大的时候会出现OOM问题;
  • 方法二虽然能够将数据分散到多个Reduce中,但是问题也很明显:我们必须手动地找到各个Reduce的分界点,尽量使得分散到每个Reduce的数据量均衡。而且每次修改Reduce的个数时,都得手动去找一次Key的分界点!非常不灵活。
  • 本文这里介绍的第三种使用MapReduce全局排序的方法算是比较通用了,而且是内置的实现。

    使用TotalOrderPartitioner进行全排序

    我们都知道Hadoop内置有个 HashPartitioner 分区实现类,MapReduce默认就是使用它;但其实Hadoop内置还有个名为 TotalOrderPartitioner 的分区实现类,看名字就清楚它其实就是解决全排序的问题。如果你去看他的实现,其主要做的事实际上和我们上文介绍的 IteblogPartitioner 分区实现类很类似,也就是根据Key的分界点将不同的Key发送到相应的分区。问题是,上文用到的分界点是我们人为计算的;而这里用到的分界点是由程序解决的!

    数据抽样

    寻找合适的Key分割点需要我们对数据的分布有个大概的了解;如果数据量很大的话,我们不可能对所有的数据进行分析然后选出 N-1 (N代表Reduce的个数)个分割点,最适合的方式是对数据进行抽样,然后对抽样的数据进行分析并选出合适的分割点。Hadoop提供了三种抽样的方法:

  • SplitSampler:从s个split中选取前n条记录取样
  • RandomSampler:随机取样
  • IntervalSampler:从s个split里面按照一定间隔取样,通常适用于有序数据
  • 这三个抽样都实现了K[] getSample(InputFormat inf, Job job) throws IOException, InterruptedException; 方法;通过调用这个方法我们可以返回抽样到的Key数组,除了 IntervalSampler 类返回的抽样Key是有序的,其他都无序。获取到采样好的Key数组之后,需要对其进行排序,然后选择好N-1 (N代表Reduce的个数)个分割点,最后将这些Key分割点存储到指定的HDFS文件中,存储的文件格式是SequenceFile,使用如下:

    TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));
    InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 100);
    InputSampler.writePartitionFile(job, sampler);
    

    TotalOrderPartitioner

    上面通过 InputSampler.writePartitionFile(job, sampler); 存储好了分割点,然后 TotalOrderPartitioner 类会在 setConf 函数中读取这个文件,并根据Key的类型分别创建不同的数据结构:

  • 如果 Key 的类型是 BinaryComparableBytesWritableText ),并且 mapreduce.totalorderpartitioner.naturalorder 属性的指是 true ,则会构建trie 树,便于后面的查找;

    在计算机科学中,trie,又称前缀树或字典树,是一种有序树,用于保存关联数组,其中的键通常是字符串。与二叉查找树不同,键不是直接保存在节点中,而是由节点在树中的位置决定。一个节点的所有子孙都有相同的前缀,也就是这个节点对应的字符串,而根节点对应空字符串。一般情况下,不是所有的节点都有对应的值,只有叶子节点和部分内部节点所对应的键才有相关的值。(摘自:https://zh.wikipedia.org/wiki/Trie

  • 其他情况会构建一个 BinarySearchNode,用二分查找
  • 最后程序通过调用 getPartition 函数决定当前Key应该发送到哪个Reduce中:

    public int getPartition(K key, V value, int numPartitions) {
        return partitions.findPartition(key);
    }
    

    程序实现

    下面是使用 TotalOrderPartitioner 类进行全局排序的完整代码:

    package com.iteblog.mapreduce.sort;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.partition.InputSampler;
    import org.apache.hadoop.mapreduce.lib.partition.TotalOrderPartitioner;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import java.io.IOException;
    
    public class TotalSortV3 extends Configured implements Tool {
        static class SimpleMapper extends Mapper<Text, Text, Text, IntWritable> {
            @Override
            protected void map(Text key, Text value,
                               Context context) throws IOException, InterruptedException {
                IntWritable intWritable = new IntWritable(Integer.parseInt(key.toString()));
                context.write(key, intWritable);
            }
        }
    
        static class SimpleReducer extends Reducer<Text, IntWritable, IntWritable, NullWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values,
                                  Context context) throws IOException, InterruptedException {
                for (IntWritable value : values)
                    context.write(value, NullWritable.get());
            }
        }
    
        public static class KeyComparator extends WritableComparator {
            protected KeyComparator() {
                super(Text.class, true);
            }
    
            @Override
            public int compare(WritableComparable w1, WritableComparable w2) {
                int v1 = Integer.parseInt(w1.toString());
                int v2 = Integer.parseInt(w2.toString());
    
                return v1 - v2;
            }
        }
    
        @Override
        public int run(String[] args) throws Exception {
            Configuration conf = getConf();
            conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
            Job job = Job.getInstance(conf, "Total Order Sorting");
            job.setJarByClass(TotalSortV3.class);
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setSortComparatorClass(KeyComparator.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.setNumReduceTasks(3);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(NullWritable.class);
    
            TotalOrderPartitioner.setPartitionFile(job.getConfiguration(), new Path(args[2]));
            InputSampler.Sampler<Text, Text> sampler = new InputSampler.RandomSampler<>(0.01, 1000, 100);
            InputSampler.writePartitionFile(job, sampler);
    
            job.setPartitionerClass(TotalOrderPartitioner.class);
            job.setMapperClass(SimpleMapper.class);
            job.setReducerClass(SimpleReducer.class);
    
            job.setJobName("iteblog");
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new TotalSortV3(), args);
            System.exit(exitCode);
        }
    }
    

    运行程序

    [iteblog@www.iteblog.com /home/iteblog]$ hadoop jar total-sort-0.1.jar com.iteblog.mapreduce.sort.TotalSortV3 /user/iteblog/input /user/iteblog/output /user/iteblog/partitions
    
    ##生成的 Key 分割点
    [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/partitions
    10978 (null)
    21611 (null)
    
    [iteblog@www.iteblog.com ~]$ hadoop fs -ls /user/iteblog/output/
    Found 4 items
    -rw-r--r--   3 iteblog supergroup          0 2017-05-09 16:56 /user/iteblog/output/_SUCCESS
    -rw-r--r--   3 iteblog supergroup     335923 2017-05-09 16:56 /user/iteblog/output/part-r-00000
    -rw-r--r--   3 iteblog supergroup     388362 2017-05-09 16:56 /user/iteblog/output/part-r-00001
    -rw-r--r--   3 iteblog supergroup     407472 2017-05-09 16:56 /user/iteblog/output/part-r-00002
    
    [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/output/part-r-00000 | head -n 10
    0
    0
    0
    1
    1
    1
    1
    1
    1
    
    [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/output/part-r-00000 | tail -n 10
    10976
    10976
    10976
    10977
    10977
    10977
    10977
    10977
    10977
    10977
    
    [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/output/part-r-00001 | head -n 10
    10978
    10978
    10978
    10978
    10978
    10978
    10978
    10978
    10978
    10978
    
    [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/output/part-r-00001 | tail -n 10
    21609
    21609
    21609
    21610
    21610
    21610
    21610
    21610
    21610
    21610
    
    [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/output/part-r-00002 | head -n 10
    21611
    21611
    21611
    21611
    21611
    21611
    21611
    21611
    21611
    21611
    
    [iteblog@www.iteblog.com ~]$ hadoop fs -text /user/iteblog/output/part-r-00002 | tail -n 10
    32766
    32766
    32766
    32766
    32767
    32767
    32767
    32767
    32767
    32767
    

    注意事项

    1、我们这里使用的 InputFormat 类是 KeyValueTextInputFormat ,而不是 TextInputFormat 。因为采样是对Key进行的,而 TextInputFormat 的 Key 是偏移量,这样的采样结果是无意义的;而如果使用 KeyValueTextInputFormat 作为输入类型,则可以将数据存放在 Key 中,从而得到正确的采样结果。

    2、我们 map 输出 Key 的类型是 Text ,这是没办法的,因为 InputSampler.writePartitionFile 函数实现的原因,必须要求 map 输入和输出 Key 的类型一致,否则会出现如下的异常:

    Exception in thread "main" java.io.IOException: wrong key class: org.apache.hadoop.io.Text is not class org.apache.hadoop.io.LongWritable
      at org.apache.hadoop.io.SequenceFile$RecordCompressWriter.append(SequenceFile.java:1380)
      at com.iteblog.mapreduce.sort.TotalSortV3.writePartitionFile(TotalSortV3.java:106)
      at com.iteblog.mapreduce.sort.TotalSortV3.run(TotalSortV3.java:47)
      at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:70)
      at org.apache.hadoop.util.ToolRunner.run(ToolRunner.java:84)
      at com.iteblog.mapreduce.sort.TotalSortV3.main(TotalSortV3.java:73)
      at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
      at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
      at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
      at java.lang.reflect.Method.invoke(Method.java:606)
      at org.apache.hadoop.util.RunJar.main(RunJar.java:212)
    
    本博客文章除特别声明,全部都是原创!
    原创文章版权归过往记忆大数据(过往记忆)所有,未经许可不得转载。
    本文链接: 【三种方法实现Hadoop(MapReduce)全局排序(2)】(https://www.iteblog.com/archives/2147.html)
    喜欢 (20)
    分享 (0)
    发表我的评论
    取消评论

    表情
    本博客评论系统带有自动识别垃圾评论功能,请写一些有意义的评论,谢谢!
    (14)个小伙伴在吐槽
    1. 为何运行结果是字典序?哪里有问题?找了好久了,望赐教。

      哲别2018-06-05 13:05 回复
    2. 代码片段能提供吗

      w3970907702018-06-05 14:04 回复
    3. 代码就是本文中的代码,没有修改。输入是一个数值一行的data1和data2文件,输出结果是3个文件没错,每个文件都有序,但是好像是字典序。

      大米粥2018-06-05 14:21 回复
    4. job.setSortComparatorClass(KeyComparator.class);
      这一句你用了吗?

      w3970907702018-06-05 14:24 回复
    5. https://pan.baidu.com/s/1KZB6b23ojP5MtiL7PTTFDQ,麻烦您看看,谢谢你的帮助。

      大米粥2018-06-05 14:30
  • 您结果只看前10行、后10行,是发现不了这个问题的。

    大米粥2018-06-05 14:24 回复
  • 原因找到了,我上面的代码忘记加上一个配置 conf.set("mapreduce.totalorderpartitioner.naturalorder", "false");
    你把上面这行代码加到你工程里面去,然后再运行,结果就是按照数字的大小顺序全排序的。

    w3970907702018-06-07 14:15 回复
  • 我要先学习一下这个参数的意思,感谢感谢!

    大米粥2018-06-07 14:19
  • 有空麻烦您帮着看看吧,感谢。我把源码和相关文件都放到网盘里了。

    大米粥2018-06-05 15:39 回复
  • 好的,我晚点时间看下。

    w3970907702018-06-05 16:35 回复
  • 你好,也许你很忙,真是不好意思又打扰你。这个代码确实要用,因此我关注了好久。能否麻烦你有空给看看问题出在什么地方?非常感谢。

    大米粥2018-06-06 23:48
  • 你文件能发到我邮箱嘛?我百度网盘进不去。我邮箱:wyphao.2007@163.com

    w3970907702018-06-07 11:24 回复
  • 邮件已发。

    大米粥2018-06-07 12:25
  • 为何运行结果是字典序?哪里有问题?找了好久了,望赐教。

    哲别2018-06-05 13:05 回复