Hand-Writing a MapReduce WordCount

1. Project structure:

(screenshot of the project structure in the original post)
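Reconstructed from the three classes below (the directory layout itself is an assumption, since the original shows it only as an image), the project looks roughly like:

src/main/java/com/yqq/
├── WordCount.java   // job driver: configures and submits the job
├── WCMapprer.java   // Mapper: emits (word, 1) pairs
└── WCReduce.java    // Reducer: sums the counts per word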

2. Writing the WordCount class

package com.yqq;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
/**
* @Author yqq
* @Date 2021/11/03 14:24
* @Version 1.0
*/
public class WordCount {
    public static void main(String[] args) throws Exception {
        // Default paths; command-line arguments, if supplied, take precedence
        String parm1 = args.length > 0 ? args[0] : "/wordcount/input";
        String parm2 = args.length > 1 ? args[1] : "/wordcount/output2";
        // Create the configuration object and load the default configuration files
        Configuration conf = new Configuration(true);
        // Uncomment to run locally
        // conf.set("mapreduce.framework.name", "local");
        // Construct the Job object
        Job job = Job.getInstance(conf, "wordcount");
        // Set the program's entry class
        job.setJarByClass(WordCount.class);
        // Set the input path: it must already exist
        FileInputFormat.addInputPath(job, new Path(parm1));
        // Set the output path: it must not exist yet
        FileOutputFormat.setOutputPath(job, new Path(parm2));
        // Set the Mapper class
        job.setMapperClass(WCMapprer.class);
        // Set the key and value types of the Mapper output
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);
        // Set the Reducer class
        job.setReducerClass(WCReduce.class);
        // Set the key and value types of the final (Reducer) output
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        // Submit the job and wait for it to complete
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}
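The input path must already exist on HDFS before the job is submitted. As a sketch (assuming wc.txt is the sample file that appears in step 7), it can be prepared with:

[root@node1 ~]# hdfs dfs -mkdir -p /wordcount/input
[root@node1 ~]# hdfs dfs -put wc.txt /wordcount/input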

3. Writing the WCMapprer class

package com.yqq;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

/**
* @Author yqq
* @Date 2021/11/03 14:44
* @Version 1.0
*/
public class WCMapprer extends Mapper<LongWritable, Text, Text, IntWritable> {
    private Text keyOut = new Text();
    private IntWritable valueOut = new IntWritable(1);

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Input: 0   hello tom
        // Convert the Text value to a String
        String line = value.toString();
        // Split on spaces
        String[] words = line.split(" ");
        // Output: hello 1
        //         tom   1
        // Emit one (word, 1) pair per word
        for (String word : words) {
            keyOut.set(word);
            context.write(keyOut, valueOut);
        }
    }
}
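Note that split(" ") yields empty tokens when words are separated by more than one space. A slightly more robust loop body for map() (a sketch, not part of the original post) splits on arbitrary whitespace and skips empty strings:

for (String word : line.split("\\s+")) {
    if (word.isEmpty()) continue;   // guards against a leading separator
    keyOut.set(word);
    context.write(keyOut, valueOut);
}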

4. Writing the WCReduce class

package com.yqq;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;
import java.util.Iterator;

/**
* @Author yqq
* @Date 2021/11/03 14:56
* @Version 1.0
*/
public class WCReduce extends Reducer<Text, IntWritable, Text, IntWritable> {
    private IntWritable valueOut = new IntWritable();

    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        // Input: key -> hello, values -> 1, 1, 1, 1
        int sum = 0;
        // Get the iterator over the values
        Iterator<IntWritable> iterator = values.iterator();
        // Iterate and accumulate the sum
        while (iterator.hasNext()) {
            sum += iterator.next().get();
        }
        // Output: key -> hello, value -> 4
        valueOut.set(sum);
        context.write(key, valueOut);
    }
}
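Because WCReduce consumes and produces the same (Text, IntWritable) pair types, it could also be registered as a combiner to pre-aggregate map output before the shuffle (a sketch; the job in this post does not do this, which is why the counters in step 7 report Combine input records=0):

// Optional, in the WordCount driver:
job.setCombinerClass(WCReduce.class);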

5. Local testing

To run the job locally, add conf.set("mapreduce.framework.name", "local"); to the WordCount class.
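A minimal sketch of the local-mode setup (the fs.defaultFS override is an assumption: depending on your core-site.xml you may want the local filesystem rather than HDFS for input and output):

Configuration conf = new Configuration(true);
// Run map and reduce tasks in-process via the local job runner
conf.set("mapreduce.framework.name", "local");
// Assumption: also read and write the local filesystem instead of HDFS
conf.set("fs.defaultFS", "file:///");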

6. Packaging

First comment out conf.set("mapreduce.framework.name", "local"); in the WordCount class. Only the three classes need to be packaged.
(screenshots of the IDE packaging steps in the original post)
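If you package from the command line instead of the IDE, a minimal sketch (out/production/wordcount is an assumed compile-output directory; substitute your own):

jar cvf wc.jar -C out/production/wordcount com/yqq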

7. Upload and run

Upload wc.jar to /root with Xftp:

[root@node1 ~]# ll
total 36
-rw-------. 1 root root 900 Jul 23 08:57 anaconda-ks.cfg
drwxr-xr-x 2 root root 4096 Nov 2 22:57 bin
-rw-r--r--. 1 root root 8816 Aug 4 08:52 install.log
-rw-r--r--. 1 root root 3384 Jul 23 08:56 install.log.syslog
-rw-r--r-- 1 root root 7499 Nov 3 17:33 wc.jar
-rw-r--r-- 1 root root 87 Nov 3 13:24 wc.txt
1. Run the job
[root@node1 ~]# yarn jar wc.jar com.yqq.WordCount /wordcount/input /wordcount/output2
21/11/03 17:51:11 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
21/11/03 17:51:13 WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this.
21/11/03 17:51:15 INFO input.FileInputFormat: Total input paths to process : 1
21/11/03 17:51:16 INFO mapreduce.JobSubmitter: number of splits:1
21/11/03 17:51:16 INFO mapreduce.JobSubmitter: Submitting tokens for job: job_1635932141340_0001
21/11/03 17:51:17 INFO impl.YarnClientImpl: Submitted application application_1635932141340_0001
21/11/03 17:51:17 INFO mapreduce.Job: The url to track the job: http://node3:8088/proxy/application_1635932141340_0001/
21/11/03 17:51:17 INFO mapreduce.Job: Running job: job_1635932141340_0001
21/11/03 17:51:34 INFO mapreduce.Job: Job job_1635932141340_0001 running in uber mode : false
21/11/03 17:51:34 INFO mapreduce.Job: map 0% reduce 0%
21/11/03 17:51:45 INFO mapreduce.Job: map 100% reduce 0%
21/11/03 17:52:06 INFO mapreduce.Job: map 100% reduce 100%
21/11/03 17:52:07 INFO mapreduce.Job: Job job_1635932141340_0001 completed successfully
21/11/03 17:52:07 INFO mapreduce.Job: Counters: 49
File System Counters
FILE: Number of bytes read=182
FILE: Number of bytes written=218655
FILE: Number of read operations=0
FILE: Number of large read operations=0
FILE: Number of write operations=0
HDFS: Number of bytes read=191
HDFS: Number of bytes written=41
HDFS: Number of read operations=6
HDFS: Number of large read operations=0
HDFS: Number of write operations=2
Job Counters
Launched map tasks=1
Launched reduce tasks=1
Data-local map tasks=1
Total time spent by all maps in occupied slots (ms)=8905
Total time spent by all reduces in occupied slots (ms)=17000
Total time spent by all map tasks (ms)=8905
Total time spent by all reduce tasks (ms)=17000
Total vcore-milliseconds taken by all map tasks=8905
Total vcore-milliseconds taken by all reduce tasks=17000
Total megabyte-milliseconds taken by all map tasks=9118720
Total megabyte-milliseconds taken by all reduce tasks=17408000
Map-Reduce Framework
Map input records=8
Map output records=16
Map output bytes=144
Map output materialized bytes=182
Input split bytes=104
Combine input records=0
Combine output records=0
Reduce input groups=6
Reduce shuffle bytes=182
Reduce input records=16
Reduce output records=6
Spilled Records=32
Shuffled Maps =1
Failed Shuffles=0
Merged Map outputs=1
GC time elapsed (ms)=370
CPU time spent (ms)=3240
Physical memory (bytes) snapshot=290283520
Virtual memory (bytes) snapshot=4126760960
Total committed heap usage (bytes)=139792384
Shuffle Errors
BAD_ID=0
CONNECTION=0
IO_ERROR=0
WRONG_LENGTH=0
WRONG_MAP=0
WRONG_REDUCE=0
File Input Format Counters
Bytes Read=87
File Output Format Counters
Bytes Written=41
2. View the output
[root@node1 ~]# hdfs dfs -cat /wordcount/output2
21/11/03 17:53:03 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
cat: `/wordcount/output2': Is a directory
[root@node1 ~]# hdfs dfs -cat /wordcount/output2/part-r-00000
21/11/03 17:53:43 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
andy 3
hello 5
joy 3
mark 1
rose 2
tom 2
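Because the output path must not exist when the job is submitted (see the driver in step 2), remove it before re-running:

[root@node1 ~]# hdfs dfs -rm -r /wordcount/output2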

