Java代码:flink wordcount代码示例及解读

阅读 29

2023-09-02


WordCount

WordCountPojo.java

代码

package wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.util.Collector;
import wordCount.util.WordCountData;

@SuppressWarnings("serial")
public class WordCountPojo {

    /**
     * This is the POJO (Plain Old Java Object) that is being used
     * for all the operations.
     * As long as all fields are public or have a getter/setter, the system can handle them
     */
    public static class Word {

        // fields
        private String word;
        private int frequency;

        // constructors
        public Word() {}

        public Word(String word, int i) {
            this.word = word;
            this.frequency = i;
        }

        // getters setters
        public String getWord() {
            return word;
        }

        public void setWord(String word) {
            this.word = word;
        }

        public int getFrequency() {
            return frequency;
        }

        public void setFrequency(int frequency) {
            this.frequency = frequency;
        }

        @Override
        public String toString() {
            return "Word=" + word + " freq=" + frequency;
        }
    }

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Word> counts =
                // split up the lines into Word objects (with frequency = 1)
                text.flatMap(new Tokenizer())
                        // group by the field word and sum up the frequency
                        .groupBy("word")
                        .reduce(new ReduceFunction<Word>() {
                            @Override
                            public Word reduce(Word value1, Word value2) throws Exception {
                                return new Word(value1.word, value1.frequency + value2.frequency);
                            }
                        });

        if (params.has("output")) {
            counts.writeAsText(params.get("output"), FileSystem.WriteMode.OVERWRITE);
            // execute program
            env.execute("WordCount-Pojo Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple Word objects.
     */
    public static final class Tokenizer implements FlatMapFunction<String, Word> {

        @Override
        public void flatMap(String value, Collector<Word> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Word(token, 1));
                }
            }
        }
    }

}

代码解释

这段代码是一个使用 Apache Flink 实现的简单的单词计数程序。它包含了一个 POJO 类 WordCountPojo.Word 和一个 main 方法。

代码解释如下:

  1. @SuppressWarnings("serial") 是一个注解,用于告诉编译器忽略特定类型的警告信息。
  2. WordCountPojo 类是一个包含 main 方法的公共类。
  3. Word 是一个静态嵌套类,用于表示单词及其频率的 POJO(Plain Old Java Object)。它包含了以下成员:
  • word:表示单词的字符串类型字段。
  • frequency:表示单词出现频率的整数类型字段。
  • Word():默认构造函数。
  • Word(String word, int i):带参数的构造函数,用于设置单词和频率的初始值。
  • getWord()setWord():用于获取和设置单词字段的方法。
  • getFrequency()setFrequency():用于获取和设置频率字段的方法。
  • toString():重写的 toString() 方法,返回包含单词和频率的字符串表示。
  1. main 方法是程序的入口点。它使用 Apache Flink 的执行环境 ExecutionEnvironment 来设置和执行 Flink 作业。下面是主要步骤:
  • 创建 ParameterTool 对象 params,用于从命令行参数中获取配置参数。
  • 获取执行环境 env
  • 将参数设置为全局作业参数,以便在 Web 接口中使用。
  • 根据参数中指定的输入路径读取文本数据集,或者使用默认的测试文本数据集。
  • 对文本进行处理,首先使用 Tokenizer 函数将每行文本拆分为单词,并转换为 Word 对象(频率初始值为1)。
  • 将转换后的数据按照单词字段进行分组,并使用 ReduceFunction 对相同单词的频率进行累加。
  • 如果参数中指定了输出路径,则将结果写入到指定的文件中,并执行作业。
  • 如果没有指定输出路径,则将结果打印到标准输出。
  1. Tokenizer 是一个实现了 FlatMapFunction 接口的静态内部类,用于将输入的文本行拆分为单词并转换为 Word 对象。它包含以下方法:
  • flatMap(String value, Collector<Word> out):接收一个输入字符串,将其转换为小写并按非字母字符拆分为单词。然后遍历每个单词,如果单词长度大于0,则创建一个新的 Word 对象,并将其添加到输出集合中。

总体而言,该代码实现了一个简单的单词计数程序,使用 Flink 提供的数据处理功能。它将输入文本拆分为单词,并统计每个单词出现的频率,最后将结果输出到文件或标准输出。

WordCount.java

代码

package wordCount;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;
import wordCount.util.WordCountData;

public class WordCount {

    // *************************************************************************
    //     PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {

        final ParameterTool params = ParameterTool.fromArgs(args);

        // set up the execution environment
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // make parameters available in the web interface
        env.getConfig().setGlobalJobParameters(params);

