【Hadoop】MapReduce案例——TFIDF案例

阅读 59

2022-04-25

文章目录

一、前期准备

可参考 “词频统计” 案例中的前期准备阶段

二、数据准备

采用从微博爬取的数据 weibo.txt,每行格式为“微博ID<Tab>微博内容”(以制表符分隔)。

三、TF计算

1.TfJob.class

package com.hdtrain.tfidf;

import com.google.inject.internal.cglib.core.$AbstractClassGenerator;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for the TF stage: configures and runs the "TF" MapReduce job.
 *
 * <p>Reads input from {@code /data/weibo/}, writes to {@code /results/weibo/weibo-tf/}
 * (deleting any previous output first) and uses three reduce tasks with
 * {@link TfPartition} deciding which reducer receives each key.
 *
 * <p>The process exits with status 1 when the job fails or cannot be submitted, so that
 * scripts chaining the later stages can detect the failure. On success {@code main}
 * returns normally.
 */
public class TfJob {
    public static void main(String[] args) {
        Configuration configuration = new Configuration(true);
        // Use the local job runner instead of submitting to a cluster.
        configuration.set("mapreduce.framework.name", "local");

        boolean succeeded = false;
        try {
            FileSystem fs = FileSystem.get(configuration);
            Job job = Job.getInstance(configuration);
            job.setJarByClass(TfJob.class);
            job.setJobName("TF");

            // NOTE(review): these match the mapper's output types (TfBean / IntWritable);
            // TfReducer itself emits Text / DoubleWritable. Presumably this works because the
            // map output classes default to the job output classes - confirm against the Job docs.
            job.setOutputKeyClass(TfBean.class);
            job.setOutputValueClass(IntWritable.class);
            job.setNumReduceTasks(3);
            job.setPartitionerClass(TfPartition.class);
            job.setMapperClass(TfMapper.class);
            job.setReducerClass(TfReducer.class);

            FileInputFormat.addInputPath(job, new Path("/data/weibo/"));

            // Remove stale output so the job does not fail on an existing directory.
            Path path = new Path("/results/weibo/weibo-tf/");
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            succeeded = job.waitForCompletion(true);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }

        if (!succeeded) {
            System.exit(1);
        }
    }
}

2.TfMapper.class

package com.hdtrain.tfidf;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.wltea.analyzer.core.IKSegmenter;
import org.wltea.analyzer.core.Lexeme;

import java.io.IOException;
import java.io.StringReader;
import java.util.ArrayList;
import java.util.List;

/**
 * Mapper of the TF stage.
 *
 * <p>Each input line is expected to hold at least two tab-separated columns: an id and
 * a text content. The content is tokenized with {@link IKSegmenter} and one
 * {@code (TfBean(id, token), 1)} pair is emitted per token. In addition, one
 * {@code (TfBean("count", ""), 1)} pair is emitted per accepted line so that the reducer
 * can total the number of processed lines. Lines with fewer than two columns are ignored.
 */
public class TfMapper extends Mapper<LongWritable, Text, TfBean, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] v = value.toString().trim().split("\t");
        if (v.length >= 2) {
            // Split the record into id and content.
            String id = v[0].trim();
            String content = v[1].trim();

            // A single writable is enough: every pair emitted below carries the value 1.
            IntWritable one = new IntWritable(1);

            // Tokenize the content.
            // NOTE(review): the second constructor argument presumably selects IK's
            // "smart" segmentation mode - confirm against the IK Analyzer version in use.
            IKSegmenter ikSegmenter = new IKSegmenter(new StringReader(content), true);
            Lexeme word;
            while ((word = ikSegmenter.next()) != null) {
                context.write(new TfBean(id, word.getLexemeText()), one);
            }

            // Marker record used to count how many lines were processed.
            context.write(new TfBean("count", ""), one);
        }
    }
}

3.TfReducer.class

package com.hdtrain.tfidf;

import org.apache.hadoop.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import javax.swing.*;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

