Mapreduce的序列化和流量统计程序开发
一、Hadoop数据序列化的数据类型
Java数据类型 => Hadoop数据类型
SRE实战 互联网时代守护先锋,助力企业售后服务体系运筹帷幄!一键直达领取阿里云限量特价优惠。int IntWritable
float FloatWritable
long LongWritable
double DoubleWritable
String Text
boolean BooleanWritable
byte ByteWritable
map MapWritable
array ArrayWritable
二、Hadoop的序列化
1.什么是序列化?
在java中,序列化接口是Serializable,它下面又实现了很多的序列化接口,所以java的序列化是一个重量级的序列化框架,一个对象被java序列化之后会附带很多额外的信息(校验信息、header、继承体系等),不便于在网络中进行高效的传输,所以Hadoop开发了一套自己的序列化框架——Writable。
序列化就是把内存当中的对象,转化为字节序列以便于存储和网络传输;
反序列化是将收到的字节序列或硬盘当中的持续化数据,转换成内存中的对象。
2.序列化的理解方法(自己悟的,不对勿喷~~)
比如下面流量统计案例中,流量的封装类FlowBean实现了Writable接口,其中定义了变量upFlow、dwFlow、flowSum;
在Mapper和Reducer类中初始化封装类FlowBean时,内存会分配空间加载这些对象,而这些对象不便于在网络中高效的传输,这是封装类FlowBean中的序列化方法将这些对象转换为字节序列,方便了存储和传输;
当Mapper或Reducer需要将这些对象的字节序列写出到磁盘时,封装类FlowBean中的反序列化方法将字节序列转换为对象,然后写道磁盘中。
3.序列化特点
序列化与反序列化时分布式数据处理当中经常会出现的,比如hadoop通信是通过远程调用(rpc)实现的,这个过程就需要序列化。
特点:1)紧凑;
2)快速
3)可扩展
4)可互操作
三、Mapreduce的流量统计程序案例
1.代码
/** * @author: PrincessHug * @date: 2019/3/23, 23:38 * @Blog: https://www.cnblogs.com/HelloBigTable/ */ public class FlowBean implements Writable { private long upFlow; private long dwFlow; private long flowSum; public long getUpFlow() { return upFlow; } public void setUpFlow(long upFlow) { this.upFlow = upFlow; } public long getDwFlow() { return dwFlow; } public void setDwFlow(long dwFlow) { this.dwFlow = dwFlow; } public long getFlowSum() { return flowSum; } public void setFlowSum(long flowSum) { this.flowSum = flowSum; } public FlowBean() { } public FlowBean(long upFlow, long dwFlow) { this.upFlow = upFlow; this.dwFlow = dwFlow; this.flowSum = upFlow + dwFlow; } /** * 序列化 * @param out 输出流 * @throws IOException */ @Override public void write(DataOutput out) throws IOException { out.writeLong(upFlow); out.writeLong(dwFlow); out.writeLong(flowSum); } /** * 反序列化 * @param in * @throws IOException */ @Override public void readFields(DataInput in) throws IOException { upFlow = in.readLong(); dwFlow = in.readLong(); flowSum = in.readLong(); } @Override public String toString() { return upFlow + "\t" + dwFlow + "\t" + flowSum; } } public class FlowCountMapper extends Mapper<LongWritable, Text,Text,FlowBean> { @Override protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException { //获取数据 String line = value.toString(); //切分数据 String[] fields = line.split("\t"); //封装数据 String phoneNum = fields[1]; long upFlow = Long.parseLong(fields[fields.length - 3]); long dwFlow = Long.parseLong(fields[fields.length - 2]); //发送数据 context.write(new Text(phoneNum),new FlowBean(upFlow,dwFlow)); } } public class FlowCountReducer extends Reducer<Text,FlowBean,Text,FlowBean> { @Override protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException { //聚合数据 long upFlow_sum = 0; long dwFlow_sum = 0; for (FlowBean f:values){ upFlow_sum += f.getUpFlow(); dwFlow_sum += f.getDwFlow(); } //发送数据 context.write(key,new FlowBean(upFlow_sum,dwFlow_sum)); } } public class FlowPartitioner extends Partitioner<Text,FlowBean> { @Override public int getPartition(Text key, FlowBean value, int i) { //获取用来分区的电话号码前三位 String phoneNum = key.toString().substring(0, 3); //设置分区逻辑 int partitionNum = 4; if ("135".equals(phoneNum)){ return 0; }else if ("137".equals(phoneNum)){ return 1; }else if ("138".equals(phoneNum)){ return 2; }else if ("139".equals(phoneNum)){ return 3; } return partitionNum; } } public class FlowCountDriver { public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException { //获取配置,定义工具 Configuration conf = new Configuration(); Job job = Job.getInstance(); //设置运行类 job.setJarByClass(FlowCountDriver.class); //设置Mapper类及Mapper输出数据类型 job.setMapperClass(FlowCountMapper.class); job.setMapOutputKeyClass(Text.class); job.setMapOutputValueClass(FlowBean.class); //设置Reducer类及其输出数据类型 job.setReducerClass(FlowCountReducer.class); job.setOutputKeyClass(Text.class); job.setOutputValueClass(FlowBean.class); //设置自定义分区 job.setPartitionerClass(FlowPartitioner.class); job.setNumReduceTasks(5); //设置文件输入输出流 FileInputFormat.setInputPaths(job,new Path("G:\\mapreduce\\flow\\in")); FileOutputFormat.setOutputPath(job,new Path("G:\\mapreduce\\flow\\inpartitionout")); //返回运行完成 if (job.waitForCompletion(true)){ System.out.println("运行完毕!"); }else { System.out.println("运行出错!"); } } }