Fork me on GitHub

Pyspark系列文章-Spark SQL与Hive交互实践

目录

  • 背景

  • 第一部分 Spark SQL

  • 第二部分 Spark SQL实践

  • 第三部分

  • 参考文献及资料

背景

第一部分 Spark SQL

1.1 Spark 2.0之前

Spark 1.6Spark 2.0之前)中,我们通过Spark SQL用于处理结构化数据。Spark SQL的入口点是SQLContext类或者子类HiveContext

例如下面的例子(Pyspark),可以访问Hive仓库中表(default库中的users表):

注:需要在Spark conf目录(classpath)下配置hive相关配置(hive-site.xml),才能访问Hive MetaStore,实现读写Hive表。

1
2
3
4
5
from pyspark.sql import HiveContext
# Pyspark on Jupyter,sc(sparkContext)已创建
hiveContext = HiveContext(sc)
tableExample = hiveContext.table("default.users")
tableExample.show()

另外也支持将DataFrame注册成Table,通过SQL语言处理DataFrame数据。例如下面的例子:

1
2
3
4
5
6
7
8
from pyspark.sql import SQLContext
sqlContext = SQLContext(sc)
# 从外部数据源创建DataFrame
jsonData = "file:///opt/spark-1.5.1/examples/src/main/resources/people.json"
df = sqlContext.read.json(jsonData)
# 注册表
sqlContext.registerDataFrameAsTable(df, "tempTable")
sqlContext.sql("SELECT * from tempTable limit 1").show()

注:SQLContext现在只支持sql语法解析器(SQL-92语法),而HiveContext现在既支持sql语法解析器又支持hivesql语法解析器,默认为hivesql语法解析器,用户可以通过配置参数(spark.sql.dialect)切换成sql语法解析器,来运行hiveql不支持的语法。

通常我们直接使用HiveContextHiveContext is a super set of the SQLContext. Hortonworks and the Spark community suggest using the HiveContext.)。但是需要Hive启动MetaStoreThrift接口服务。

1.2 Spark 2.0 之后

Spark 2.0之前,SparkContext是Spark的唯一切入点(Entry Point),其他Context都是基于其实现。例如下面例子:

1
2
3
4
5
6
7
8
9
10
11
12
13
14
from pyspark import SparkContext
from pyspark.sql import SQLContext
from pyspark.sql import HiveContext
from pyspark.streaming import StreamingContext
from pyspark import SparkConf
# 实例化SparkContext,通常起名sc
conf = SparkConf()
sc = SparkContext(conf=conf)
# hive
hiveContext = HiveContext(sc)
# sql
sqlContext = SQLContext(sc)
# 流处理
streamingcontext = StreamingContext(sc)

Spark2.0开始只需要创建一个SparkSession就够了,SparkConfSparkContextSQLContext都已经被封装在SparkSession当中,常用场景逐步替代SparkContext

创建方法如下:

1
2
3
4
5
6
from pyspark.sql import SparkSession
spark = SparkSession \
.builder \
.appName("Python Spark SQL basic example") \
.config("spark.some.config.option", "some-value") \
.getOrCreate()

SparkSession接口代替Spark1.6 中的SQLcontext和HiveContext 来实现对数据的加载、转换、处理等工作,并且实现了SQLcontext和HiveContext的所有功能。

在新版本中并不需要之前那么繁琐的创建很多对象,只需要创建一个SparkSession对象即可。SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并支持把DataFrame转换成SQLContext自身中的表。然后使用SQL语句来操作数据,也提供了HiveQL以及其他依赖于Hive的功能支持。

第二部分 Spark SQL实践

2.1 PySpark on Jupyter

在测试研发环境我们通常使用Jupyter来研发和测试数据类Spark任务。启动命令:

1
# /root/anaconda3/bin/python /root/anaconda3/bin/jupyter-notebook --ip=0.0.0.0 --no-browser --allow-root --notebook-dir=/opt/jupyter

需要提前配置相关Python环境变量:

1
2
3
4
5
6
"SPARK_HOME": "/opt/spark-2.3.2/",
"PYSPARK_PYTHON": "/root/anaconda3/bin/python",
"PYSPARK_DRIVER_PYTHON": "ipython3",
"PYTHONPATH": "/opt/spark-2.3.2/python/:/opt/spark-2.3.2/python/lib/py4j-0.10.7-src.zip",
"PYTHONSTARTUP": "/opt/spark-2.3.2/python/pyspark/shell.py",
"PYSPARK_SUBMIT_ARGS": "--name pyspark --master local pyspark-shell"

启动Jupyter后已经预定义好了两个对象:scspark,类型分别是:SparkContextSparkSession

1
2
3
Spark context Web UI available at http://hadoop01:4040
Spark context available as 'sc' (master = local[*], app id = local-1653645196618).
Spark session available as 'spark'.

用户在Jupyter上编写代码时候,直接使用两个对象即可(注意:如果重新定义,Spark部分配置是无法覆盖的)。

2.2 Two Catalogs

Spark 2.0版本后,Spark有两个Catalogs,分别是Hivein-memory。如果需要使用Hive需要配置Spark配置(参数默认值为:in-memory):

1
spark.sql.catalogImplementation:hive

可以在Spark UI上查看任务配置,显示如下:

hive

Spark 任务可以有如下功能:

  • 读写Hive库的元数据库(读写Hive表数据);
  • 使用HiveUDF函数;
  • 使用HiveSerDe序列化;

另外也可以基于内存实现Catalogs,从而和Hive解耦。配置如下:

1
spark.sql.catalogImplementation:in-memory

memory

2.3 连接Hive