/**
 * Reducer of the TF stage.
 *
 * <p>For the special id {@code "count"} it writes {@code ("count", total)} where total is
 * the sum of all incoming values. For every other id it writes one
 * {@code (word + "_" + id, occurrencesOfWord / totalTokensOfId)} pair per distinct word.
 *
 * <p>Occurrences are tallied per word string. The previous implementation used
 * {@code TfBean} instances as {@code HashMap} keys; since {@code TfBean} in this file does
 * not define value-based {@code equals}/{@code hashCode}, no two beans ever matched, so
 * every occurrence was stored separately with a count of 1 and repeated words were
 * written out several times.
 */
public class TfReducer extends Reducer<TfBean, IntWritable, Text, DoubleWritable> {
    @Override
    protected void reduce(TfBean key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0; // total number of tokens seen for this id
        // Occurrences of each distinct word for this id.
        Map<String, Integer> wordCounts = new HashMap<>();
        for (IntWritable i : values) {
            int occurrences = i.get();
            sum = sum + occurrences;
            // key.getWord() is read inside the loop on purpose: this code relies on the
            // framework updating `key` to the current record while the values are iterated.
            String word = key.getWord();
            Integer previous = wordCounts.get(word);
            wordCounts.put(word, previous == null ? occurrences : previous + occurrences);
        }

        // Write the results.
        if (key.getId().equals("count")) {
            context.write(new Text("count"), new DoubleWritable(sum));
        } else {
            String id = key.getId();
            for (Map.Entry<String, Integer> entry : wordCounts.entrySet()) {
                // "* 1.0" forces floating-point division.
                context.write(new Text(entry.getKey() + "_" + id), new DoubleWritable(entry.getValue() * 1.0 / sum));
            }
        }
    }
}

4.TfBean.class

package com.hdtrain.tfidf;

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

/**
 * Composite map-output key of the TF stage: an id together with one word.
 *
 * <p>Serialized as two modified-UTF-8 strings (id first, then word).
 *
 * <p>Note that {@link #compareTo(TfBean)} orders by id only, whereas
 * {@link #equals(Object)} and {@link #hashCode()} consider both id and word. The ordering
 * is intentionally left id-only: {@code TfReducer} in this file reads a different word
 * from the key for each value of the same id, which presumably depends on keys with equal
 * ids being grouped into one reduce call.
 */
public class TfBean implements WritableComparable<TfBean> {
    private String id;
    private String word;

    public void setId(String id) {
        this.id = id;
    }

    public void setWord(String word) {
        this.word = word;
    }

    public String getId() {
        return id;
    }

    public String getWord() {
        return word;
    }

    /** No-argument constructor; fields are populated later via setters or {@link #readFields}. */
    public TfBean() {
    }

    public TfBean(String id, String word) {
        super();
        this.id = id;
        this.word = word;
    }

    /** Writes id and then word with {@link DataOutput#writeUTF(String)}. */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(id);
        dataOutput.writeUTF(word);
    }

    /** Reads id and then word, mirroring {@link #write(DataOutput)}. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.id = dataInput.readUTF();
        this.word = dataInput.readUTF();
    }

    /** Compares by id only; the word does not take part in the ordering. */
    @Override
    public int compareTo(TfBean o) {
        int results = this.id.compareTo(o.getId());
        return results;
    }

    /** Two beans are equal when both their id and their word are equal. */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof TfBean)) {
            return false;
        }
        TfBean other = (TfBean) obj;
        return java.util.Objects.equals(id, other.id)
                && java.util.Objects.equals(word, other.word);
    }

    /** Value-based hash consistent with {@link #equals(Object)}, so beans work as hash keys. */
    @Override
    public int hashCode() {
        return java.util.Objects.hash(id, word);
    }
}

5.TfPartition.class

package com.hdtrain.tfidf;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.mapreduce.lib.partition.HashPartitioner;


