文库网
关注排行榜

当前无数据...

马上注册,结交更多好友,享用更多功能,让你轻松玩转社区。

您需要 登录 才可以下载或查看,没有帐号?立即注册

x

在之前的博客《MapReduce系列(5) | MapReduce任务流程和shuffle机制的简单解析》,博主为大家分享了MapReduce的整体计算任务流程以及shuffle阶段主要的作用。本篇博客博主分享的是Shuffle之Partition分区详解。

目录


一. Shuffle之Partition分区

  今天我们讲的第五步,Partition分区操作。

{tilte}-云竹

  Partition分区:按照一定的分区规则,将key value的list进行分区。分区的创建分为默认的和自定义两种。

1.1. 默认Partition分区

  1. <code class="prism language-java">public class HashPartitioner<K,V> extends Partitioner<K,V>{
  2. public int getPartition(K key,V value, int numReduceTasks){
  3. return(key.hashCode()& Integer.MAX_VALUE) % numReduceTasks;
  4. }
  5. }
  6. </code>
复制代码

  默认分区是根据key的hashCode对ReduceTasks个数取模得到的。用户没法控制哪个key存储到哪个分区。既然用户不能控制存储到某个区间,能自定义么,答案是可以的。

1.2. 自定义Partitioner

  • 1. 自定义类继承Partitioner,重写getPartition()方法
  1. <code class="prism language-java">public class CustomPartitioner extends Partitioner<Text, FlowBean>{
  2. @Override
  3. public int getPartition(Text key,FlowBean value, int numReduceTasks){
  4. // 控制分区代码逻辑
  5. ... ...
  6. return Partition;
  7. }
  8. }
  9. </code>
复制代码
  • 2. 在Job驱动中,设置自定义Partitioner

job.setPartitionerClass(CustomPartitioner.class);

  • 3. 自定义Partition后,要根据自定义Partitioner的逻辑设置相应数量的ReduceTask

job.setNumReduceTasks(5);

1.3. 分区总结

  1. 如果ReduceTask的数量>getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;
  2. 如果1
  3. 如果ReduceTask的数量=1,则不管MapTask端输出多少个分区文件,最终结果都交给这一个ReduceTask,最终也就只会产生一个结果文件part-r-00000;
  4. 分区号必须从零开始,逐一累加。

1.4. 案例的简单分析

  假设自定义分区数为5,则

  1. job.setNumReduceTasks(1); 会正常运行,只不过会产生一个输出文件
  2. job.setNumReduceTasks(2); 会报错
  3. job.setNumReduceTasks(6); 大于5,程序会正常运行,会产生空文件

二. 案例分析

  案例继续采用《MapReduce系列(4) | Hadoop序列化》中的文档,有需要文档的可以到此章自行复制保存。

2.1. 需求

将统计结果按照手机归属地不同省份输出到不同文件中(分区)

  • 1. 输入数据

{tilte}-云竹

  • 2. 期望输出数据

  手机号136、137、138、139开头都分别放到一个独立的4个文件中,其他开头的放到一个文件中。

2.2. 需求分析

{tilte}-云竹

2.3 代码实现

  • 1. 在系列的基础上,增加一个分区类MyPartitioner
  1. <code class="prism language-java">package com.buwenbuhuo.partition;
  2. import com.buwenbuhuo.flowsun.FlowBean;
  3. import org.apache.hadoop.io.Text;
  4. import org.apache.hadoop.mapreduce.Partitioner;
  5. /**
  6. * @author 卜温不火
  7. * @create 2020-04-23 20:27
  8. * com.buwenbuhuo.partition - the name of the target package where the new class or interface will be created.
  9. * mapreduce0422 - the name of the current project.
  10. */
  11. public class MyPartitioner extends Partitioner<Text, FlowBean> {
  12. @Override
  13. public int getPartition(Text text, FlowBean flowBean, int numPartitions) {
  14. String phone = text.toString();
  15. switch (phone.substring(0, 3)) {
  16. case "136":
  17. return 0;
  18. case "137":
  19. return 1;
  20. case "138":
  21. return 2;
  22. case "139":
  23. return 3;
  24. default:
  25. return 4;
  26. }
  27. }
  28. }
  29. </code>
