0
点赞
收藏
分享

微信扫一扫

pyspark

一、pyspark

为了让Spark支持Python,Apache Spark社区发布了一个工具PySpark。使用PySpark,我们可以使用Python编程语言处理RDD。这一切是由一个名为Py4j的库达到的。其架构如下所示。

pyspark_#yyds干货盘点#

PySpark的优势之一是在开发中允许你直接调用Python的内置库和第三方库如果Spark是本地模式,可以直接调用Python的第三方库。但如果是集群模式的话,则会发生错误.原因是PySpark 需要在各个执行节点的机器上执行操作,而与操作相关的文件存在本地.


二、PySpark程序开发

PySpark的一大优势是允许使用Python调用Spark的,因此允许在开发程序是同时调用Python的众多第三方库。因为Python和Scala语言都支持函数式的编程,也都支持匿名函数。所以大部分的Spark接口在PySpark中都有支持,并且方法名也一致。


1.获得SparkContext对象

from pyspark import SparkContext
sc = SparkContext.getOrCreate()

2.创建RDD

PySpark没有makeRDD()方法,但常用的textFile()、parallelize()、textFile()、wholeTextFiles()。同时在PySpark中针对Python的使用习惯也增加了一些新的接口。PySpark支持的创建RDD的方法如下表所示。

表 PySpark API接口表-1

API

示例

parallelize(c, numSlices=None)

sc.parallelize([0, 2, 3, 4, 6], 5).collect()

range(start, end=None, step=1, numSlices=None)

sc.range(5).collect()

textFile(name, minPartitions=None, use_unicode=True)

sc.textFile(path).collect()

wholeTextFiles(path, minPartitions=None, use_unicode=True)

sc.wholeTextFiles(dirPath).collect()

更详细的关于PySpark RDD创建的相关接口,可以查看官方API文档(http://spark.apache.org/docs/2.0.0/api/python/pyspark.html#module-pyspark)。

3.加载文件

我们在使用Spark的时候有时候需要将一些数据分发到计算节点中,可以使用addFile函数来分发这些文件。addFile方法可以接收本地文件(或者HDFS上的文件),甚至是文件夹(如果是文件夹,必须是HDFS路径),然后Spark的Driver和Exector可以通过SparkFiles.get()方法来获取文件的绝对路径。

调用SparkContext的addFile()方法加载文件。

sc.addFile(path)

调用SparkFiles的get()方法获取文件的绝对路径。

SparkFiles.get(filename)

在PySpark中,为了能够让各个worker节点能加载并执行Python文件中的方法,SparkContext还还添加了一个addPyFile()方法。调用此方法加载Python文件后,可以直接import该Python文件,并调用该文件中的方法。

定义Python文件。

#sci.py
def sqrt(num):
        return num * num
def circle_area(r):
        return 3.14 * sqrt(r)

调用addPyFile()方法,并import该Python文件调用其中的方法。

sc.addPyFile("file:///root/sci.py")

from sci import circle_area

sc.parallelize([5, 9, 21]).map(lambda x : circle_area(x)).collect()

这种使用方式可以令PySpark编程更加灵活,复用之前由Python开发代码,提高代码复用率。

4.匿名函数

在Scala中使用map()、filter()、flatmap()等算子时,经常会使用匿名函数作为这个高阶函数的参数。这里以列表的形式罗列一些PySpark中支持的常用算子及其函数原型。

表 PySpark常用算子及函数原型

算子类型

算子函数原型

转换算子Transformation

map(f, preservesPartitioning=False)

filter(f)

mapValues(f)

distinct(numPartitions=None)

reduceByKey(func, numPartitions=None, partitionFunc=)

groupByKey(numPartitions=None, partitionFunc=)

sortByKey(ascending=True, numPartitions=None, keyfunc= at 0x7f839c2cf758>)

union(other)

join(other, numPartitions=None)

动作算子Actions

count()

collect()

take(num)

first()

reduce(f)

foreach(f)

lookup(key)

max(key=None)

min(key=None)

saveAsTextFile(path, compressionCodecClass=None)

在Python中也可以使用匿名函数,使用的方式是lambda函数。下面示例中,用Scala开发的Spark代码在PySpark中转换成如下代码。

Scala代码:

val a=sc.parallelize(List("dog","tiger","lion","cat","panther","eagle"))
val b=a.map(x=>(x,1))
b.collect

Python代码:

a=sc.parallelize(("dog","tiger","lion","cat","panther","eagle"))
b=a.map(lambda x:(x,1))
b.collect()

注意,在Python中lambda  函数只能有一个表达式。但是结合Python中的生成式和if...else三元表达式,lambda表达式可以覆盖绝大部分的使用场景。

5.SparkSQL

在PySpark中使用SparkSQL时,获得SparkSession对象的方法如下:

from pyspark.sql import SparkSession
# create the spark session
ss = SparkSession.builder.getOrCreate()

在PySpark中使用SparkSQL时,大部分使用方法与Spark中区别不大。但是需要重点掌握的是Spark DataFrame和Pandas DataFrame之间的转换。Pandas是Python数据处理中非常重要的第三方库,很多Python的第三方机器学习库或人工智能库的数据处理阶段都会使用到Pandas。

Pandas DataFrame 转 Spark DataFrame的方法

spark.createDataFrame(pandas_df)

Spark DataFrame转Pandas DataFrame

spark_df.toPandas()




spark读取hive数据

from pyspark.sql import SparkSession,HiveContext

app_name = 'feaHitStatis'
spark = SparkSession.builder.appName(app_name).getOrCreate()
hive_context= HiveContext(spark)

# 生成查询的SQL语句,这个跟hive的查询语句一样,所以也可以加where等条件语句
hive_database = "database1"
hive_table = "test"
hive_read = "select * from {}.{}".format(hive_database, hive_table)
# 通过SQL语句在hive中查询的数据直接是dataframe的形式
read_df = hive_context.sql(hive_read)
read_df.show()

写hive数据

方式1:

df.registerTempTable('test_hive')
hive_context.sql("create table default.write_test select * from test_hive")

方式二:write.mode方式

resB.repartition(1).sortWithinPartitions(resB.fea_cmple).write.mode('overwrite').saveAsTable("dm_mms_lhq.feature_statis_b_%s"%(task_id) )




方式一:配置环境变量 自动读取

spark = SparkSession.builder.master("local[*]")
.appName("test").enableHiveSupport().getOrCreate()
read_df=spark.sql("select * from dm_events.dm_usereventfinal limit 1")
read_df.show()

方式二:不需配置环境变量

spark = SparkSession.builder.master("spark://192.168.142.197:7077") 
.config("hive.metastore.uris","thrift://192.168.142.197:9083")
.appName("test").enableHiveSupport().getOrCreate()
read_df = spark.sql("select * from dm_events.dm_usereventfinal limit 1")
read_df.show()

举报

相关推荐

0 条评论