/**
 * Partitioner of the TF stage.
 *
 * <p>Keys whose id is {@code "count"} always go to the last partition
 * ({@code numReduceTasks - 1}); all other keys are spread over the remaining partitions
 * {@code 0 .. numReduceTasks - 2} by the hash of their id. With the three reduce tasks
 * configured by {@code TfJob}, the count record therefore ends up in partition 2.
 */
public class TfPartition extends HashPartitioner<TfBean, IntWritable> {
    @Override
    public int getPartition(TfBean key, IntWritable value, int numReduceTasks) {
        // With a single partition there is nothing to choose, and the modulo below would
        // divide by zero.
        if (numReduceTasks <= 1) {
            return 0;
        }
        if (key.getId().equals("count")) {
            return numReduceTasks - 1;
        }
        // Take the remainder first and the absolute value second:
        // Math.abs(Integer.MIN_VALUE) is still negative, whereas the remainder is always
        // strictly smaller in magnitude than the divisor and can be negated safely.
        return Math.abs(key.getId().hashCode() % (numReduceTasks - 1));
    }
}

6.计算结果
在这里插入图片描述
在这里插入图片描述

四、IDF计算(统计每个词出现在多少条微博中,即文档频率)

1.IdfJob.class

package com.hdtrain.tfidf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for the second stage: configures and runs the "IDF" MapReduce job.
 *
 * <p>Reads the TF stage output from {@code /results/weibo/weibo-tf} and writes to
 * {@code /results/weibo/weibo-idf} (deleting any previous output first).
 * {@link IdfReducer} is used both as combiner and as reducer.
 *
 * <p>The process exits with status 1 when the job fails or cannot be submitted; on
 * success {@code main} returns normally.
 */
public class IdfJob {
    public static void main(String[] args) {
        Configuration configuration = new Configuration(true);
        // Use the local job runner instead of submitting to a cluster.
        configuration.set("mapreduce.framework.name", "local");

        boolean succeeded = false;
        try {
            FileSystem fs = FileSystem.get(configuration);
            Job job = Job.getInstance(configuration);
            job.setJarByClass(IdfJob.class);
            job.setJobName("IDF");

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            job.setMapperClass(IdfMapper.class);
            // The reducer only sums integers, so it can also pre-aggregate on the map side.
            job.setCombinerClass(IdfReducer.class);
            job.setReducerClass(IdfReducer.class);

            FileInputFormat.addInputPath(job, new Path("/results/weibo/weibo-tf"));

            // Remove stale output so the job does not fail on an existing directory.
            Path path = new Path("/results/weibo/weibo-idf");
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            succeeded = job.waitForCompletion(true);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }

        if (!succeeded) {
            System.exit(1);
        }
    }
}

2.IdfMapper.class

package com.hdtrain.tfidf;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.IOException;

/**
 * Mapper of the second stage.
 *
 * <p>Consumes the TF stage output, whose lines look like {@code word_id<TAB>tf}, and emits
 * {@code (word, 1)} for each of them. Records coming from a file whose name contains
 * {@code part-r-00002} are skipped, as are lines that do not have at least two
 * tab-separated columns or whose first column does not split into at least two parts
 * on {@code "_"}.
 */
public class IdfMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Determine which file the current split belongs to.
        FileSplit split = (FileSplit) context.getInputSplit();
        String sourceFile = split.getPath().getName();
        if (sourceFile.contains("part-r-00002")) {
            // Everything except the part-r-00002 file is processed.
            return;
        }

        String[] columns = value.toString().trim().split("\t");
        if (columns.length < 2) {
            return;
        }

        String[] keyParts = columns[0].split("_");
        if (keyParts.length < 2) {
            return;
        }

        // The word is the part in front of the first underscore.
        context.write(new Text(keyParts[0]), new IntWritable(1));
    }
}

3.IdfReducer.class

package com.hdtrain.tfidf;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Sums all integer values received for a key and writes {@code (key, total)}.
 *
 * <p>Input and output types are identical, which is what allows {@code IdfJob} to
 * register this class as combiner as well as reducer.
 */