SparkSession连接Hive Metastore,需要指定配置文件hive-site.xml(文件从hive conf目录中拷贝至Spark conf目录)。

也可以通过在创建SparkSession对象是通过配置指定hive.metastore.uris,例如:

1
2
3
4
5
6
spark = SparkSession
.builder
.config("hive.metastore.uris", "thrift://172.25.21.2:9083")
.enableHiveSupport()
.appName('SparkByExamples')
.getOrCreate()

接下来我们就可以通过SparkSessionHive进行操作。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
import spark.implicits._

//展示hive中的表
spark.sql("show tables").show(10)

// 创建一个dataframe
val df1 = Seq(("one", 1), ("two", 2), ("three", 3)).toDF("word", "count")
df1.show()
df1.write.mode("overwrite").saveAsTable("t4")

//打印表结构
val df2 = spark.sql("show create table t4")
df2.foreach(println(_))
//输出条数
val df3 = spark.sql("select count(1) as total from t4")
df3.show()
spark.close()

}

2.4 PySpark Catalog API

PySpark中目前支持的访问Catalog的方法清单:

SPARK CATALOG API IN PYSPARK DESCRIPTION
currentDatabase() This API returns the current default database in this session
setCurrentDatabase() You can use this API to sets the current default database in this session.
listDatabases() Returns a list of databases available across all sessions
listTables(dbName=None) Returns a list of tables/views in the specified database. API uses current database if no database is provided.
listFunctions(dbName=None) Returns a list of functions registered in the specified database. API uses current database if no database is provided.
listColumns(tableName, dbName=None) Returns a list of columns for the given table/view in the specified database.API uses current database if no database is provided.
createTable(tableName, path=None, source=None, schema=None, **options) Creates a table based on the dataset in a data source and returns the DataFrame associated with the table.
dropGlobalTempView(viewName) Drops the global temporary view with the given view name in the catalog. If the view has been cached before, then it will also be uncached. Returns true if this view is dropped successfully, false otherwise.
dropTempView(viewName) Drops the local temporary view with the given view name in the catalog. If the view has been cached before, then it will also be uncached. Returns true if this view is dropped successfully, false otherwise.
isCached(tableName) Returns true if the table is currently cached in-memory.
recoverPartitions(tableName) Recovers all the partitions of the given table and update the catalog. Only works with a partitioned table, and not a view.
refreshByPath(path) Invalidates and refreshes all the cached data for any DataFrame that contains the given data source path.
refreshTable(tableName) Invalidates and refreshes all the cached data and metadata of the given table.
cacheTable(tableName) Caches the specified table in-memory.
isCached(tableName) Returns true if the table is currently cached in-memory.
uncacheTable(tableName) Removes the specified table from the in-memory cache.
clearCache() Removes all cached tables from the in-memory cache.

第三部分 Spark on Hive 和Hive on Spark

3.1 Hive on Spark

Hive最开始底层使用MapReduce作为计算引擎,通常称为:Hive on MapReduce。为了提升效率出现了tez内存计算引擎,即Hive on tez。后续考虑到Spark的优秀性能,Hive项目也支持Spark作为底层引擎。这就是Hive on Spark

Hive CLI中可以临时调整计算引擎:

1
2
3
4
5
6
# 配置mapreduce计算引擎
set hive.execution.engine=mr;
# 配置tez计算引擎
set hive.execution.engine=tez;
# 配置spark计算引擎
set hive.execution.engine=spark;

如果持久化配置,需要在配置文件hive-default.xml中调整计算引擎,如下:

1
2
3
4
5
6
7
8
9
10
<property>
<name>hive.execution.engine</name>
<value>mr</value>
<description>
Expects one of [mr, tez, spark].
Chooses execution engine. Options are: mr (Map reduce, default), tez, spark. While MR
remains the default engine for historical reasons, it is itself a historical engine
and is deprecated in Hive 2 line. It may be removed without further warning.
</description>
</property>

3.2 Spark on Hive

Spark本身是一个计算引擎,并不负责数据的存储,计算过程需要从外部数据源读写数据。当然Hive也是一种外部数据源,所以这种模式通常被称为:Spark on Hive

这时候用户可以通过Spark的提供的java/scala/pyhon/r等接口访问外部数据源。

3。spark + spark hive catalog。这是spark和hive结合的一种新形势,随着数据湖相关技术的进一步发展,这种模式现在在市场上受到了越来越多用户的青睐。其本质是,数据以orc/parquet/delta lake等格式存储在分布式文件系统如hdfs或对象存储系统如s3中,然后通过使用spark计算引擎提供的scala/java/python等api或spark 语法规范的sql来进行处理。由于在处理分析时针对的对象是table, 而table的底层对应的才是hdfs/s3上的文件/对象,所以我们需要维护这种table到文件/对象的映射关系,而spark自身就提供了 spark hive catalog来维护这种table到文件/对象的映射关系。注意这里的spark hive catalog,其本质是使用了hive 的 metasore 相关 api来读写表到文件/对象的映射关系(以及一起其他的元数据信息)到 metasore db如mysql, postgresql等数据库中。(由于spark编译时可以把hive metastore api等相关代码一并打包到spark的二进制安装包中,所以使用这种模式,我们并不需要额外单独安装hive)。

问题

对于shell中重新定义spark

1
2
3
scala> val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
2022-05-23 08:47:52 WARN SparkSession$Builder:66 - Using an existing SparkSession; some configuration may not take effect.
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@7ecca48

spark和hive版本兼容问题:

https://blog.csdn.net/z1941563559/article/details/120764519

如果Windows启动部署Spark,需要注意Java部署的路径中不能有括号等特殊字符。

参考文献及资料

1、Pyspark接口说明,链接:https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started

0%