目录
背景
第一部分 Spark SQL
第二部分 Spark SQL实践
第三部分
参考文献及资料
背景
第一部分 Spark SQL
1.1 Spark 2.0之前
在Spark 1.6
(Spark 2.0
之前)中,我们通过Spark SQL
用于处理结构化数据。Spark SQL
的入口点是SQLContext
类或者子类HiveContext
。
例如下面的例子(Pyspark
),可以访问Hive
仓库中表(default
库中的users
表):
注:需要在
Spark conf
目录(classpath
)下配置hive
相关配置(hive-site.xml
),才能访问Hive MetaStore
,实现读写Hive
表。
1 | from pyspark.sql import HiveContext |
另外也支持将DataFrame
注册成Table
,通过SQL
语言处理DataFrame
数据。例如下面的例子:
1 | from pyspark.sql import SQLContext |
注:
SQLContext
现在只支持sql
语法解析器(SQL-92
语法),而HiveContext
现在既支持sql语法解析器又支持hivesql
语法解析器,默认为hivesql
语法解析器,用户可以通过配置参数(spark.sql.dialect
)切换成sql
语法解析器,来运行hiveql
不支持的语法。
通常我们直接使用HiveContext
(HiveContext is a super set of the SQLContext. Hortonworks and the Spark community suggest using the HiveContext.
)。但是需要Hive
启动MetaStore
的Thrift
接口服务。
1.2 Spark 2.0 之后
Spark 2.0
之前,SparkContext
是Spark的唯一切入点(Entry Point
),其他Context
都是基于其实现。例如下面例子:
1 | from pyspark import SparkContext |
Spark2.0
开始只需要创建一个SparkSession
就够了,SparkConf
、SparkContext
和SQLContext
都已经被封装在SparkSession
当中,常用场景逐步替代SparkContext
。
创建方法如下:
1 | from pyspark.sql import SparkSession |
SparkSession接口代替Spark1.6 中的SQLcontext和HiveContext 来实现对数据的加载、转换、处理等工作,并且实现了SQLcontext和HiveContext的所有功能。
在新版本中并不需要之前那么繁琐的创建很多对象,只需要创建一个SparkSession对象即可。SparkSession支持从不同的数据源加载数据,并把数据转换成DataFrame,并支持把DataFrame转换成SQLContext自身中的表。然后使用SQL语句来操作数据,也提供了HiveQL以及其他依赖于Hive的功能支持。
第二部分 Spark SQL实践
2.1 PySpark on Jupyter
在测试研发环境我们通常使用Jupyter
来研发和测试数据类Spark
任务。启动命令:
1 | /root/anaconda3/bin/python /root/anaconda3/bin/jupyter-notebook --ip=0.0.0.0 --no-browser --allow-root --notebook-dir=/opt/jupyter |
需要提前配置相关Python
环境变量:
1 | "SPARK_HOME": "/opt/spark-2.3.2/", |
启动Jupyter
后已经预定义好了两个对象:sc
和spark
,类型分别是:SparkContext
和SparkSession
。
1 | Spark context Web UI available at http://hadoop01:4040 |
用户在Jupyter
上编写代码时候,直接使用两个对象即可(注意:如果重新定义,Spark部分配置是无法覆盖的)。
2.2 Two Catalogs
Spark 2.0
版本后,Spark
有两个Catalogs
,分别是Hive
和in-memory
。如果需要使用Hive
需要配置Spark
配置(参数默认值为:in-memory
):
1 | spark.sql.catalogImplementation:hive |
可以在Spark UI
上查看任务配置,显示如下:
Spark
任务可以有如下功能:
- 读写
Hive
库的元数据库(读写Hive
表数据); - 使用
Hive
的UDF
函数; - 使用
Hive
的SerDe
序列化;
另外也可以基于内存实现Catalogs
,从而和Hive
解耦。配置如下:
1 | spark.sql.catalogImplementation:in-memory |
2.3 连接Hive
SparkSession
连接Hive Metastore
,需要指定配置文件hive-site.xml
(文件从hive
conf
目录中拷贝至Spark
conf
目录)。
也可以通过在创建SparkSession
对象是通过配置指定hive.metastore.uris
,例如:
1 | spark = SparkSession |
接下来我们就可以通过SparkSession
对Hive
进行操作。
1 | import spark.implicits._ |
}
2.4 PySpark
Catalog
API
PySpark
中目前支持的访问Catalog
的方法清单:
SPARK CATALOG API IN PYSPARK | DESCRIPTION |
---|---|
currentDatabase() | This API returns the current default database in this session |
setCurrentDatabase() | You can use this API to sets the current default database in this session. |
listDatabases() | Returns a list of databases available across all sessions |
listTables(dbName=None) | Returns a list of tables/views in the specified database. API uses current database if no database is provided. |
listFunctions(dbName=None) | Returns a list of functions registered in the specified database. API uses current database if no database is provided. |
listColumns(tableName, dbName=None) | Returns a list of columns for the given table/view in the specified database.API uses current database if no database is provided. |
createTable(tableName, path=None, source=None, schema=None, **options) | Creates a table based on the dataset in a data source and returns the DataFrame associated with the table. |
dropGlobalTempView(viewName) | Drops the global temporary view with the given view name in the catalog. If the view has been cached before, then it will also be uncached. Returns true if this view is dropped successfully, false otherwise. |
dropTempView(viewName) | Drops the local temporary view with the given view name in the catalog. If the view has been cached before, then it will also be uncached. Returns true if this view is dropped successfully, false otherwise. |
isCached (tableName) |
Returns true if the table is currently cached in-memory. |
recoverPartitions(tableName) | Recovers all the partitions of the given table and update the catalog. Only works with a partitioned table, and not a view. |
refreshByPath(path) | Invalidates and refreshes all the cached data for any DataFrame that contains the given data source path. |
refreshTable(tableName) | Invalidates and refreshes all the cached data and metadata of the given table. |
cacheTable(tableName) | Caches the specified table in-memory. |
isCached(tableName) | Returns true if the table is currently cached in-memory. |
uncacheTable(tableName) | Removes the specified table from the in-memory cache. |
clearCache() | Removes all cached tables from the in-memory cache. |
第三部分 Spark on Hive 和Hive on Spark
3.1 Hive on Spark
Hive
最开始底层使用MapReduce
作为计算引擎,通常称为:Hive on MapReduce
。为了提升效率出现了tez
内存计算引擎,即Hive on tez
。后续考虑到Spark
的优秀性能,Hive
项目也支持Spark
作为底层引擎。这就是Hive on Spark
。
在Hive CLI
中可以临时调整计算引擎:
1 | 配置mapreduce计算引擎 |
如果持久化配置,需要在配置文件hive-default.xml
中调整计算引擎,如下:
1 | <property> |
3.2 Spark on Hive
Spark
本身是一个计算引擎,并不负责数据的存储,计算过程需要从外部数据源读写数据。当然Hive
也是一种外部数据源,所以这种模式通常被称为:Spark on Hive
。
这时候用户可以通过Spark
的提供的java/scala/pyhon/r
等接口访问外部数据源。
3。spark + spark hive catalog。这是spark和hive结合的一种新形势,随着数据湖相关技术的进一步发展,这种模式现在在市场上受到了越来越多用户的青睐。其本质是,数据以orc/parquet/delta lake等格式存储在分布式文件系统如hdfs或对象存储系统如s3中,然后通过使用spark计算引擎提供的scala/java/python等api或spark 语法规范的sql来进行处理。由于在处理分析时针对的对象是table, 而table的底层对应的才是hdfs/s3上的文件/对象,所以我们需要维护这种table到文件/对象的映射关系,而spark自身就提供了 spark hive catalog来维护这种table到文件/对象的映射关系。注意这里的spark hive catalog,其本质是使用了hive 的 metasore 相关 api来读写表到文件/对象的映射关系(以及一起其他的元数据信息)到 metasore db如mysql, postgresql等数据库中。(由于spark编译时可以把hive metastore api等相关代码一并打包到spark的二进制安装包中,所以使用这种模式,我们并不需要额外单独安装hive)。
问题
对于shell中重新定义spark
1 | scala> val spark = SparkSession.builder().enableHiveSupport().getOrCreate() |
spark和hive版本兼容问题:
https://blog.csdn.net/z1941563559/article/details/120764519
如果Windows启动部署Spark,需要注意Java部署的路径中不能有括号等特殊字符。
参考文献及资料
1、Pyspark接口说明,链接:https://cwiki.apache.org/confluence/display/Hive/Hive+on+Spark%3A+Getting+Started