Spark-shell Examples

乱世小白 2022-01-26


hadoop2@ubuntu:/liguodong/software/spark$ bin/spark-shell --master spark://ubuntu:7077 --executor-memory 2g

//parallelize demo (parallelize a Scala collection)
val num=sc.parallelize(1 to 10) //parallelize the range into an RDD with the default number of partitions
val doublenum=num.map(_*2) //multiply every element by 2
val threenum=doublenum.filter(_%3==0) //keep only the elements divisible by 3
//an Action triggers the job
threenum.collect
threenum.toDebugString

//parallelize and specify the number of partitions explicitly
val num1=sc.parallelize(1 to 10,6)
val doublenum1=num1.map(_*2)
val threenum1=doublenum1.filter(_ % 3 == 0)
threenum1.collect
threenum1.toDebugString

threenum.cache()
val fournum=threenum.map(x=>x*x)
fournum.collect
fournum.toDebugString
threenum.unpersist() //remove the RDD from the cache

num.reduce(_+_) //sum all the elements
num.take(5) //take the first 5 elements
num.first //first element
num.count //number of elements
num.take(5).foreach(println) //print the first 5 elements
//Key-Value (K-V) demo
val kv1=sc.parallelize(List(("A",1),("B",2),("C",3),("A",4),("B",5)))
//sort by key
kv1.sortByKey().collect //note: the parentheses on sortByKey cannot be omitted
//group by key without combining the values
kv1.groupByKey().collect
==>res13: Array[(String, Iterable[Int])] = Array((B,CompactBuffer(2, 5)),
(A,CompactBuffer(1, 4)), (C,CompactBuffer(3)))

//group by key and combine (reduce) the values
kv1.reduceByKey(_+_).collect
==>res15: Array[(String, Int)] = Array((B,7), (A,5), (C,3))

val kv2=sc.parallelize(List(("A",1),("A",4),("C",3),("A",4),("B",5)))
//remove duplicates
kv2.distinct.collect
==>res17: Array[(String, Int)] = Array((A,4), (A,1), (B,5), (C,3))
//union (concatenate) the two RDDs
kv1.union(kv2).collect
==>res18: Array[(String, Int)] = Array((A,1), (B,2), (C,3), (A,4), (B,5), (A,1), (A,4), (C,3), (A,4), (B,5))



val kv3=sc.parallelize(List(("A",10),("B",20),("D",30)))
kv1.join(kv3).collect
==>res21: Array[(String, (Int, Int))] = Array((B,(2,20)), (B,(5,20)), (A,(1,10)), (A,(4,10)))


kv1.cogroup(kv3).collect
==>res22: Array[(String, (Iterable[Int], Iterable[Int]))] = Array((B,(CompactBuffer(2, 5),
CompactBuffer(20))), (D,(CompactBuffer(),CompactBuffer(30))), (A,(CompactBuffer(1, 4),CompactBuffer(10))),
(C,(CompactBuffer(3),CompactBuffer())))


val kv4=sc.parallelize(List(List(1,2),List(3,4)))
kv4.flatMap(x=>x.map(_+1)).collect
==>res23: Array[Int] = Array(2, 3, 4, 5)
//file-reading demo
val rdd1=sc.textFile("hdfs://localhost:9000/liguodong/test.txt")
val words=rdd1.flatMap(_.split(" "))
val wordscount=words.map(x=>(x,1)).reduceByKey(_+_)
wordscount.collect
wordscount.toDebugString

val rdd2=sc.textFile("hdfs://localhost:9000/liguodong/*.txt")
rdd2.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect

//read a compressed file
val rdd3=sc.textFile("hdfs://localhost:9000/liguodong/oo.txt.gz")
rdd3.flatMap(_.split(" ")).map(x=>(x,1)).reduceByKey(_+_).collect
//log-processing demo
//download: http://download.labs.sogou.com/dl/q.html full version (2GB), gz format
//fields: access time\tuser ID\t[query]\trank of the URL in the results\tclick order\tURL clicked by the user
//SogouQ1.txt, SogouQ2.txt and SogouQ3.txt are slices of the SogouQ log file taken with head -n or tail -n

//how many records are ranked 1st in the search results but clicked 2nd?
val rdd1 = sc.textFile("hdfs://localhost:9000/liguodong/SogouQ1.txt.tar.gz")
val rdd2=rdd1.map(_.split("\t")).filter(_.length==6)
rdd2.count()
val rdd3=rdd2.filter(_(3).toInt==1).filter(_(4).toInt==2)
rdd3.count()
rdd3.toDebugString

//ranking of query counts per session
//in sortByKey, true means ascending and false means descending
val rdd4=rdd2.map(x=>(x(1),1)).reduceByKey(_+_).map(x=>(x._2,x._1)).sortByKey(false).map(x=>(x._2,x._1))
rdd4.toDebugString
rdd4.saveAsTextFile("hdfs://localhost:9000/liguodong/output1")

//the output may consist of multiple part files; we can merge them into a single local file
hadoop2@ubuntu:/liguodong/software/hadoop$ bin/hdfs dfs -getmerge hdfs://localhost:9000/liguodong/output1
/home/hadoop2/liguodong/result

hadoop2@ubuntu:/liguodong/software/hadoop$ ll /home/hadoop2/liguodong/result
-rw-r--r-- 1 hadoop2 hadoop 10477740 Dec 22 16:07 /home/hadoop2/liguodong/result

hadoop2@ubuntu:/liguodong/software/hadoop$ head /home/hadoop2/liguodong/result
(b3c94c37fb154d46c30a360c7941ff7e,676)
(cc7063efc64510c20bcdd604e12a3b26,613)
(955c6390c02797b3558ba223b8201915,391)
(b1e371de5729cdda9270b7ad09484c4f,337)
(6056710d9eafa569ddc800fe24643051,277)
(637b29b47fed3853e117aa7009a4b621,266)
(c9f4ff7790d0615f6f66b410673e3124,231)
(dca9034de17f6c34cfd56db13ce39f1c,226)
(82e53ddb484e632437039048c5901608,221)
(c72ce1164bcd263ba1f69292abdfdf7c,214)





//join demo
val format = new java.text.SimpleDateFormat("yyyy-MM-dd")
case class Register (d: java.util.Date, uuid: String, cust_id: String, lat: Float,lng: Float)
case class Click (d: java.util.Date, uuid: String, landing_page: Int)
val reg = sc.textFile("hdfs://localhost:9000/liguodong/join/reg.tsv").map(_.split("\t")).map(r =>
(r(1), Register(format.parse(r(0)), r(1), r(2), r(3).toFloat, r(4).toFloat)))
val clk = sc.textFile("hdfs://localhost:9000/liguodong/join/clk.tsv").map(_.split("\t")).map(c =>
(c(1), Click(format.parse(c(0)), c(1), c(2).trim.toInt)))

reg.join(clk).take(2)

==>res34: Array[(String, (Register, Click))] = Array((81da510acc4111e387f3600308919594,
(Register(Tue Mar 04 00:00:00 CST 2014,81da510acc4111e387f3600308919594,2,33.85701,-117.85574),
Click(Thu Mar 06 00:00:00 CST 2014,81da510acc4111e387f3600308919594,61))),
(15dfb8e6cc4111e3a5bb600308919594,
(Register(Sun Mar 02 00:00:00 CST 2014,15dfb8e6cc4111e3a5bb600308919594,1,33.659943,-117.95812),
Click(Tue Mar 04 00:00:00 CST 2014,15dfb8e6cc4111e3a5bb600308919594,11))))


//cache() demo
//command to inspect the blocks: bin/hdfs fsck /dataguru/data/SogouQ3.txt -files -blocks -locations
val rdd5 = sc.textFile("hdfs://localhost:9000/liguodong/SogouQ1.txt")
rdd5.cache()
rdd5.count()

rdd5.count() //compare the elapsed time with the first count

Timing comparison before and after cache():

Once the RDD is 100% cached in memory, subsequent actions run much faster. If memory is insufficient and the RDD is only partially cached, there is still a speed-up, but the full benefit is not realized. Even when the data cannot be fully cached in memory, the job still runs as long as there is enough disk space.

[screenshot in the original post: elapsed time of the two count() calls, before and after cache()]
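
When memory is tight, an alternative worth knowing is to persist with a storage level that spills to disk. This is a minimal sketch, not part of the original walkthrough; rdd6 is just an illustrative name:

import org.apache.spark.storage.StorageLevel

//cache() is shorthand for persist(StorageLevel.MEMORY_ONLY);
//MEMORY_AND_DISK keeps what fits in memory and spills the rest to disk
val rdd6 = sc.textFile("hdfs://localhost:9000/liguodong/SogouQ1.txt")
rdd6.persist(StorageLevel.MEMORY_AND_DISK)

rdd6.count() //the first action materializes and persists the RDD
rdd6.count() //later actions read the persisted copy
rdd6.unpersist() //release the storage when done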

Note:

In Spark,

map applies the given function to every input element and returns exactly one output object per element;

flatMap is the combination of two steps, i.e. "map first, then flatten":

Step 1: like map, apply the given function to every input element, producing a collection of results per element.

Step 2: flatten all of those resulting collections into a single RDD.
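
A small illustration of the difference, runnable in spark-shell (the input strings are made up for this example):

val lines = sc.parallelize(List("hello world", "hi spark"))

lines.map(_.split(" ")).collect
==>Array(Array(hello, world), Array(hi, spark)) //one array per input line

lines.flatMap(_.split(" ")).collect
==>Array(hello, world, hi, spark) //all words flattened into one RDD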


