博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
Hadoop MapReduce之PartitionerClass
阅读量:4094 次
发布时间:2019-05-25

本文共 5808 字,大约阅读时间需要 19 分钟。

一、引入PartitionerClass

      之前借助WordCount程序,对MapReduce的原理有了一定了叙述:

      Map和Reduce是完成数据处理的两个先后步骤:Map函数将数据读入,做切分处理之后,以key、value的键值对传送出去,Reduce接收此键值对,进行统计处理。在Map和Reduce之间还有一个中间过程,对数据进行一些处理,今天就来窥探其一-----PartitionerClass。

        MapReduce程序每个Reduce可以产生一个输出文件,Partitioner 组件可以对 Map处理后的键值对 按Key值 进行分区,从而将不同分区的 Key 交由不同的 Reduce 处理。在MapReduce程序中可以在主函数中动态地设置Reduce数量。使用语句job.setNumReduceTasks(num); 之间在调用Job类的成员函数即可,传入参数即为需要设置的Reduce数量。下面看一下源码中关于此函数的定义:

/**   * Set the number of reduce tasks for the job.   * @param tasks the number of reduce tasks   * @throws IllegalStateException if the job is submitted   */  public void setNumReduceTasks(int tasks) throws IllegalStateException {    ensureState(JobState.DEFINE);    conf.setNumReduceTasks(tasks);  }
 

二、设置Reduce数量(默认PartitionerClass的作用)

         下面通过实例来说明PartitionerClass以及setNumReduce的使用。

          例子:手机用户数据流量统计

        通过用户上网日志对用户的流量使用情况进行统计,数据格式如下,每一项使用"\t"分割,每行数据第2列为用户手机号,最后两列分别为用户的上传流量和下载流量。

1、程序即输出结果

使用MapReduce进行数据处理,并在main主函数中设置Reduce数量,程序和执行结果如下:

