0
点赞
收藏
分享

微信扫一扫

Spark 编程模型(中)

扶摇_hyber 2022-06-20 阅读 61

Spark 编程模型(中)_java

 

先在IDEA新建一个maven项目

Spark 编程模型(中)_spark_02

我这里用的是jdk1.8,选择相应的骨架

Spark 编程模型(中)_java_03

Spark 编程模型(中)_java_04

这里选择本地在window下安装的maven

Spark 编程模型(中)_spark_05

Spark 编程模型(中)_scala_06

 

 新的项目创建成功

Spark 编程模型(中)_java_07

 

 我的开始pom.xml文件配置

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.gong.spark</groupId>
<artifactId>learning-spark</artifactId>
<version>1.0-SNAPSHOT</version>
<inceptionYear>2008</inceptionYear>
<properties>
<scala.version>2.10.4</scala.version>
<spark.version>1.6.1</spark.version>
</properties>

<repositories>
<repository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</repository>
</repositories>

<pluginRepositories>
<pluginRepository>
<id>scala-tools.org</id>
<name>Scala-Tools Maven2 Repository</name>
<url>http://scala-tools.org/repo-releases</url>
</pluginRepository>
</pluginRepositories>

<dependencies>
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>junit</groupId>
<artifactId>junit</artifactId>
<version>4.4</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.specs</groupId>
<artifactId>specs</artifactId>
<version>1.2.5</version>
<scope>test</scope>
</dependency>
<!--spark-->
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_2.10</artifactId>
<version>${spark.version}</version>
<scope>provided</scope>
</dependency>
</dependencies>

<build>
<!--
<sourceDirectory>src/main/scala</sourceDirectory>
<testSourceDirectory>src/test/scala</testSourceDirectory>
-->
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<executions>
<execution>
<goals>
<goal>compile</goal>
<goal>testCompile</goal>
</goals>
</execution>
</executions>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
<args>
<arg>-target:jvm-1.5</arg>
</args>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-eclipse-plugin</artifactId>
<configuration>
<downloadSources>true</downloadSources>
<buildcommands>
<buildcommand>ch.epfl.lamp.sdt.core.scalabuilder</buildcommand>
</buildcommands>
<additionalProjectnatures>
<projectnature>ch.epfl.lamp.sdt.core.scalanature</projectnature>
</additionalProjectnatures>
<classpathContainers>
<classpathContainer>org.eclipse.jdt.launching.JRE_CONTAINER</classpathContainer>
<classpathContainer>ch.epfl.lamp.sdt.launching.SCALA_CONTAINER</classpathContainer>
</classpathContainers>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>2.4.1</version>
<executions>
<!-- Run shade goal on package phase -->
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<transformers>
<!-- add Main-Class to manifest file -->
<transformer
implementation="org.apache.maven.plugins.shade.resource.ManifestResourceTransformer">
<!--<mainClass>com.dajiang.MyDriver</mainClass>-->
</transformer>
</transformers>
<createDependencyReducedPom>false</createDependencyReducedPom>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<reporting>
<plugins>
<plugin>
<groupId>org.scala-tools</groupId>
<artifactId>maven-scala-plugin</artifactId>
<configuration>
<scalaVersion>${scala.version}</scalaVersion>
</configuration>
</plugin>
</plugins>
</reporting>
</project>

 

 

先在终端下试下打包

Spark 编程模型(中)_spark_08

 

 Spark 编程模型(中)_spark_09

可以看到失败了!!!

把这几个生成默认的东西删除掉

Spark 编程模型(中)_java_10

 

 再次测试

Spark 编程模型(中)_java_11

 

成功了

Spark 编程模型(中)_scala_12

 

 把他clean一下

Spark 编程模型(中)_scala_13

Spark 编程模型(中)_scala_14

 

进入自己在虚拟机安装的centos里面的spark

Spark 编程模型(中)_java_15

 

 

先测试一下spark环境有没有问题

Spark 编程模型(中)_spark_16

Spark 编程模型(中)_scala_17

OK运行成,环境没问题!

                                                                                                                                                           

 

 

 

 

                                                                               1.创建RDD

方式一:从集合创建RDD

Spark 编程模型(中)_java_18

回到idea,在main路径下新建java目录,并且对其以下操作:

