在生产环境中,数仓团队根据业务需求,生产一张张表(明细表 detail、维度表 dim、主题表 topic、聚合表 aggr、应用表 app),落在 Hive 管理的数据仓库中。Spark 是大规模分布式计算引擎,将数据加载到内存中进行计算。在大规模复杂的计算中,会用到 Hive SQL 或 Spark SQL,前者内存资源需求少但计算慢,后者吃内存资源但计算更快。
首先,安装配置 Java 开发环境
# 安装 Java 开发环境
brew install oracle-jdk@21
接着,配置 R 软件发现 Java 开发环境,一些依赖 Java 的 R 包需要,比如 rJava 包。
sudo R CMD javareconf
从源码安装 rJava 包,这样便可以利用系统安装的 Java 开发环境。
install.packages("rJava", type = "source")
install.packages("sparklyr", type = "source")
library(sparklyr)
##
## Attaching package: 'sparklyr'
## The following object is masked from 'package:stats':
##
## filter
# Spark 配置
config = spark_config()
config
## $spark.env.SPARK_LOCAL_IP.local
## [1] "127.0.0.1"
##
## $sparklyr.connect.csv.embedded
## [1] "^1.*"
##
## $spark.sql.legacy.utcTimestampFunc.enabled
## [1] TRUE
##
## $sparklyr.connect.cores.local
## [1] 10
##
## $spark.sql.shuffle.partitions.local
## [1] 10
选择与 Spark 配套的 Hadoop 的版本,这个安装包几百兆,可以考虑先在 Spark 官网单独下载,再解压到本地目录 ~/spark/ 下面。
mkdir -p ~/spark
tar -xzf spark-4.1.1-bin-hadoop3.tgz -C ~/spark/
# Spark 安装目录
Sys.setenv(SPARK_HOME="~/spark/spark-4.1.1-bin-hadoop3")
sc <- spark_connect(
master = "local", # spark_install() 本地安装的 Spark
method = "shell" # 对应 spark-submit
)
# 拷贝一个数据集到 Spark 环境中
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
# 查看已连接的表
dplyr::src_tbls(sc)
## [1] "iris"
鸢尾花数据集 iris 导入 Spark 后,可以用 Spark SQL 查询数据了。
library(DBI)
iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")
iris_preview
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1 5.1 3.5 1.4 0.2 setosa
## 2 4.9 3.0 1.4 0.2 setosa
## 3 4.7 3.2 1.3 0.2 setosa
## 4 4.6 3.1 1.5 0.2 setosa
## 5 5.0 3.6 1.4 0.2 setosa
## 6 5.4 3.9 1.7 0.4 setosa
## 7 4.6 3.4 1.4 0.3 setosa
## 8 5.0 3.4 1.5 0.2 setosa
## 9 4.4 2.9 1.4 0.2 setosa
## 10 4.9 3.1 1.5 0.1 setosa
在 Spark 环境中,列名不能含有点号,因为点号在数据库中有特殊的含义。a.b a 常表示数据库,b 表示该数据库下的表。
1 分组聚合统计
按 Species 变量分组统计各类鸢尾花的数量,以及 Sepal_Length 的平均值。
dbGetQuery(sc, "SELECT count(1) as cnt, avg(Sepal_Length) as avg_Sepal_Length, Species FROM iris GROUP BY Species")
## cnt avg_Sepal_Length Species
## 1 50 6.588 virginica
## 2 50 5.936 versicolor
## 3 50 5.006 setosa
分组取 Sepal_Length最长的样本。
dbGetQuery(sc, "
SELECT max(Sepal_Length) as max_Sepal_Length, Species
FROM iris GROUP BY Species")
## max_Sepal_Length Species
## 1 7.9 virginica
## 2 7.0 versicolor
## 3 5.8 setosa
# 等价于
aggregate(iris, Sepal.Length ~ Species, max)
## Species Sepal.Length
## 1 setosa 5.8
## 2 versicolor 7.0
## 3 virginica 7.9
分组计算某列的中位数。
dbGetQuery(sc, "
SELECT median(Sepal_Length) as med_Sepal_Length, Species
FROM iris GROUP BY Species")
## med_Sepal_Length Species
## 1 6.5 virginica
## 2 5.9 versicolor
## 3 5.0 setosa
计算某列的分布。
dbGetQuery(sc, "
SELECT histogram_numeric(Sepal_Length, 5) as hn
FROM iris")
## hn
## 1 4.586, 14.000, 5.110, 40.000, 5.771, 38.000, 6.511, 47.000, 7.509, 11.000
返回值是(x,y) x 表示柱子的中心位置,y 表示柱子的高度(频数)
hist(iris$Sepal.Length, breaks = 6, plot = F)
## $breaks
## [1] 4.0 4.5 5.0 5.5 6.0 6.5 7.0 7.5 8.0
##
## $counts
## [1] 5 27 27 30 31 18 6 6
##
## $density
## [1] 0.06667 0.36000 0.36000 0.40000 0.41333 0.24000 0.08000 0.08000
##
## $mids
## [1] 4.25 4.75 5.25 5.75 6.25 6.75 7.25 7.75
##
## $xname
## [1] "iris$Sepal.Length"
##
## $equidist
## [1] TRUE
##
## attr(,"class")
## [1] "histogram"
分组计算某列的分布。
dbGetQuery(sc, "
SELECT histogram_numeric(Sepal_Length, 5) as hn, Species
FROM iris GROUP BY Species")
## hn
## 1 4.900, 1.000, 5.880, 10.000, 6.457, 23.000, 7.090, 10.000, 7.717, 6.000
## 2 5.040, 5.000, 5.588, 16.000, 5.969, 13.000, 6.380, 10.000, 6.800, 6.000
## 3 4.400, 5.000, 4.689, 9.000, 5.000, 22.000, 5.355, 11.000, 5.733, 3.000
## Species
## 1 virginica
## 2 versicolor
## 3 setosa
aggregate(iris, Sepal.Length ~ Species, FUN = function(x) hist(x, breaks = 5, plot = F)$mids )
## Species Sepal.Length
## 1 setosa 4.25, 4.75, 5.25, 5.75
## 2 versicolor 4.75, 5.25, 5.75, 6.25, 6.75
## 3 virginica 4.75, 5.25, 5.75, 6.25, 6.75, 7.25, 7.75
aggregate(iris, Sepal.Length ~ Species, FUN = function(x) hist(x, breaks = 5, plot = F)$counts )
## Species Sepal.Length
## 1 setosa 5, 23, 19, 3
## 2 versicolor 3, 8, 19, 12, 8
## 3 virginica 1, 0, 8, 19, 10, 6, 6
分箱的算法不同,结果也不同。
2 窗口函数
窗口函数是为了应对复杂查询而设计的,下面以 RANK() 和 row_number() 为例。
目标:每类鸢尾花中 Sepal.Length (萼片长度)最长的 5 个样本。
- 按 Sepal.Length (萼片长度)对样本排序。
dbGetQuery(sc, "
SELECT *, RANK() OVER (PARTITION BY Species ORDER BY Sepal_Length DESC) AS rank
FROM iris
LIMIT 5
")
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species rank
## 1 5.8 4.0 1.2 0.2 setosa 1
## 2 5.7 4.4 1.5 0.4 setosa 2
## 3 5.7 3.8 1.7 0.3 setosa 2
## 4 5.5 4.2 1.4 0.2 setosa 4
## 5 5.5 3.5 1.3 0.2 setosa 4
dbGetQuery(sc, "
SELECT *, row_number() OVER (PARTITION BY Species ORDER BY Sepal_Length DESC) AS rn
FROM iris
LIMIT 5
")
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species rn
## 1 5.8 4.0 1.2 0.2 setosa 1
## 2 5.7 4.4 1.5 0.4 setosa 2
## 3 5.7 3.8 1.7 0.3 setosa 3
## 4 5.5 4.2 1.4 0.2 setosa 4
## 5 5.5 3.5 1.3 0.2 setosa 5
RANK() 和 row_number() 都是窗口函数,前者对某列排序,值相同的给相同的序号,后者排完序后,序号是唯一的。都是在原数据上新增一列记录排序结果。
- 筛选排序结果。
dbGetQuery(sc, "
SELECT * FROM
(
SELECT *, RANK() OVER (PARTITION BY Species ORDER BY Sepal_Length DESC) AS rank
FROM iris
) a
WHERE a.rank < 6
")
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species rank
## 1 5.8 4.0 1.2 0.2 setosa 1
## 2 5.7 4.4 1.5 0.4 setosa 2
## 3 5.7 3.8 1.7 0.3 setosa 2
## 4 5.5 4.2 1.4 0.2 setosa 4
## 5 5.5 3.5 1.3 0.2 setosa 4
## 6 7.0 3.2 4.7 1.4 versicolor 1
## 7 6.9 3.1 4.9 1.5 versicolor 2
## 8 6.8 2.8 4.8 1.4 versicolor 3
## 9 6.7 3.1 4.4 1.4 versicolor 4
## 10 6.7 3.0 5.0 1.7 versicolor 4
## 11 6.7 3.1 4.7 1.5 versicolor 4
## 12 7.9 3.8 6.4 2.0 virginica 1
## 13 7.7 3.8 6.7 2.2 virginica 2
## 14 7.7 2.6 6.9 2.3 virginica 2
## 15 7.7 2.8 6.7 2.0 virginica 2
## 16 7.7 3.0 6.1 2.3 virginica 2
dbGetQuery(sc, "
SELECT * FROM
(
SELECT *, row_number() OVER (PARTITION BY Species ORDER BY Sepal_Length DESC) AS rn
FROM iris
) a
WHERE a.rn < 6
")
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species rn
## 1 5.8 4.0 1.2 0.2 setosa 1
## 2 5.7 4.4 1.5 0.4 setosa 2
## 3 5.7 3.8 1.7 0.3 setosa 3
## 4 5.5 4.2 1.4 0.2 setosa 4
## 5 5.5 3.5 1.3 0.2 setosa 5
## 6 7.0 3.2 4.7 1.4 versicolor 1
## 7 6.9 3.1 4.9 1.5 versicolor 2
## 8 6.8 2.8 4.8 1.4 versicolor 3
## 9 6.7 3.1 4.4 1.4 versicolor 4
## 10 6.7 3.0 5.0 1.7 versicolor 5
## 11 7.9 3.8 6.4 2.0 virginica 1
## 12 7.7 3.8 6.7 2.2 virginica 2
## 13 7.7 2.6 6.9 2.3 virginica 3
## 14 7.7 2.8 6.7 2.0 virginica 4
## 15 7.7 3.0 6.1 2.3 virginica 5
最后,当不需要使用 Spark 的时候,要关闭清理连接。
spark_disconnect(sc)