【MapReduce】Comprehensive Case Study



Table of Contents

  • Comprehensive Case Study
  • ① Data Files
  • ② Requirements
  • ③ Implementation
  • • Uploading the file
  • • The Bean class
  • • The Mapper class: cache information.txt and join it with student.txt
  • • The Reducer class: count attributes equal to "Null"
  • • The Partition class: partition by sex
  • • Driver configuration

Comprehensive Case Study

① Data Files

(screenshot: the two data files, information.txt and student.txt)

② Requirements

  • Upload information.txt to HDFS
  • Keep student.txt on the local file system
  • Read the HDFS data through the distributed cache
  • Merge the HDFS record and the local record into a single JavaBean object (see the sample file layout after this list)
  • Build the object fully on the map side; on the reduce side, count how many of its attributes are "Null" and emit that count as the value, with the Bean's toString() as the key
  • Use two partitions, split by sex
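For reference, the file layouts implied by the comments in the Mapper code below are tab-separated, with information.txt keyed by student id. The records shown here are the illustrative ones from the code comments, not the full data set:

information.txt (hobby, job, id):

游戏	大数据	1

student.txt (id, name, sex):

1	张三	女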

③ Implementation

• Uploading the file

(screenshot: information.txt uploaded to HDFS)
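The upload can also be done programmatically. A minimal sketch using the Hadoop FileSystem API, reusing the HDFS address and local path that appear in the Driver class below (an illustrative helper, not part of the original project):

package CSDN综合练习;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

import java.net.URI;

// Illustrative helper: copy information.txt from the local disk into HDFS.
public class Upload {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create("hdfs://192.168.64.178:9000"), conf);
        fs.copyFromLocalFile(
                new Path("G:/Projects/IdeaProject-C/MapReduce/src/main/java/CSDN综合练习/data/information.txt"),
                new Path("/user/MR/input/information.txt"));
        fs.close();
    }
}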

• The Bean class

package CSDN综合练习;

import org.apache.hadoop.io.Writable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class Bean implements Writable {
    // Encapsulated fields
    private String id;
    private String name;
    private String sex;
    private String hobby;
    private String job;

    // No-arg constructor (required by Hadoop for deserialization)
    public Bean() {
    }

    // Override toString (used as the reduce output key)
    @Override
    public String toString() {
        return "Bean{" +
                "id='" + id + '\'' +
                ", name='" + name + '\'' +
                ", sex='" + sex + '\'' +
                ", hobby='" + hobby + '\'' +
                ", job='" + job + '\'' +
                '}';
    }

    // Serialization
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(name);
        dataOutput.writeUTF(sex);
        dataOutput.writeUTF(hobby);
        dataOutput.writeUTF(job);
    }

    // Deserialization (fields must be read in the same order they were written)
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        id = dataInput.readUTF();
        name = dataInput.readUTF();
        sex = dataInput.readUTF();
        hobby = dataInput.readUTF();
        job = dataInput.readUTF();
    }

    // Getters and setters
    public String getId() { return id; }
    public void setId(String id) { this.id = id; }

    public String getName() { return name; }
    public void setName(String name) { this.name = name; }

    public String getSex() { return sex; }
    public void setSex(String sex) { this.sex = sex; }

    public String getHobby() { return hobby; }
    public void setHobby(String hobby) { this.hobby = hobby; }

    public String getJob() { return job; }
    public void setJob(String job) { this.job = job; }
}
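Since readFields() must read the fields in exactly the order write() wrote them, a quick local round-trip check can catch ordering mistakes early. A minimal sketch using Hadoop's DataOutputBuffer and DataInputBuffer (the class name and sample values are illustrative):

package CSDN综合练习;

import org.apache.hadoop.io.DataInputBuffer;
import org.apache.hadoop.io.DataOutputBuffer;

// Illustrative check: serialize a Bean and deserialize it back.
public class BeanRoundTrip {
    public static void main(String[] args) throws Exception {
        Bean in = new Bean();
        in.setId("1");
        in.setName("张三");
        in.setSex("女");
        in.setHobby("游戏");
        in.setJob("大数据");

        DataOutputBuffer out = new DataOutputBuffer();
        in.write(out);                      // serialize

        DataInputBuffer buf = new DataInputBuffer();
        buf.reset(out.getData(), out.getLength());

        Bean back = new Bean();
        back.readFields(buf);               // deserialize
        System.out.println(back);           // should match the original values
    }
}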


• The Mapper class: cache information.txt and join it with student.txt

package CSDN综合练习;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;

import java.io.BufferedReader;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URI;
import java.util.HashMap;

public class Mapper extends org.apache.hadoop.mapreduce.Mapper<LongWritable, Text, Text, Bean> {

    Bean bean = new Bean();
    HashMap<String, String> map = new HashMap<>();

    /**
     * Cache the lookup table (information.txt) before any map() call runs.
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void setup(Context context) throws IOException, InterruptedException {
        try {
            // Get the cache files registered in the Driver
            URI[] cacheFiles = context.getCacheFiles();
            // Extract the path of the first (and only) cached file
            String path = cacheFiles[0].getPath();
            System.out.println(path);
            // Read the file line by line
            BufferedReader br = new BufferedReader(new InputStreamReader(new FileInputStream(path), "UTF-8"));
            String line;
            while (StringUtils.isNotEmpty(line = br.readLine())) {
                // e.g. 游戏	大数据	1  (hobby, job, id)
                // Split the line and index it by student id
                String[] fields = line.split("\t");
                map.put(fields[2], fields[0] + "\t" + fields[1]);
            }
            // Close the reader
            IOUtils.closeStream(br);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // e.g. 1	张三	女  (id, name, sex)
        // Split the student record
        String line = value.toString();
        String[] fields = line.split("\t");
        String id = fields[0];
        String name = fields[1];
        String sex = fields[2];
        // Join against the cached table; guard against ids missing from information.txt
        String cached = map.get(id);
        String hobby = cached == null ? "Null" : cached.split("\t")[0];
        String job = cached == null ? "Null" : cached.split("\t")[1];
        // Populate the Bean
        bean.setId(id);
        bean.setName(name);
        bean.setSex(sex);
        bean.setHobby(hobby);
        bean.setJob(job);
        // Emit id -> Bean
        context.write(new Text(id), bean);
    }
}
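To make the join concrete, take the illustrative records from the code comments: setup() turns the cached line 游戏	大数据	1 into the map entry {"1" -> "游戏\t大数据"}, and map() then joins the input line 1	张三	女 against it and emits:

1	Bean{id='1', name='张三', sex='女', hobby='游戏', job='大数据'}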


• The Reducer class: count attributes equal to "Null"

package CSDN综合练习;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class Reduce extends Reducer<Text, Bean, Bean, LongWritable> {
    @Override
    protected void reduce(Text key, Iterable<Bean> values, Context context) throws IOException, InterruptedException {
        // Count the attributes that are missing or hold the literal string "Null"
        int count = 0;
        for (Bean bean : values) {
            if (bean.getName() == null || bean.getName().equals("Null")) {
                count++;
            }
            if (bean.getJob() == null || bean.getJob().equals("Null")) {
                count++;
            }
            if (bean.getHobby() == null || bean.getHobby().equals("Null")) {
                count++;
            }
            if (bean.getSex() == null || bean.getSex().equals("Null")) {
                count++;
            }
            // Each key (student id) has exactly one Bean, so this writes once per student
            context.write(bean, new LongWritable(count));
        }
    }
}
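One Hadoop detail worth knowing here: the framework reuses a single Bean instance across the iterations of values, overwriting its fields each time, so writing inside the loop (as above) is fine, but storing the bean references for later use would not be. If the values ever had to be buffered first, a sketch along these lines, placed inside reduce(), would make independent copies (ReflectionUtils.copy deep-copies a Writable via serialization):

// Illustrative variant: buffer independent copies of the reused Bean instance.
java.util.List<Bean> buffered = new java.util.ArrayList<>();
for (Bean bean : values) {
    buffered.add(org.apache.hadoop.util.ReflectionUtils.copy(
            context.getConfiguration(), bean, new Bean()));
}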


• The Partition class: partition by sex

package CSDN综合练习;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Partitioner;

public class Partition extends Partitioner<Text, Bean> {
    @Override
    public int getPartition(Text text, Bean bean, int numPartitions) {
        // Partition on the student's sex
        String sex = bean.getSex();
        // Partition 0 for 男, partition 1 for everything else (null-safe)
        return "男".equals(sex) ? 0 : 1;
    }
}
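With job.setNumReduceTasks(2) in the Driver, partition 0 (records with sex 男) goes to the output file part-r-00000 and partition 1 to part-r-00001. The reduce task count must cover every partition index the partitioner can return; with multiple reducers but fewer than the partitioner assumes, the job fails with an "Illegal partition" error.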


• Driver configuration

package CSDN综合练习;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.net.URI;

public class Driver {
    public static void main(String[] args) {
        Job job;
        Configuration conf = new Configuration();
        try {
            // Get the job instance
            job = Job.getInstance(conf);
            // Wire up the Mapper, Reducer and driver classes
            job.setMapperClass(Mapper.class);
            job.setReducerClass(Reduce.class);
            job.setJarByClass(Driver.class);

            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Bean.class);
            job.setOutputKeyClass(Bean.class);
            job.setOutputValueClass(LongWritable.class);

            // Register the cache file. The HDFS path below fails with a
            // FileNotFoundException when the job runs locally (see the note
            // after this listing), so the local copy is cached instead.
            // String path = "hdfs://192.168.64.178:9000/user/MR/input/information.txt";
            job.addCacheFile(new URI("file:///G:/Projects/IdeaProject-C/MapReduce/src/main/java/CSDN综合练习/data/information.txt"));

            // Custom partitioner
            job.setPartitionerClass(Partition.class);
            // Number of reduce tasks (one per partition)
            job.setNumReduceTasks(2);

            // Input and output paths
            FileInputFormat.setInputPaths(job, new Path("G:/Projects/IdeaProject-C/MapReduce/src/main/java/CSDN综合练习/data/student.txt"));
            FileOutputFormat.setOutputPath(job, new Path("G:/Projects/IdeaProject-C/MapReduce/src/main/java/CSDN综合练习/output_withoutReducer"));

            // Submit the job and wait for completion
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

(screenshot: job execution output)

Note:

  • Registering the HDFS path above as the cache file makes the job fail with java.io.FileNotFoundException: \user\MR\input\information.txt (The system cannot find the path specified). The cause was originally left open; a likely explanation is that setup() reads the cache file with a plain FileInputStream, which only sees the local file system, so the path portion of the hdfs:// URI is treated as a local Windows path. Reading the file through the Hadoop FileSystem API should avoid this; see the sketch below.
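A minimal, untested sketch of how the file-reading part of setup() could go through the FileSystem API so that both file:// and hdfs:// cache URIs resolve:

// Hypothetical replacement for the file-reading part of setup() in the Mapper.
URI[] cacheFiles = context.getCacheFiles();
org.apache.hadoop.fs.FileSystem fs =
        org.apache.hadoop.fs.FileSystem.get(cacheFiles[0], context.getConfiguration());
BufferedReader br = new BufferedReader(new InputStreamReader(
        fs.open(new org.apache.hadoop.fs.Path(cacheFiles[0])), "UTF-8"));
String line;
while (StringUtils.isNotEmpty(line = br.readLine())) {
    String[] fields = line.split("\t");
    map.put(fields[2], fields[0] + "\t" + fields[1]);
}
IOUtils.closeStream(br);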