Spark 编程模型(中)_spark_19

 

 在test路径下新建java目录,对其以下操作:

Spark 编程模型(中)_java_20

Spark 编程模型(中)_scala_21

 

建包

Spark 编程模型(中)_spark_22

Spark 编程模型(中)_scala_23

 

在当前包下起个名字,其实也就是在这个包的路径下再建下一级目录

Spark 编程模型(中)_spark_24

Spark 编程模型(中)_java_25

 

 

Spark 编程模型(中)_spark_26

 

 

 因为我们现在要写的是java程序,所以新建一个java类

Spark 编程模型(中)_java_27

Spark 编程模型(中)_scala_28

Spark 编程模型(中)_scala_29

 

 写个简单的程序测下运行一下

Spark 编程模型(中)_spark_30

Spark 编程模型(中)_scala_31

OK没问题,可以运行!

 

把这里的这个插件由原来的1.5改成1.8,因为刚刚跑的时候有警告

Spark 编程模型(中)_scala_32

 

 在这里新建一个包,具体怎么操作这里就不重复了

Spark 编程模型(中)_scala_33

 

插入之前写好的MyJavaWordCount.java的代码

Spark 编程模型(中)_scala_34

 

 MyJavaWordCount.java参考代码

package com.gong.spark.chap2_3;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import scala.Tuple2;

import java.util.Arrays;

public class MyJavaWordCount {
public static void main(String[] args){
//参数检查
if(args.length<2) {
System.err.println("Usage:MyJavaCount <input> <output>");
System.exit(1);
}

//获取参数
String input=args[0];
String output=args[1];

//创建java版本的SparkContext
SparkConf conf=new SparkConf().setAppName("MyJavaWordCount");
JavaSparkContext sc = new JavaSparkContext(conf);

//读取数据
JavaRDD<String> inputRdd=sc.textFile(input);

//进行相关计算
JavaRDD<String> words=inputRdd.flatMap(new FlatMapFunction<String, String>() {
public Iterable<String> call(String line) throws Exception {
return Arrays.asList(line.split(" "));
}
});

JavaPairRDD<String,Integer> result = words.mapToPair(new PairFunction<String, String, Integer>() {
public Tuple2<String,Integer> call(String word) throws Exception{
return new Tuple2(word,1);
}
}).reduceByKey(new Function2<Integer, Integer, Integer>() {
public Integer call(Integer x, Integer y) throws Exception {
return x+y;
}
});

//保存结果
result.saveAsTextFile(output);

//关闭sc
sc.stop();
}
}

 

把之前写好的scala版本的WordCount程序放进来

Spark 编程模型(中)_spark_35

 

参考代码:

package com.gong.spark

import org.apache.spark.{SparkConf, SparkContext}

object MyWordCount {
def main(args: Array[String]):Unit={
//参数检查
if(args.length<2){
System.err.println("Usage:MyWordCount <input> <output>")
System.exit(1)
}

//获取参数
val input=args(0)
val output=args(1)

//创建SparkContext
val conf=new SparkConf().setAppName("myWordCount")
val sc=new SparkContext(conf)
//读取数据
val lines=sc.textFile(input)

//进行相关计算
val resultRdd=lines.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)

//保存结果
resultRdd.saveAsTextFile(output)
sc.stop()
}
}

 

 

在终端mvn package

Spark 编程模型(中)_scala_36

 

 Spark 编程模型(中)_java_37

 

 可以看到失败了,看来还是要把这里还回1.5版本的,不能乱改

Spark 编程模型(中)_spark_38

 

 再次在终端mvn package,可以看到成功了!

Spark 编程模型(中)_scala_39

 

 

方式二;读取外部存储创建RDD

Spark 编程模型(中)_java_40

 

Spark 编程模型(中)_java_41

 

 

transformation操作

 

 惰性求值

Spark 编程模型(中)_java_42

 

 

转换操作

 Spark 编程模型(中)_scala_43

 

基本转换操作1

Spark 编程模型(中)_java_44

 

 

 

基本转换操作2

 Spark 编程模型(中)_java_45

 

                                                                                                             控制操作

 

Spark 编程模型(中)_scala_46

   

                                                                                                                                                                          action操作

Spark 编程模型(中)_scala_47

 

举报

相关推荐

0 条评论