        // get input data
        DataSet<String> text;
        if (params.has("input")) {
            // read the text file from given input path
            text = env.readTextFile(params.get("input"));
        } else {
            // get default test text data
            System.out.println("Executing WordCount example with default input data set.");
            System.out.println("Use --input to specify file input.");
            text = WordCountData.getDefaultTextLineDataSet(env);
        }

        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);

        // emit result
        if (params.has("output")) {
            counts.writeAsCsv(params.get("output"), "\n", " ");
            // execute program
            env.execute("WordCount Example");
        } else {
            System.out.println("Printing result to stdout. Use --output to specify output path.");
            counts.print();
        }

    }

    // *************************************************************************
    //     USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {

        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws InterruptedException {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");

            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    // Sleep 10s while processing each for word
                    Thread.sleep(1_000L);
                    out.collect(new Tuple2<>(token, 1));
                }
            }
        }
    }

}

代码解释

这段代码是一个使用 Apache Flink 实现的简单的单词计数程序。它包含了一个 main 方法和一个 Tokenizer 内部类。

代码解释如下:

  1. WordCount 类是一个公共类,包含了 main 方法。
  2. main 方法是程序的入口点。它使用 Apache Flink 的执行环境 ExecutionEnvironment 来设置和执行 Flink 作业。以下是主要步骤:
  • 创建 ParameterTool 对象 params,用于从命令行参数中获取配置参数。
  • 获取执行环境 env
  • 将参数设置为全局作业参数,以便在 Web 接口中使用。
  • 根据参数中指定的输入路径读取文本数据集,或者使用默认的测试文本数据集。
  • 对文本进行处理,首先使用 Tokenizer 函数将每行文本拆分为单词,并转换为 (word, 1) 的元组。
  • 将转换后的数据按照单词字段进行分组,并对元组的第二个字段进行求和。
  • 如果参数中指定了输出路径,则将结果以 CSV 格式写入到指定的文件中,并执行作业。
  • 如果没有指定输出路径,则将结果打印到标准输出。
  1. Tokenizer 是一个实现了 FlatMapFunction 接口的静态内部类,用于将输入的文本行拆分为单词并转换为 (word, 1) 的元组。它包含以下方法:
  • flatMap(String value, Collector<Tuple2<String, Integer>> out):接收一个输入字符串,将其转换为小写并按非字母字符拆分为单词。然后遍历每个单词,如果单词长度大于0,则创建一个新的 (word, 1) 的元组,并将其添加到输出集合中。在每个单词处理时,线程会休眠1秒钟,以模拟一个耗时的操作。

总体而言,该代码实现了一个简单的单词计数程序,使用 Flink 提供的数据处理功能。它将输入文本拆分为单词,并统计每个单词出现的次数,最后将结果输出到文件或标准输出。在 Tokenizer 类中,还模拟了一个耗时的操作,以展示在处理数据时可以执行一些自定义操作的能力。

maven pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>batch</artifactId>
    <version>1.0-SNAPSHOT</version>
    <dependencies>

        <!-- Flink dependencies -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-core</artifactId>
            <version>1.9.0</version>
        </dependency>

        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-java</artifactId>
            <version>1.9.0</version>
        </dependency>
        <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-clients -->
        <dependency>
            <groupId>org.apache.flink</groupId>
            <artifactId>flink-clients_2.12</artifactId>
            <version>1.9.0</version>
        </dependency>


    </dependencies>


    <build>
        <plugins>
            <!--JDK版本 -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>2.5.1</version>
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                    <encoding>UTF-8</encoding>
                    <showWarnings>true</showWarnings>
                </configuration>
            </plugin>
            <!--  flink 打包插件    -->
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-shade-plugin</artifactId>
                <version>3.0.0</version>
                <executions>
                    <execution>
                        <phase>package</phase>
                        <goals>
                            <goal>shade</goal>
                        </goals>
                        <configuration>
                            <artifactSet>
                                <excludes>
                                    <exclude>com.google.code.findbugs:jsr305</exclude>
                                    <exclude>org.slf4j:*</exclude>
                                    <exclude>log4j:*</exclude>
                                </excludes>
                            </artifactSet>
                            <filters>
                                <filter>
                                    <!-- Do not copy the signatures in the META-INF folder.
                                    Otherwise, this might cause SecurityExceptions when using the JAR. -->
                                    <artifact>*:*</artifact>
                                    <excludes>
                                        <exclude>META-INF/*.SF</exclude>
                                        <exclude>META-INF/*.DSA</exclude>
                                        <exclude>META-INF/*.RSA</exclude>
                                    </excludes>
                                </filter>
                            </filters>
                            <transformers>
                                <transformer implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
                                    <mainClass>wordCount.WordCount</mainClass>
                                </transformer>
                            </transformers>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>

</project>

links:

https://github.com/will-che/flink-simple-tutorial


精彩评论(0)

0 0 举报