26. Creating RDDs in Spark

AbrahamW · 2022-06-17

Open Eclipse and create a Maven project.

The pom.xml file:

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.it19gong</groupId>
  <artifactId>sparkproject</artifactId>
  <version>0.0.1-SNAPSHOT</version>
  <packaging>jar</packaging>

  <name>sparkproject</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>3.8.1</version>
      <scope>test</scope>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.10</artifactId>
      <version>1.5.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-sql_2.10</artifactId>
      <version>1.5.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-hive_2.10</artifactId>
      <version>1.5.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming_2.10</artifactId>
      <version>1.5.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.4.1</version>
    </dependency>
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-streaming-kafka_2.10</artifactId>
      <version>1.5.1</version>
    </dependency>
    <dependency>
      <groupId>mysql</groupId>
      <artifactId>mysql-connector-java</artifactId>
      <version>5.1.6</version>
    </dependency>
    <dependency>
      <groupId>org.datanucleus</groupId>
      <artifactId>datanucleus-core</artifactId>
      <version>3.2.1</version>
    </dependency>
    <dependency>
      <groupId>org.datanucleus</groupId>
      <artifactId>datanucleus-rdbms</artifactId>
      <version>4.1.7</version>
    </dependency>
    <dependency>
      <groupId>org.datanucleus</groupId>
      <artifactId>datanucleus-api-jdo</artifactId>
      <version>3.2.6</version>
    </dependency>
  </dependencies>

  <build>
    <sourceDirectory>src/main/java</sourceDirectory>
    <testSourceDirectory>src/main/test</testSourceDirectory>

    <plugins>
      <plugin>
        <artifactId>maven-assembly-plugin</artifactId>
        <configuration>
          <descriptorRefs>
            <descriptorRef>jar-with-dependencies</descriptorRef>
          </descriptorRefs>
          <archive>
            <manifest>
              <mainClass></mainClass>
            </manifest>
          </archive>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id>
            <phase>package</phase>
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>

      <plugin>
        <groupId>org.codehaus.mojo</groupId>
        <artifactId>exec-maven-plugin</artifactId>
        <version>1.2.1</version>
        <executions>
          <execution>
            <goals>
              <goal>exec</goal>
            </goals>
          </execution>
        </executions>
        <configuration>
          <executable>java</executable>
          <includeProjectDependencies>true</includeProjectDependencies>
          <includePluginDependencies>false</includePluginDependencies>
          <classpathScope>compile</classpathScope>
          <mainClass>com.it19gong.sparkproject.App</mainClass>
        </configuration>
      </plugin>

      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-compiler-plugin</artifactId>
        <configuration>
          <source>1.6</source>
          <target>1.6</target>
        </configuration>
      </plugin>

    </plugins>
  </build>

</project>

Create a WordCountLocal.java file:

package com.it19gong.sparkproject;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCountLocal {

    public static void main(String[] args) {
        // 1. Configure the application for local execution
        SparkConf conf = new SparkConf().setAppName("WordCountLocal").setMaster("local");

        // 2. Create the Spark context
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. Read the input file
        JavaRDD<String> lines = sc.textFile("E://Mycode//dianshixiangmu//sparkproject//data//spark.txt");

        // 4. Split each line into words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // 5. Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> pairs = words.mapToPair(
            new PairFunction<String, String, Integer>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });

        // 6. Sum the counts for each word
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
            new Function2<Integer, Integer, Integer>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

        // 7. Print the results
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
            }
        });

        sc.close();
    }
}

Run it. The console output:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
19/11/10 14:12:18 INFO SparkContext: Running Spark version 1.5.1
19/11/10 14:12:18 INFO SecurityManager: Changing view acls to: Brave
19/11/10 14:12:18 INFO SecurityManager: Changing modify acls to: Brave
19/11/10 14:12:18 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(Brave); users with modify permissions: Set(Brave)
19/11/10 14:12:19 INFO Slf4jLogger: Slf4jLogger started
19/11/10 14:12:19 INFO Remoting: Starting remoting
19/11/10 14:12:19 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@10.0.75.1:55718]
19/11/10 14:12:19 INFO Utils: Successfully started service 'sparkDriver' on port 55718.
19/11/10 14:12:19 INFO SparkEnv: Registering MapOutputTracker
19/11/10 14:12:19 INFO SparkEnv: Registering BlockManagerMaster
19/11/10 14:12:19 INFO DiskBlockManager: Created local directory at C:\Users\Brave\AppData\Local\Temp\blockmgr-58932928-9dcc-40bd-86b9-9056cb077e9e
19/11/10 14:12:19 INFO MemoryStore: MemoryStore started with capacity 2.9 GB
19/11/10 14:12:19 INFO HttpFileServer: HTTP File server directory is C:\Users\Brave\AppData\Local\Temp\spark-7e77199e-97b8-4ad1-850d-45a4b9dbb981\httpd-2e225558-a380-410a-83ab-6d4353461237
19/11/10 14:12:19 INFO HttpServer: Starting HTTP Server
19/11/10 14:12:19 INFO Utils: Successfully started service 'HTTP file server' on port 55719.
19/11/10 14:12:19 INFO SparkEnv: Registering OutputCommitCoordinator
19/11/10 14:12:19 INFO Utils: Successfully started service 'SparkUI' on port 4040.
19/11/10 14:12:19 INFO SparkUI: Started SparkUI at http://10.0.75.1:4040
19/11/10 14:12:19 WARN MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
19/11/10 14:12:19 INFO Executor: Starting executor ID driver on host localhost
19/11/10 14:12:19 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 55756.
19/11/10 14:12:19 INFO NettyBlockTransferService: Server created on 55756
19/11/10 14:12:19 INFO BlockManagerMaster: Trying to register BlockManager
19/11/10 14:12:19 INFO BlockManagerMasterEndpoint: Registering block manager localhost:55756 with 2.9 GB RAM, BlockManagerId(driver, localhost, 55756)
19/11/10 14:12:19 INFO BlockManagerMaster: Registered BlockManager
19/11/10 14:12:20 INFO MemoryStore: ensureFreeSpace(120136) called with curMem=0, maxMem=3086525399
19/11/10 14:12:20 INFO MemoryStore: Block broadcast_0 stored as values in memory (estimated size 117.3 KB, free 2.9 GB)
19/11/10 14:12:20 INFO MemoryStore: ensureFreeSpace(12681) called with curMem=120136, maxMem=3086525399
19/11/10 14:12:20 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 12.4 KB, free 2.9 GB)
19/11/10 14:12:20 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:55756 (size: 12.4 KB, free: 2.9 GB)
19/11/10 14:12:20 INFO SparkContext: Created broadcast 0 from textFile at WordCountLocal.java:25
19/11/10 14:12:22 WARN : Your hostname, DESKTOP-76BE8V4 resolves to a loopback/non-reachable address: fe80:0:0:0:597b:a8f9:8691:f5d2%eth10, but we couldn't find any external IP address!
19/11/10 14:12:23 INFO FileInputFormat: Total input paths to process : 1
19/11/10 14:12:23 INFO SparkContext: Starting job: foreach at WordCountLocal.java:62
19/11/10 14:12:23 INFO DAGScheduler: Registering RDD 3 (mapToPair at WordCountLocal.java:38)
19/11/10 14:12:23 INFO DAGScheduler: Got job 0 (foreach at WordCountLocal.java:62) with 1 output partitions
19/11/10 14:12:23 INFO DAGScheduler: Final stage: ResultStage 1(foreach at WordCountLocal.java:62)
19/11/10 14:12:23 INFO DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
19/11/10 14:12:23 INFO DAGScheduler: Missing parents: List(ShuffleMapStage 0)
19/11/10 14:12:23 INFO DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at WordCountLocal.java:38), which has no missing parents
19/11/10 14:12:23 INFO MemoryStore: ensureFreeSpace(4832) called with curMem=132817, maxMem=3086525399
19/11/10 14:12:23 INFO MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.7 KB, free 2.9 GB)
19/11/10 14:12:23 INFO MemoryStore: ensureFreeSpace(2688) called with curMem=137649, maxMem=3086525399
19/11/10 14:12:23 INFO MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.6 KB, free 2.9 GB)
19/11/10 14:12:23 INFO BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:55756 (size: 2.6 KB, free: 2.9 GB)
19/11/10 14:12:23 INFO SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
19/11/10 14:12:23 INFO DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at WordCountLocal.java:38)
19/11/10 14:12:23 INFO TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
19/11/10 14:12:23 INFO TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, PROCESS_LOCAL, 2156 bytes)
19/11/10 14:12:23 INFO Executor: Running task 0.0 in stage 0.0 (TID 0)
19/11/10 14:12:23 INFO HadoopRDD: Input split: file:/E:/Mycode/dianshixiangmu/sparkproject/data/spark.txt:0+159
19/11/10 14:12:23 INFO deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
19/11/10 14:12:23 INFO deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
19/11/10 14:12:23 INFO deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
19/11/10 14:12:23 INFO deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
19/11/10 14:12:23 INFO deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
19/11/10 14:12:23 INFO Executor: Finished task 0.0 in stage 0.0 (TID 0). 2253 bytes result sent to driver
19/11/10 14:12:23 INFO TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 83 ms on localhost (1/1)
19/11/10 14:12:23 INFO TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/11/10 14:12:23 INFO DAGScheduler: ShuffleMapStage 0 (mapToPair at WordCountLocal.java:38) finished in 0.094 s
19/11/10 14:12:23 INFO DAGScheduler: looking for newly runnable stages
19/11/10 14:12:23 INFO DAGScheduler: running: Set()
19/11/10 14:12:23 INFO DAGScheduler: waiting: Set(ResultStage 1)
19/11/10 14:12:23 INFO DAGScheduler: failed: Set()
19/11/10 14:12:23 INFO DAGScheduler: Missing parents for ResultStage 1: List()
19/11/10 14:12:23 INFO DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCountLocal.java:49), which is now runnable
19/11/10 14:12:23 INFO MemoryStore: ensureFreeSpace(2496) called with curMem=140337, maxMem=3086525399
19/11/10 14:12:23 INFO MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.4 KB, free 2.9 GB)
19/11/10 14:12:23 INFO MemoryStore: ensureFreeSpace(1509) called with curMem=142833, maxMem=3086525399
19/11/10 14:12:23 INFO MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1509.0 B, free 2.9 GB)
19/11/10 14:12:23 INFO BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:55756 (size: 1509.0 B, free: 2.9 GB)
19/11/10 14:12:23 INFO SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:861
19/11/10 14:12:23 INFO DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCountLocal.java:49)
19/11/10 14:12:23 INFO TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
19/11/10 14:12:23 INFO TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1901 bytes)
19/11/10 14:12:23 INFO Executor: Running task 0.0 in stage 1.0 (TID 1)
19/11/10 14:12:23 INFO ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
19/11/10 14:12:23 INFO ShuffleBlockFetcherIterator: Started 0 remote fetches in 3 ms
jkjf appeared 1 times.
spark appeared 4 times.
hive appeared 3 times.
klsdjflk appeared 1 times.
hadoop appeared 3 times.
flume appeared 2 times.
appeared 6 times.
dshfjdslfjk appeared 1 times.
sdfjjk appeared 1 times.
djfk appeared 1 times.
hava appeared 1 times.
java appeared 3 times.
sdjfk appeared 1 times.
sdfjs appeared 1 times.
19/11/10 14:12:23 INFO Executor: Finished task 0.0 in stage 1.0 (TID 1). 1165 bytes result sent to driver
19/11/10 14:12:23 INFO TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 24 ms on localhost (1/1)
19/11/10 14:12:23 INFO TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/11/10 14:12:23 INFO DAGScheduler: ResultStage 1 (foreach at WordCountLocal.java:62) finished in 0.024 s
19/11/10 14:12:23 INFO DAGScheduler: Job 0 finished: foreach at WordCountLocal.java:62, took 0.189574 s
19/11/10 14:12:23 INFO SparkUI: Stopped Spark web UI at http://10.0.75.1:4040
19/11/10 14:12:23 INFO DAGScheduler: Stopping DAGScheduler
19/11/10 14:12:23 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/11/10 14:12:23 INFO MemoryStore: MemoryStore cleared
19/11/10 14:12:23 INFO BlockManager: BlockManager stopped
19/11/10 14:12:23 INFO BlockManagerMaster: BlockManagerMaster stopped
19/11/10 14:12:23 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/11/10 14:12:23 INFO SparkContext: Successfully stopped SparkContext
19/11/10 14:12:23 INFO ShutdownHookManager: Shutdown hook called
19/11/10 14:12:23 INFO ShutdownHookManager: Deleting directory C:\Users\Brave\AppData\Local\Temp\spark-7e77199e-97b8-4ad1-850d-45a4b9dbb981

 

 

 

Running WordCount on a cluster

Create a WordCountCluster.java file. To run on a Spark cluster, only two things need to change:

// 1. Remove the setMaster() call from SparkConf; by default the application
//    will connect to the cluster it is submitted to.
// 2. Read a real big-data file stored on HDFS instead of a local file.
SparkConf conf = new SparkConf().setAppName("WordCountCluster");

JavaSparkContext sc = new JavaSparkContext(conf);

JavaRDD<String> lines = sc.textFile("hdfs://node1:9000/spark.txt");

package com.it19gong.sparkproject;

import java.util.Arrays;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.Function2;
import org.apache.spark.api.java.function.PairFunction;
import org.apache.spark.api.java.function.VoidFunction;

import scala.Tuple2;

public class WordCountCluster {

    public static void main(String[] args) {
        // 1. Configure the application for cluster execution (no setMaster())
        SparkConf conf = new SparkConf().setAppName("WordCountCluster");

        // 2. Create the Spark context
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 3. Read the input file from HDFS
        JavaRDD<String> lines = sc.textFile("hdfs://node1/spark.txt");

        // 4. Split each line into words
        JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {

            private static final long serialVersionUID = 1L;

            @Override
            public Iterable<String> call(String line) throws Exception {
                return Arrays.asList(line.split(" "));
            }
        });

        // 5. Map each word to a (word, 1) pair
        JavaPairRDD<String, Integer> pairs = words.mapToPair(
            new PairFunction<String, String, Integer>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Tuple2<String, Integer> call(String word) throws Exception {
                    return new Tuple2<String, Integer>(word, 1);
                }
            });

        // 6. Sum the counts for each word
        JavaPairRDD<String, Integer> wordCounts = pairs.reduceByKey(
            new Function2<Integer, Integer, Integer>() {

                private static final long serialVersionUID = 1L;

                @Override
                public Integer call(Integer v1, Integer v2) throws Exception {
                    return v1 + v2;
                }
            });

        // 7. Print the results
        wordCounts.foreach(new VoidFunction<Tuple2<String, Integer>>() {

            private static final long serialVersionUID = 1L;

            @Override
            public void call(Tuple2<String, Integer> wordCount) throws Exception {
                System.out.println(wordCount._1 + " appeared " + wordCount._2 + " times.");
            }
        });

        sc.close();
    }
}

Package the project (for example, in Eclipse via Run As → Maven install, or with mvn clean package).

Upload the spark.txt file to the cluster.

Also upload the file to HDFS (for example, hdfs dfs -put spark.txt /).

Upload the jar we just built to the cluster.

Rename the jar (here, to sparkproject.jar, the name referenced in the submit script below).

Now write a submit script, Wordcount.sh:

/opt/modules/spark-1.5.1-bin-hadoop2.6/bin/spark-submit --class com.it19gong.sparkproject.WordCountCluster --num-executors 3 --driver-memory 100m --executor-memory 100m --executor-cores 3 /home/hadoop/sparkproject.jar

Start Spark.

Run the script:

[hadoop@node1 ~]$ ./Wordcount.sh 
19/11/10 15:33:40 INFO spark.SparkContext: Running Spark version 1.5.1
19/11/10 15:33:41 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
19/11/10 15:33:41 INFO spark.SecurityManager: Changing view acls to: hadoop
19/11/10 15:33:41 INFO spark.SecurityManager: Changing modify acls to: hadoop
19/11/10 15:33:41 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(hadoop); users with modify permissions: Set(hadoop)
19/11/10 15:33:42 INFO slf4j.Slf4jLogger: Slf4jLogger started
19/11/10 15:33:42 INFO Remoting: Starting remoting
19/11/10 15:33:42 INFO util.Utils: Successfully started service 'sparkDriver' on port 42358.
19/11/10 15:33:42 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriver@192.168.86.131:42358]
19/11/10 15:33:42 INFO spark.SparkEnv: Registering MapOutputTracker
19/11/10 15:33:42 INFO spark.SparkEnv: Registering BlockManagerMaster
19/11/10 15:33:42 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-2e4d079e-a368-4779-93e6-de99c948e0d2
19/11/10 15:33:42 INFO storage.MemoryStore: MemoryStore started with capacity 52.2 MB
19/11/10 15:33:42 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-b3327381-3897-4200-913a-0b09e354f0e1/httpd-1cd444b4-38c9-4cec-9e16-66d0d2c1117c
19/11/10 15:33:42 INFO spark.HttpServer: Starting HTTP Server
19/11/10 15:33:43 INFO server.Server: jetty-8.y.z-SNAPSHOT
19/11/10 15:33:43 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:40646
19/11/10 15:33:43 INFO util.Utils: Successfully started service 'HTTP file server' on port 40646.
19/11/10 15:33:43 INFO spark.SparkEnv: Registering OutputCommitCoordinator
19/11/10 15:33:43 INFO server.Server: jetty-8.y.z-SNAPSHOT
19/11/10 15:33:43 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
19/11/10 15:33:43 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
19/11/10 15:33:43 INFO ui.SparkUI: Started SparkUI at http://192.168.86.131:4040
19/11/10 15:33:44 INFO spark.SparkContext: Added JAR file:/home/hadoop/sparkproject.jar at http://192.168.86.131:40646/jars/sparkproject.jar with timestamp 1573371224734
19/11/10 15:33:45 WARN metrics.MetricsSystem: Using default name DAGScheduler for source because spark.app.id is not set.
19/11/10 15:33:45 INFO executor.Executor: Starting executor ID driver on host localhost
19/11/10 15:33:46 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 35773.
19/11/10 15:33:46 INFO netty.NettyBlockTransferService: Server created on 35773
19/11/10 15:33:46 INFO storage.BlockManagerMaster: Trying to register BlockManager
19/11/10 15:33:46 INFO storage.BlockManagerMasterEndpoint: Registering block manager localhost:35773 with 52.2 MB RAM, BlockManagerId(driver, localhost, 35773)
19/11/10 15:33:46 INFO storage.BlockManagerMaster: Registered BlockManager
19/11/10 15:33:47 INFO storage.MemoryStore: ensureFreeSpace(130448) called with curMem=0, maxMem=54747463
19/11/10 15:33:47 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 127.4 KB, free 52.1 MB)
19/11/10 15:33:47 INFO storage.MemoryStore: ensureFreeSpace(14403) called with curMem=130448, maxMem=54747463
19/11/10 15:33:47 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 14.1 KB, free 52.1 MB)
19/11/10 15:33:47 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on localhost:35773 (size: 14.1 KB, free: 52.2 MB)
19/11/10 15:33:47 INFO spark.SparkContext: Created broadcast 0 from textFile at WordCountCluster.java:25
19/11/10 15:33:48 INFO mapred.FileInputFormat: Total input paths to process : 1
19/11/10 15:33:48 INFO spark.SparkContext: Starting job: foreach at WordCountCluster.java:62
19/11/10 15:33:48 INFO scheduler.DAGScheduler: Registering RDD 3 (mapToPair at WordCountCluster.java:38)
19/11/10 15:33:48 INFO scheduler.DAGScheduler: Got job 0 (foreach at WordCountCluster.java:62) with 1 output partitions
19/11/10 15:33:48 INFO scheduler.DAGScheduler: Final stage: ResultStage 1(foreach at WordCountCluster.java:62)
19/11/10 15:33:48 INFO scheduler.DAGScheduler: Parents of final stage: List(ShuffleMapStage 0)
19/11/10 15:33:48 INFO scheduler.DAGScheduler: Missing parents: List(ShuffleMapStage 0)
19/11/10 15:33:49 INFO scheduler.DAGScheduler: Submitting ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at WordCountCluster.java:38), which has no missing parents
19/11/10 15:33:49 INFO storage.MemoryStore: ensureFreeSpace(4800) called with curMem=144851, maxMem=54747463
19/11/10 15:33:49 INFO storage.MemoryStore: Block broadcast_1 stored as values in memory (estimated size 4.7 KB, free 52.1 MB)
19/11/10 15:33:49 INFO storage.MemoryStore: ensureFreeSpace(2665) called with curMem=149651, maxMem=54747463
19/11/10 15:33:49 INFO storage.MemoryStore: Block broadcast_1_piece0 stored as bytes in memory (estimated size 2.6 KB, free 52.1 MB)
19/11/10 15:33:49 INFO storage.BlockManagerInfo: Added broadcast_1_piece0 in memory on localhost:35773 (size: 2.6 KB, free: 52.2 MB)
19/11/10 15:33:49 INFO spark.SparkContext: Created broadcast 1 from broadcast at DAGScheduler.scala:861
19/11/10 15:33:49 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ShuffleMapStage 0 (MapPartitionsRDD[3] at mapToPair at WordCountCluster.java:38)
19/11/10 15:33:49 INFO scheduler.TaskSchedulerImpl: Adding task set 0.0 with 1 tasks
19/11/10 15:33:49 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, localhost, ANY, 2179 bytes)
19/11/10 15:33:49 INFO executor.Executor: Running task 0.0 in stage 0.0 (TID 0)
19/11/10 15:33:49 INFO executor.Executor: Fetching http://192.168.86.131:40646/jars/sparkproject.jar with timestamp 1573371224734
19/11/10 15:33:49 INFO util.Utils: Fetching http://192.168.86.131:40646/jars/sparkproject.jar to /tmp/spark-b3327381-3897-4200-913a-0b09e354f0e1/userFiles-5ed25560-5fd3-43ce-8f5f-d5017b6e5c4e/fetchFileTemp5320647246797663342.tmp
19/11/10 15:33:51 INFO executor.Executor: Adding file:/tmp/spark-b3327381-3897-4200-913a-0b09e354f0e1/userFiles-5ed25560-5fd3-43ce-8f5f-d5017b6e5c4e/sparkproject.jar to class loader
19/11/10 15:33:51 INFO rdd.HadoopRDD: Input split: hdfs://node1/spark.txt:0+159
19/11/10 15:33:51 INFO Configuration.deprecation: mapred.tip.id is deprecated. Instead, use mapreduce.task.id
19/11/10 15:33:51 INFO Configuration.deprecation: mapred.task.id is deprecated. Instead, use mapreduce.task.attempt.id
19/11/10 15:33:51 INFO Configuration.deprecation: mapred.task.is.map is deprecated. Instead, use mapreduce.task.ismap
19/11/10 15:33:51 INFO Configuration.deprecation: mapred.task.partition is deprecated. Instead, use mapreduce.task.partition
19/11/10 15:33:51 INFO Configuration.deprecation: mapred.job.id is deprecated. Instead, use mapreduce.job.id
19/11/10 15:33:52 INFO executor.Executor: Finished task 0.0 in stage 0.0 (TID 0). 2253 bytes result sent to driver
19/11/10 15:33:52 INFO scheduler.DAGScheduler: ShuffleMapStage 0 (mapToPair at WordCountCluster.java:38) finished in 3.353 s
19/11/10 15:33:52 INFO scheduler.DAGScheduler: looking for newly runnable stages
19/11/10 15:33:52 INFO scheduler.DAGScheduler: running: Set()
19/11/10 15:33:52 INFO scheduler.DAGScheduler: waiting: Set(ResultStage 1)
19/11/10 15:33:52 INFO scheduler.DAGScheduler: failed: Set()
19/11/10 15:33:52 INFO scheduler.DAGScheduler: Missing parents for ResultStage 1: List()
19/11/10 15:33:52 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 3337 ms on localhost (1/1)
19/11/10 15:33:52 INFO scheduler.DAGScheduler: Submitting ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCountCluster.java:49), which is now runnable
19/11/10 15:33:52 INFO storage.MemoryStore: ensureFreeSpace(2496) called with curMem=152316, maxMem=54747463
19/11/10 15:33:52 INFO storage.MemoryStore: Block broadcast_2 stored as values in memory (estimated size 2.4 KB, free 52.1 MB)
19/11/10 15:33:52 INFO storage.MemoryStore: ensureFreeSpace(1511) called with curMem=154812, maxMem=54747463
19/11/10 15:33:52 INFO storage.MemoryStore: Block broadcast_2_piece0 stored as bytes in memory (estimated size 1511.0 B, free 52.1 MB)
19/11/10 15:33:52 INFO storage.BlockManagerInfo: Added broadcast_2_piece0 in memory on localhost:35773 (size: 1511.0 B, free: 52.2 MB)
19/11/10 15:33:52 INFO spark.SparkContext: Created broadcast 2 from broadcast at DAGScheduler.scala:861
19/11/10 15:33:52 INFO scheduler.DAGScheduler: Submitting 1 missing tasks from ResultStage 1 (ShuffledRDD[4] at reduceByKey at WordCountCluster.java:49)
19/11/10 15:33:52 INFO scheduler.TaskSchedulerImpl: Adding task set 1.0 with 1 tasks
19/11/10 15:33:52 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 0.0, whose tasks have all completed, from pool
19/11/10 15:33:52 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 1.0 (TID 1, localhost, PROCESS_LOCAL, 1960 bytes)
19/11/10 15:33:52 INFO executor.Executor: Running task 0.0 in stage 1.0 (TID 1)
19/11/10 15:33:52 INFO storage.ShuffleBlockFetcherIterator: Getting 1 non-empty blocks out of 1 blocks
19/11/10 15:33:52 INFO storage.ShuffleBlockFetcherIterator: Started 0 remote fetches in 8 ms
jkjf appeared 1 times.
spark appeared 4 times.
hive appeared 3 times.
klsdjflk appeared 1 times.
hadoop appeared 3 times.
flume appeared 2 times.
appeared 6 times.
dshfjdslfjk appeared 1 times.
sdfjjk appeared 1 times.
djfk appeared 1 times.
hava appeared 1 times.
java appeared 3 times.
sdjfk appeared 1 times.
sdfjs appeared 1 times.
19/11/10 15:33:52 INFO executor.Executor: Finished task 0.0 in stage 1.0 (TID 1). 1165 bytes result sent to driver
19/11/10 15:33:52 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 1.0 (TID 1) in 340 ms on localhost (1/1)
19/11/10 15:33:52 INFO scheduler.TaskSchedulerImpl: Removed TaskSet 1.0, whose tasks have all completed, from pool
19/11/10 15:33:52 INFO scheduler.DAGScheduler: ResultStage 1 (foreach at WordCountCluster.java:62) finished in 0.340 s
19/11/10 15:33:52 INFO scheduler.DAGScheduler: Job 0 finished: foreach at WordCountCluster.java:62, took 4.028079 s
19/11/10 15:33:52 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
19/11/10 15:33:53 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
19/11/10 15:33:53 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.86.131:4040
19/11/10 15:33:53 INFO scheduler.DAGScheduler: Stopping DAGScheduler
19/11/10 15:33:53 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/11/10 15:33:53 INFO storage.MemoryStore: MemoryStore cleared
19/11/10 15:33:53 INFO storage.BlockManager: BlockManager stopped
19/11/10 15:33:53 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
19/11/10 15:33:53 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/11/10 15:33:53 INFO spark.SparkContext: Successfully stopped SparkContext
19/11/10 15:33:53 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
19/11/10 15:33:53 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
19/11/10 15:33:53 INFO util.ShutdownHookManager: Shutdown hook called
19/11/10 15:33:53 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-b3327381-3897-4200-913a-0b09e354f0e1
[hadoop@node1 ~]$ ls

 

 

 

Creating an RDD (from a collection, a local file, or an HDFS file)

When doing Spark Core programming, the very first thing to do is create an initial RDD. This RDD typically represents and contains the input data of the Spark application.

Only after the initial RDD has been created can you apply the transformation operators provided by Spark Core to it, deriving other RDDs from it.

Spark Core provides three ways to create an RDD: from a collection in the program, from a local file, and from a file on HDFS.

 

In my experience (each approach is sketched in the example after this list):

1. Creating an RDD from an in-program collection is mainly useful for testing: before deploying to a cluster, you can build test data from a collection and verify the flow of the Spark application that follows.

2. Creating an RDD from a local file is mainly useful for ad hoc processing of files that hold large amounts of data.

3. Creating an RDD from an HDFS file is the most common approach in production; it targets big data stored on HDFS for offline batch processing.
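
To make the three options concrete, here is a minimal sketch using the same Java API as above. It is not part of the original project; the class name CreateRDDExamples and the local file path are placeholders.

package com.it19gong.sparkproject;

import java.util.Arrays;
import java.util.List;

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

public class CreateRDDExamples {

    public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("CreateRDDExamples").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);

        // 1. From an in-program collection (parallelize)
        List<Integer> numbers = Arrays.asList(1, 2, 3, 4, 5);
        JavaRDD<Integer> numberRDD = sc.parallelize(numbers);
        System.out.println("collection RDD count: " + numberRDD.count());

        // 2. From a local file (placeholder path; textFile is lazy, so nothing is read yet)
        JavaRDD<String> localLines = sc.textFile("E://test//spark.txt");

        // 3. From an HDFS file (the same file WordCountCluster reads above)
        JavaRDD<String> hdfsLines = sc.textFile("hdfs://node1/spark.txt");

        sc.close();
    }
}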

 

 

Creating an RDD by parallelizing a collection

To create an RDD from a parallelized collection, call SparkContext's parallelize() method on a collection in your program. Spark copies the collection's data to the cluster, forming a distributed dataset, that is, an RDD. In effect, part of the collection ends up on one node and the rest on other nodes, and you can then operate on this distributed dataset (the RDD) in parallel.
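
For example, here is a minimal sketch (reusing the sc, imports, and Function2 pattern from WordCountLocal above; the numbers and the partition count are illustrative). The optional second argument to parallelize() controls how many partitions the collection is split into:

// Parallelize a collection into 3 partitions and sum its elements with reduce().
JavaRDD<Integer> numberRDD = sc.parallelize(Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10), 3);

int sum = numberRDD.reduce(new Function2<Integer, Integer, Integer>() {

    private static final long serialVersionUID = 1L;

    @Override
    public Integer call(Integer v1, Integer v2) throws Exception {
        return v1 + v2;
    }
});

System.out.println("1 + 2 + ... + 10 = " + sum);   // prints 55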

 
