0
点赞
收藏
分享

微信扫一扫

【MapReduce】基础案例 ---- 自定义OutputFormat <根据内容输出到指定文件目录中>



文章目录

  • ​​常见的OutputFormat实现类​​
  • ​​  ☠ 自定义OutputFormat​​
  • ​​案例​​
  • ​​▪ 需求分析​​
  • ​​▪ 代码实现​​
  • ​​自定义FilterOutputFormat​​
  • ​​Mapper阶段​​
  • ​​Reducer阶段​​
  • ​​Driver阶段​​

OutputFormat是MR输出的基类,所有实现MR输出都实现了OutputFormat接口。

常见的OutputFormat实现类

1.文本输出TestOutputFormat

  • 默认的输出格式是TestOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TestOutputFormat调用 toString()方法把它们转换为字符串。

2.SequenceFileOutputFormat

  • 将SequenceFileOutputFormat输出作为后续MR任务的输入,这便是一种好的输出格式,因为它格式紧凑,很容易被压缩

3.自定义OutputFormat

  • 根据用户需求,自定义实现输出

【MapReduce】基础案例 ---- 自定义OutputFormat <根据内容输出到指定文件目录中>_ide

案例

▪ 需求分析

【MapReduce】基础案例 ---- 自定义OutputFormat <根据内容输出到指定文件目录中>_mapreduce_02


​​返回顶部​​

▪ 代码实现

自定义FilterOutputFormat
  • 继承自FileOutputFormat,同时泛型应当与Reduce一致
  • 重写RecordWriter,自定义输出流。

public class FilterOutputFormat extends FileOutputFormat <Text, NullWritable>{
@Override
public RecordWriter<Text, NullWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
return new FliterRecordWriter(job);
}
}

  • 重写FliterRecordWriter()方法,创建输出流,指定文件输出路径
  • 重写write()方法,定义文件输出时评判指标,根据判定使用指定的流输出到指定目录(文件中)
  • 关闭资源IO流

public class FliterRecordWriter extends RecordWriter<Text, NullWritable> {
// 创建流对象
FSDataOutputStream fosatguigu;
FSDataOutputStream fosother;
// 构造输出流
public FliterRecordWriter(TaskAttemptContext job) {
try {
// 1.获取文件系统
FileSystem fs = FileSystem.get(job.getConfiguration());
// 2.创建输出到atguigu.log的输出流
fosatguigu = fs.create(new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\OutputFormat数据输出\\atguigu.log"));
// 3.创建输出到other.log的输出流
fosother = fs.create(new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\OutputFormat数据输出\\other.log"));

} catch (Exception e) {
e.printStackTrace();
}
}
@Override
public void write(Text key, NullWritable value) throws IOException, InterruptedException {
// 判断key的内容是否包含atguigu
if (key.toString().contains("atguigu")) {
fosatguigu.write(key.toString().getBytes());
} else {
fosother.write(key.toString().getBytes());
}
}
@Override
public void close(TaskAttemptContext context) throws IOException, InterruptedException {
// 关闭IO流
IOUtils.closeStream(fosatguigu);
IOUtils.closeStream(fosother);
}
}

​​返回顶部​​

Mapper阶段
  • 读取数据,由于不需要进行内部操作,直接写出

public class FilterMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
// http://www.baidu.com
// http://www.google.com
// 写出
context.write(value,NullWritable.get());
}
}

​​返回顶部​​

Reducer阶段
  • reducer阶段只是将读取的数据输出,这里注意将读取的内容进行格式化,增加换行,否则最终文件以字符串追加形式,一整行展现。

public class FilterReducer extends Reducer<Text, NullWritable,Text,NullWritable> {
Text k = new Text();
@Override
protected void reduce(Text key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
// http://www.baidu.com
// 写出,添加换行符
String line = key.toString();
line = line + "\r\n";
k.set(line);
// 循环防止有重复
for (NullWritable value:values){
context.write(k,NullWritable.get());
}
}
}

​​返回顶部​​

Driver阶段

public class FilterDriver {
public static void main(String[] args) {
Job job = null;
Configuration conf = new Configuration();
try {
// 获取job
job = Job.getInstance(conf);
// 配置
job.setMapperClass(FilterMapper.class);
job.setReducerClass(FilterReducer.class);
job.setJarByClass(FilterDriver.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(NullWritable.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(NullWritable.class);
// 将自定义的输出格式组件设置到job中
job.setOutputFormatClass(FilterOutputFormat.class);
// 设置输入输出路径
// 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat
// 而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录
FileInputFormat.setInputPaths(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\OutputFormat数据输出\\log.txt"));
FileOutputFormat.setOutputPath(job,new Path("G:\\Projects\\IdeaProject-C\\MapReduce\\src\\main\\java\\第三章_MR框架原理\\Filteroutput"));
// 提交job
boolean result = job.waitForCompletion(true);
System.exit(result ? 0 : 1);
} catch (Exception e){
e.printStackTrace();
}
}
}

  • 虽然我们自定义了outputformat,但是因为我们的outputformat继承自fileoutputformat,而fileoutputformat要输出一个_SUCCESS文件,所以,在这还得指定一个输出目录,用于存储_SUCCESS文件。
  • 【MapReduce】基础案例 ---- 自定义OutputFormat <根据内容输出到指定文件目录中>_ide_03

​​返回顶部​​


举报

相关推荐

0 条评论