public class IdfReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    @Override
    protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int total = 0;
        for (IntWritable partial : values) {
            total += partial.get();
        }

        IntWritable result = new IntWritable(total);
        context.write(key, result);
    }
}

4.计算结果
在这里插入图片描述

五、TFIDF计算

1.TfidfJob.class

package com.hdtrain.tfidf;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Driver for the final stage: configures and runs the "TFIDF" MapReduce job.
 *
 * <p>Registers two cache files for the mapper -
 * {@code /results/weibo/weibo-tf/part-r-00002} and
 * {@code /results/weibo/weibo-idf/part-r-00000} - reads the TF stage output from
 * {@code /results/weibo/weibo-tf} and writes to {@code /results/weibo/weibo-tfidf}
 * (deleting any previous output first).
 *
 * <p>The process exits with status 1 when the job fails or cannot be submitted; on
 * success {@code main} returns normally.
 */
public class TfidfJob {
    public static void main(String[] args) {
        Configuration configuration = new Configuration(true);
        // Use the local job runner instead of submitting to a cluster.
        configuration.set("mapreduce.framework.name", "local");

        boolean succeeded = false;
        try {
            Job job = Job.getInstance(configuration);
            job.setJarByClass(TfidfJob.class);
            job.setJobName("TFIDF");
            // Cache file holding the total number of weibos.
            job.addCacheFile(new Path("/results/weibo/weibo-tf/part-r-00002").toUri());
            // Cache file holding, per word, the count produced by the IDF job.
            job.addCacheFile(new Path("/results/weibo/weibo-idf/part-r-00000").toUri());

            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            job.setMapperClass(TfidfMapper.class);
            job.setReducerClass(TfidfReducer.class);

            FileInputFormat.addInputPath(job, new Path("/results/weibo/weibo-tf"));

            // Remove stale output so the job does not fail on an existing directory.
            Path path = new Path("/results/weibo/weibo-tfidf");
            FileSystem fs = FileSystem.get(configuration);
            if (fs.exists(path)) {
                fs.delete(path, true);
            }
            FileOutputFormat.setOutputPath(job, path);

            succeeded = job.waitForCompletion(true);
        } catch (InterruptedException e) {
            // Restore the interrupt flag so callers further up can observe it.
            Thread.currentThread().interrupt();
            e.printStackTrace();
        } catch (IOException | ClassNotFoundException e) {
            e.printStackTrace();
        }

        if (!succeeded) {
            System.exit(1);
        }
    }
}

2.TfidfMapper.class

package com.hdtrain.tfidf;

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;

import java.io.BufferedReader;
import java.io.File;
import java.io.FileReader;
import java.io.IOException;
import java.net.URI;
import java.text.NumberFormat;
import java.util.HashMap;
import java.util.Map;

/**
 * Mapper of the final stage: combines the per-weibo term frequency with the cached
 * totals to produce {@code (id, "word:tfidf")} pairs.
 *
 * <p>{@link #setup} loads two cache files by their local file name: the one ending in
 * {@code part-r-00002} supplies the total number of weibos, the one ending in
 * {@code part-r-00000} supplies, per word, the number of weibos containing it.
 * {@link #map} then computes {@code tf * ln(weiboCount / weibosContainingWord)}.
 */
public class TfidfMapper extends Mapper<LongWritable, Text, Text, Text> {
    // Total number of weibos.
    public static Double weibo_count;
    // Number of weibos in which each word occurs.
    public static Map<String, Integer> wordInText = null;

    /**
     * Loads the weibo total and the per-word counts from the job's cache files.
     * Cache files with other names are ignored; nothing is loaded when the job has
     * no cache files.
     *
     * @throws IOException if a cache file cannot be opened or read
     */
    @Override
    protected void setup(Context context) throws IOException {
        URI[] cacheFile = context.getCacheFiles();
        if (cacheFile == null) {
            return;
        }
        for (URI uri : cacheFile) {
            String uriPath = uri.getPath();
            // The files are opened by bare file name, i.e. relative to the working directory.
            String localName = new Path(uriPath).getName();
            if (uriPath.endsWith("part-r-00002")) {
                // Total number of weibos.
                Double count = readWeiboCount(localName);
                if (count != null) {
                    weibo_count = count;
                }
            } else if (uriPath.endsWith("part-r-00000")) {
                // Per-word counts.
                wordInText = readWordCounts(localName);
            }
        }
    }

