【Hadoop】3005 - Hadoop Object Serialization

诗远 2023-05-17


一、Hadoop serialization

The Writable interface, built on DataInput and DataOutput, provides a simple and efficient object serialization mechanism.

Every key and value type used in a MapReduce job must implement the Writable interface.

Every key type must additionally implement WritableComparable, because keys are sorted during the shuffle; a minimal sketch of such a key follows.
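For reference (this sketch is not part of the original post), a key type implementing WritableComparable might look like the following; the class name FlowKey and its field are hypothetical:

    package serializable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.WritableComparable;

    // Hypothetical key type: comparable so MapReduce can sort keys during the shuffle.
    // A production key should also override hashCode()/equals() so the default
    // HashPartitioner distributes keys consistently.
    public class FlowKey implements WritableComparable<FlowKey> {

        private long totalFlow;

        public FlowKey() {
        }

        public FlowKey(long totalFlow) {
            this.totalFlow = totalFlow;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(totalFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.totalFlow = in.readLong();
        }

        @Override
        public int compareTo(FlowKey other) {
            // Sort descending by total flow
            return Long.compare(other.totalFlow, this.totalFlow);
        }
    }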












二、Implementing a MapReduce program with a custom Writable

1、Requirements
Date          Phone number    Flow 1    Flow 2

20150403      1380013800      201       51
20150403      1390013800      202       52
20150403      1370013800      203       54
20150403      1360013800      204       55

Goal: for each user (phone number), compute the upstream flow, downstream flow, and total flow.





2、Implementation code

FlowBean encapsulates the per-user traffic fields. Note that write and readFields must process the fields in the same order, since Writable serialization carries no field names.
package serializable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.Writable;

public class FlowBean implements Writable {

    private String mobileNumber;
    private long upFlow;
    private long downFlow;

    // A no-arg constructor is required so Hadoop can instantiate the bean during deserialization
    public FlowBean() {
    }

    public FlowBean(long upFlow, long downFlow, String mobileNumber) {
        this.mobileNumber = mobileNumber;
        this.upFlow = upFlow;
        this.downFlow = downFlow;
    }

    // Serialize the fields; the order here must match readFields()
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(mobileNumber);
        out.writeLong(upFlow);
        out.writeLong(downFlow);
    }

    // Deserialize the fields in the same order they were written
    @Override
    public void readFields(DataInput in) throws IOException {
        this.mobileNumber = in.readUTF();
        this.upFlow = in.readLong();
        this.downFlow = in.readLong();
    }

    public String getMobileNumber() {
        return mobileNumber;
    }

    public void setMobileNumber(String mobileNumber) {
        this.mobileNumber = mobileNumber;
    }

    public long getUpFlow() {
        return upFlow;
    }

    public void setUpFlow(long upFlow) {
        this.upFlow = upFlow;
    }

    public long getDownFlow() {
        return downFlow;
    }

    public void setDownFlow(long downFlow) {
        this.downFlow = downFlow;
    }

    @Override
    public String toString() {
        return this.upFlow + "\t" + this.downFlow + "\t" + (this.upFlow + this.downFlow);
    }
}
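A quick way to check that write and readFields stay in sync is to round-trip a bean through a byte stream. This standalone snippet is a sketch added for illustration, not part of the original post:

    package serializable;

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;

    public class FlowBeanRoundTrip {

        public static void main(String[] args) throws IOException {
            FlowBean original = new FlowBean(201, 51, "1380013800");

            // Serialize with write()
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));

            // Deserialize with readFields() and compare
            FlowBean copy = new FlowBean();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));

            System.out.println(original);  // 201	51	252
            System.out.println(copy);      // should print the same line
        }
    }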
FlowSumRunner bundles the mapper, the reducer, and the driver code that configures and submits the job:

package serializable;

import java.io.IOException;
import java.net.URI;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class FlowSumRunner {

    private static final String HDFS_PATH = "hdfs://cloud01:9000";

    public static class FlowSumMapper extends Mapper<LongWritable, Text, Text, FlowBean> {

        FlowBean flowBean = null;

        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {

            String[] values = StringUtils.split(value.toString(), "\t");

            // Adjust these indexes to match the column layout of the actual input file
            String mobileNumber = values[1];
            long upFlow = Long.parseLong(values[7]);
            long downFlow = Long.parseLong(values[8]);

            flowBean = new FlowBean(upFlow, downFlow, mobileNumber);
            context.write(new Text(mobileNumber), flowBean);
        }
    }

    public static class FlowSumReducer extends Reducer<Text, FlowBean, Text, FlowBean> {

        FlowBean v = new FlowBean();

        @Override
        protected void reduce(Text k, Iterable<FlowBean> values, Context context)
                throws IOException, InterruptedException {

            long sumUpFlow = 0;
            long sumDownFlow = 0;

            // Accumulate the upstream and downstream flow for one phone number
            for (FlowBean value : values) {
                sumUpFlow += value.getUpFlow();
                sumDownFlow += value.getDownFlow();
            }
            v.setUpFlow(sumUpFlow);
            v.setDownFlow(sumDownFlow);
            context.write(k, v);
        }
    }

    public static void main(String[] args) {

        Configuration conf = new Configuration();
        try {
            Job job = Job.getInstance(conf);
            job.setJarByClass(FlowSumRunner.class);
            job.setJar("flowSumJob.jar");

            job.setMapperClass(FlowSumMapper.class);
            job.setReducerClass(FlowSumReducer.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);

            Path inputPath = new Path(HDFS_PATH + "/flow/input");
            Path outputDir = new Path(HDFS_PATH + "/flow/output");

            FileInputFormat.setInputPaths(job, inputPath);
            FileOutputFormat.setOutputPath(job, outputDir);

            // Delete the output directory if it already exists, otherwise the job fails
            FileSystem fs = FileSystem.get(new URI(HDFS_PATH), conf);
            if (fs.exists(outputDir)) {
                fs.delete(outputDir, true);
            }
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
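To submit the job, the usual command is hadoop jar against the packaged jar. This is a sketch; the jar name matches the setJar() call above, and the class is the driver defined above:

[hadoop@cloud01 ~]$ hadoop jar flowSumJob.jar serializable.FlowSumRunner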

3、Viewing the results

[hadoop@cloud01 ~]$ hadoop fs -cat /flow/output/part-r-00000

Each output line contains the phone number followed by the upstream flow, downstream flow, and total flow, separated by tabs (see FlowBean.toString()). The original post omits the actual output.
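For illustration only (not from the original post), assuming the four sample rows above are parsed with matching column indexes, the expected output would be:

1360013800    204    55    259
1370013800    203    54    257
1380013800    201    51    252
1390013800    202    52    254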