package ypx.datacount;import java.io.IOException;import java.util.HashMap;import java.util.Map;import org.apache.hadoop.conf.Configuration;import org.apache.hadoop.fs.FileSystem;import org.apache.hadoop.fs.Path;import org.apache.hadoop.io.LongWritable;import org.apache.hadoop.io.Text;import org.apache.hadoop.mapreduce.Job;import org.apache.hadoop.mapreduce.Mapper;import org.apache.hadoop.mapreduce.Partitioner;import org.apache.hadoop.mapreduce.Reducer;import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;public class DataCount {		public static class DCMapper extends Mapper
{ @Override protected void map(LongWritable k1, Text v1, Context context) throws IOException, InterruptedException { String [] lines = v1.toString().split("\t"); //按行读入数据,然后用“\t”分割成字符串数组,存入lines //此时每行数据的每一项都被存入了lines字符串组,下面语句取出需要的数据项,本程序中为第 1、8、9项 //取出数据后封装在类DataBean中,作为map的value值输出 String telNo = lines[1]; Long upPayLoad = Long.parseLong(lines[8]); Long downPayLoad = Long.parseLong(lines[9]); //词句DataBean实例化对象语句可以写在map函数之前,这样可以避免每次map函数 //处理数据时都产生bean对像,造成内存垃圾 DataBean bean = new DataBean(telNo,upPayLoad,downPayLoad); //context.write(new Text(telNo),new DataBean(telNo,upPayLoad,downPayLoad)); context.write(new Text(telNo), bean); } } public static class DCReducer extends Reducer
{ @Override protected void reduce(Text k2, Iterable
v2s, Context context) throws IOException, InterruptedException { long up_sum = 0; //定义上行流量总和变量 long down_sum = 0; //定义下行流量总和变量 //Redecu函数的功能实现部分,循环取出DataBean类数组中的对象,对每个对象进行操作 for (DataBean dataBean : v2s) { up_sum += dataBean.getUpPayLoad(); down_sum += dataBean.getDownPayLoad(); } String telno = k2.toString(); context.write(k2, new DataBean(telno,up_sum,down_sum)); } } final static String INPUT_PATH = "hdfs://192.168.42.130:9000/input/DataCount"; final static String OUTPUT_PATH = "hdfs://192.168.42.130:9000/output/DataCount"; public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { Configuration conf = new Configuration(); Path path = new Path(OUTPUT_PATH); FileSystem fileSystem = path.getFileSystem(conf); if (fileSystem.exists(new Path(OUTPUT_PATH))) { fileSystem.delete(new Path(OUTPUT_PATH),true); } Job job = Job.getInstance(conf); job.setJarByClass(DataCount.class); job.setMapperClass(DCMapper.class); job.setReducerClass(DCReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(DataBean.class); FileInputFormat.setInputPaths(job, new Path(INPUT_PATH)); FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));  //********设置Reduce数量*************// job.setNumReduceTasks(5); //*****************************//  job.waitForCompletion(true); }}

        通过浏览器可以看到,在hdfs的输出文件夹中,输出文件分成了5部分,这5部分的大小不一样,程序把map输出的键值对按照默认的方法分成5部分,即HashPartitioner类,把键值对通过key值进行哈希映射,分为5种不同的映射值,分发给不同的Reduce进行处理,最后输出到不同的文件中。

2、HashPartitioner类源码

public class HashPartitioner
extends Partitioner
{ /** Use {@link Object#hashCode()} to partition. */ public int getPartition(K key, V value, int numReduceTasks) { return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks; }}
     
可以看到函数返回值为int型,即为通过hash函数计算后得到的值。这里的numReduceTasks就是程序中自己设置的Reduce数量,此程序中值为5,对numReduceTasks求余也是为了将所有数据映射到5个最终的输出文件中。

三、自定义类(重载PartitionerClass

      如果想要按照自己的意图对map处理后的键值对进行分区,则必须按照自己的业务逻辑重写Partitioner类。这里按手机号的前三位将数据分为4个区,重写代码如下:

  

/*==========================================*/	public static class ServicePreviderPartitioner extends Partitioner
{ //定义映射表,用来给随后代码写映射关系 private static Map
providerMap = new HashMap
(); //静态块 static { providerMap.put("135", 1); providerMap.put("136", 1); providerMap.put("137", 1); providerMap.put("138", 1); providerMap.put("139", 1); providerMap.put("150", 2); providerMap.put("159", 2); providerMap.put("183", 3); providerMap.put("182", 3); } @Override public int getPartition(Text key, DataBean value, int numPartitions) { String account = key.toString();            String sub_acc = account.substring(0, 3); //取出手机号的前三位            Integer code = providerMap.get(sub_acc); if (code == null) { code = 0; } // 返回一个int类型,为分区号 return code; } } /*==========================================*/
           重写的
Partitioner类中,返回值为int型的code,即数据的分区号。按手机号的前三位将数据分为4个区(第四种为其他类型,code值为0),按照此方法的输出文件如下:

         可以看到输出文件还是5个,这是由于主函数中Reduce的数量定义为5。因为自定义的Partitioner类只有4个分区,所以这里只有四个文件是有内容的,最后一个文件大小为0B。也可以看到所有的文件大小与之前按照默认分区方法(HashPartitioner)得到的文件大小均不相同。

四、其他 Partitioner

查看 Partitioner 的 API 可以看到 Partitioner 的 4 个实现类:

BinaryPartitioner, HashPartitioner, KeyFieldBasedPartitioner, TotalOrderPartitioner
你可能感兴趣的文章
realsense-ros里里程计相关代码
查看>>
似乎写个ROS功能包并不难,你会订阅话题发布话题,加点逻辑处理,就可以写一些基础的ROS功能包了。
查看>>
PX4官方用户和开发手册的首页面是会给你选择英文和中文的
查看>>
博士的申请考核制
查看>>
找到了中文版的mavlink手册
查看>>
浅谈飞控开发的仿真功能
查看>>
我觉得在室内弄无人机开发装个防撞机架还是很有必要的,TBUS就做得很好。
查看>>
serial也是见到很多次了,似乎它就是一种串行通信协议
查看>>
TBUS的一些信息
查看>>
专业和业余的区别就在于你在基础在基本功打磨练习花的时间
查看>>
通过mavlink实现自主航线的过程笔记
查看>>
Ardupilot飞控Mavlink代码学习
查看>>
这些网站有一些嵌入式面试题合集
查看>>
我觉得刷题是有必要的,不然小心实际被问的时候懵逼,我觉得你需要刷个50份面试题。跟考研数学疯狂刷卷子一样!
查看>>
我觉得嵌入式面试三要素:基础吃透+项目+大量刷题,缺一不可。不刷题是不行的。而且得是大量刷,刷出感觉套路,别人做题都做得是固定题型套路条件反射了,你还在那慢慢理解慢慢推是不行的,也是考研的教训。
查看>>
React Native之原理浅析
查看>>
Git操作清单
查看>>
基础算法
查看>>
前端面试
查看>>
React Hooks 完全指南
查看>>