    /** Opens a local file as UTF-8 text, so non-ASCII words do not depend on the platform charset. */
    private static BufferedReader openUtf8(String fileName) throws IOException {
        return new BufferedReader(new java.io.InputStreamReader(
                new java.io.FileInputStream(fileName), java.nio.charset.StandardCharsets.UTF_8));
    }

    /** Returns the number on the first line if it is a "count" line with two columns, else null. */
    private static Double readWeiboCount(String fileName) throws IOException {
        try (BufferedReader br = openUtf8(fileName)) {
            String line = br.readLine();
            if (line != null && line.startsWith("count")) {
                String[] ls = line.split("\t");
                if (ls.length >= 2) {
                    return Double.parseDouble(ls[1].trim());
                }
            }
            return null;
        }
    }

    /** Reads "word<TAB>count" lines into a map; lines without a second column are skipped. */
    private static Map<String, Integer> readWordCounts(String fileName) throws IOException {
        Map<String, Integer> counts = new HashMap<String, Integer>();
        try (BufferedReader br = openUtf8(fileName)) {
            String line;
            while ((line = br.readLine()) != null) {
                String[] ls = line.split("\t");
                if (ls.length >= 2) {
                    counts.put(ls[0], Integer.parseInt(ls[1].trim()));
                }
            }
        }
        return counts;
    }

    /**
     * Converts one {@code word_id<TAB>tf} line into {@code (id, "word:tfidf")}.
     * Records from a file whose name contains {@code part-r-00002} are skipped; lines
     * without two tab-separated columns are echoed to standard output and skipped.
     *
     * @throws IllegalStateException if the cached totals were not loaded or contain no
     *                               entry for the word of this record
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // Determine which file the current split belongs to.
        FileSplit fs = (FileSplit) context.getInputSplit();
        // Everything except the part-r-00002 file is processed.
        if (fs.getPath().getName().contains("part-r-00002")) {
            return;
        }

        String[] v = value.toString().trim().split("\t");
        if (v.length < 2) {
            System.out.println(value.toString() + "---------------------");
            return;
        }

        double tf = Double.parseDouble(v[1].trim());
        String[] ss = v[0].split("_");
        if (ss.length < 2) {
            return;
        }
        String w = ss[0];
        String id = ss[1];

        // Fail with a descriptive message instead of a bare NullPointerException from unboxing.
        if (weibo_count == null || wordInText == null) {
            throw new IllegalStateException(
                    "Cached weibo count or per-word counts were not loaded in setup()");
        }
        Integer weibosWithWord = wordInText.get(w);
        if (weibosWithWord == null) {
            throw new IllegalStateException("No cached count for word '" + w + "'");
        }

        // Compute tf-idf.
        double tfidf = tf * Math.log(weibo_count / weibosWithWord);
        NumberFormat nf = NumberFormat.getInstance();
        nf.setMaximumFractionDigits(5);
        context.write(new Text(id), new Text(w + ":" + nf.format(tfidf)));
    }
}

3.TfidfReducer.class

package com.hdtrain.tfidf;

import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

/**
 * Concatenates all values of a key into one line.
 *
 * <p>Every value is followed by a tab character (including the last one) and the
 * resulting string is written under the unchanged key.
 */
public class TfidfReducer extends Reducer<Text, Text, Text, Text> {
    @Override
    protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        StringBuilder joined = new StringBuilder();
        for (Text entry : values) {
            joined.append(entry.toString()).append("\t");
        }
        Text line = new Text(joined.toString());
        context.write(key, line);
    }
}

4.计算结果
在这里插入图片描述

精彩评论(0)

0 0 举报