复制代码
  • 2. 在驱动函数中增加自定义数据分区设置和ReduceTask设置
  1. <code class="prism language-java">package com.buwenbuhuo.partition;
  2. /**
  3. * @author 卜温不火
  4. * @create 2020-04-23 14:14
  5. * com.buwenbuhuo.flowsun - the name of the target package where the new class or interface will be created.
  6. * mapreduce0422 - the name of the current project.
  7. */
  8. import com.buwenbuhuo.flowsun.FlowBean;
  9. import com.buwenbuhuo.flowsun.FlowDriver;
  10. import com.buwenbuhuo.flowsun.FlowMapper;
  11. import com.buwenbuhuo.flowsun.FlowReducer;
  12. import org.apache.hadoop.conf.Configuration;
  13. import org.apache.hadoop.fs.Path;
  14. import org.apache.hadoop.io.Text;
  15. import org.apache.hadoop.mapreduce.Job;
  16. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
  17. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
  18. import java.io.IOException;
  19. public class PartitionerDriver {
  20. public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
  21. // 1 获取job实例
  22. Job job = Job.getInstance(new Configuration());
  23. // 2.设置类路径
  24. job.setJarByClass(PartitionerDriver.class);
  25. // 3 指定本业务job要使用的mapper/Reducer业务类
  26. job.setMapperClass(FlowMapper.class);
  27. job.setReducerClass(FlowReducer.class);
  28. // 8 指定自定义数据分区
  29. job.setPartitionerClass(MyPartitioner.class);
  30. // 9 同时指定相应数量的reduce task
  31. job.setNumReduceTasks(5);
  32. // 4 指定mapper输出数据的kv类型
  33. job.setMapOutputKeyClass(Text.class);
  34. job.setMapOutputValueClass(FlowBean.class);
  35. // 5 指定最终输出的数据的kv类型
  36. job.setOutputKeyClass(Text.class);
  37. job.setOutputValueClass(FlowBean.class);
  38. // 6 指定job的输入原始文件所在目录
  39. FileInputFormat.setInputPaths(job, new Path("d:\\input"));
  40. FileOutputFormat.setOutputPath(job, new Path("d:\\output"));
  41. // 7 将job中配置的相关参数,以及job所用的java类所在的jar包, 提交给yarn去运行
  42. boolean result = job.waitForCompletion(true);
  43. System.exit(result ? 0 : 1);
  44. }
  45. }
  46. </code>
复制代码

2.4 运行并查看结果

  • 1. 运行
    {tilte}-云竹
  • 2. 查看
    {tilte}-云竹

本期的分享就到这里了,小伙伴们有什么疑惑或好的建议可以积极在评论区留言~,博主会持续更新新鲜好玩的技术,喜欢的小伙伴们不要忘了点赞,记得要关注博主呐ヾ(◍°∇°◍)ノ゙。


本站资源均由网上搜集或网友上传提供,内容仅供观摩学习交流之用,本站将不对任何资源负法律责任.如有侵犯您的版权,请及时联系我们(邮箱:892481490@qq.com,客服QQ:892481490),我们会尽快处理!QQ350550790是骗子,注意不要和他交易!!!
发帖求助前要善用【论坛搜索】功能, 那里可能会有你要找的答案,也能为你节约不少学习时间;
如何回报帮助你解决问题的坛友,好办法就是点击帖子下方的评分按钮给对方加(威望)和(贡献)而不会扣除自己的积分。
如发现灌水帖、病毒木马帖、广告帖、工具不能正常使用、网盘链接失效,请点击【举报】 核实有几率会给予额外的B币奖励哦!
    您需要登录后才可以回帖 登录 | 立即注册

    本版积分规则

    发布资源 快速回复 返回列表 客服中心 官方QQ群

    QQ|小黑屋|手机版|编程之家论坛 ( 桂ICP备18002029号 )

    Powered by 编程之家  © 20019-2021