Spark DataFrame Operations


First, create the test data.

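Based on the query results shown below, person.txt contains six space-separated records of id, name, and age:

1 zhangsan 20
2 lisi 29
3 wangwu 25
4 zhaoliu 30
5 tianqi 35
6 jerry 40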

Start the ZooKeeper, Hadoop, and Spark clusters.

Create a /spark directory in HDFS, then upload the local file to it:

[root@hadoop01 data]# hdfs dfs -mkdir /spark
[root@hadoop01 data]# hdfs dfs -put /export/data/person.txt /spark

Two errors may occur when reading the data:

Failed to get database global_temp, returning NoSuchObjectException.

Error creating transactional connection factory.

The fixes are as follows:

1. Copy the hive-site.xml configuration file from Hive's conf directory into Spark's conf directory.

2. Because MySQL serves as the metastore database, add the MySQL JDBC driver jar with the --jars option when launching spark-shell.
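For example, assuming Hive is installed under /export/servers/hive and Spark under /export/servers/spark (hypothetical paths, adjust to your installation), fix 1 is a single copy:

[root@hadoop01 data]# cp /export/servers/hive/conf/hive-site.xml /export/servers/spark/conf/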


Launching spark-shell:

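A sketch of the launch command, assuming a standalone master on hadoop01 and the MySQL connector jar under Hive's lib directory (both the master URL and the jar path are assumptions):

[root@hadoop01 data]# spark-shell --master spark://hadoop01:7077 --jars /export/servers/hive/lib/mysql-connector-java.jar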

Create a DataFrame by having Spark read a data source directly:

scala> val personDF = spark.read.text("/spark/person.txt")
personDF: org.apache.spark.sql.DataFrame = [value: string]

scala> personDF.printSchema()
root
|-- value: string (nullable = true)

The DataFrame show() method displays the DataFrame's current data:

scala> personDF.show()
+-------------+
| value|
+-------------+
|1 zhangsan 20|
| 2 lisi 29|
| 3 wangwu 25|
| 4 zhaoliu 30|
| 5 tianqi 35|
| 6 jerry 40|
+-------------+
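Note that spark.read.text() yields a single string column named value, so the rows need parsing before typed queries are possible. One option, sketched here with the built-in split() function (column names chosen to match the Person case class defined below), is to stay in the DataFrame API:

import org.apache.spark.sql.functions._

val parsed = personDF.select(
  split(col("value"), " ").getItem(0).cast("int").as("id"),   // first token -> id
  split(col("value"), " ").getItem(1).as("name"),             // second token -> name
  split(col("value"), " ").getItem(2).cast("int").as("age"))  // third token -> age

The alternative shown next converts through an RDD instead.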

The RDD toDF() method converts an RDD into a DataFrame object:

scala> val lineRDD = sc.textFile("/spark/person.txt").map(_.split(" "))
lineRDD: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[6] at map at <console>:24

scala> case class Person(id:Int,name:String,age:Int)
defined class Person


scala> val personRDD = lineRDD.map(x => Person(x(0).toInt, x(1), x(2).toInt))
personRDD: org.apache.spark.rdd.RDD[Person] = MapPartitionsRDD[7] at map at <console>:27

scala> val personDF = personRDD.toDF()
personDF: org.apache.spark.sql.DataFrame = [id: int, name: string ... 1 more field]

scala> personDF.show
+---+--------+---+
| id| name|age|
+---+--------+---+
| 1|zhangsan| 20|
| 2| lisi| 29|
| 3| wangwu| 25|
| 4| zhaoliu| 30|
| 5| tianqi| 35|
| 6| jerry| 40|
+---+--------+---+


scala> personDF.printSchema
root
|-- id: integer (nullable = false)
|-- name: string (nullable = true)
|-- age: integer (nullable = false)
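A note on toDF(): it works in the shell because spark-shell pre-imports the SparkSession implicits. In a standalone application you create the SparkSession and import them yourself. A minimal sketch (the object name PersonApp is made up):

import org.apache.spark.sql.SparkSession

object PersonApp {
  case class Person(id: Int, name: String, age: Int)

  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().appName("PersonApp").getOrCreate()
    import spark.implicits._  // brings toDF() into scope

    val personRDD = spark.sparkContext
      .textFile("/spark/person.txt")
      .map(_.split(" "))
      .map(x => Person(x(0).toInt, x(1), x(2).toInt))

    personRDD.toDF().show()
    spark.stop()
  }
}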

View the data in the name column of personDF:

scala> personDF.select(personDF.col("name")).show()
+--------+
| name|
+--------+
|zhangsan|
| lisi|
| wangwu|
| zhaoliu|
| tianqi|
| jerry|
+--------+

The select() operation can also rename columns:

scala> personDF.select(personDF("name").as("username"),personDF("age")).show()
+--------+---+
|username|age|
+--------+---+
|zhangsan| 20|
| lisi| 29|
| wangwu| 25|
| zhaoliu| 30|
| tianqi| 35|
| jerry| 40|
+--------+---+
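An equivalent single-column rename is withColumnRenamed(), which keeps all other columns (including id here) instead of selecting a subset:

personDF.withColumnRenamed("name", "username").show()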

Filter for rows where age is greater than or equal to 25:

scala> personDF.filter(personDF("age") >= 25).show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 2| lisi| 29|
| 3| wangwu| 25|
| 4|zhaoliu| 30|
| 5| tianqi| 35|
| 6| jerry| 40|
+---+-------+---+
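filter() also accepts a SQL expression string; this form is equivalent:

personDF.filter("age >= 25").show()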

Group by age and count how many people share each age:

scala> personDF.groupBy("age").count().show()
+---+-----+
|age|count|
+---+-----+
| 20| 1|
| 40| 1|
| 35| 1|
| 25| 1|
| 29| 1|
| 30| 1|
+---+-----+

Sort by age in descending order:

scala> personDF.sort(personDF("age").desc).show()
+---+--------+---+
| id| name|age|
+---+--------+---+
| 6| jerry| 40|
| 5| tianqi| 35|
| 4| zhaoliu| 30|
| 2| lisi| 29|
| 3| wangwu| 25|
| 1|zhangsan| 20|
+---+--------+---+

To use SQL-style operations, the DataFrame must first be registered as a temporary table:

scala> personDF.registerTempTable("t_person")
warning: there was one deprecation warning; re-run with -deprecation for details
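The warning appears because registerTempTable() has been deprecated since Spark 2.0. Its replacement, createOrReplaceTempView(), registers the same session-scoped view:

scala> personDF.createOrReplaceTempView("t_person")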

Query the information of the two oldest people:

scala> spark.sql("select * from t_person order by age desc limit 2").show()
+---+------+---+
| id| name|age|
+---+------+---+
| 6| jerry| 40|
| 5|tianqi| 35|
+---+------+---+

Query the information of people older than 25:

scala> spark.sql("select * from t_person where age > 25").show()
+---+-------+---+
| id| name|age|
+---+-------+---+
| 2| lisi| 29|
| 4|zhaoliu| 30|
| 5| tianqi| 35|
| 6| jerry| 40|
+---+-------+---+


