预计阅读

R 语言中使用 Spark





在生产环境中,数仓团队根据业务需求,生产一张张表(明细表 detail、维度表 dim、主题表 topic、聚合表 aggr、应用表 app),落在 Hive 管理的数据仓库中。Spark 是大规模分布式计算引擎,将数据加载到内存中进行计算。在大规模复杂的计算中,会用到 Hive SQL 或 Spark SQL,前者内存资源需求少但计算慢,后者吃内存资源但计算更快。

首先,安装配置 Java 开发环境

# 安装 Java 开发环境
brew install oracle-jdk@21

接着,配置 R 软件发现 Java 开发环境,一些依赖 Java 的 R 包需要,比如 rJava 包。

sudo R CMD javareconf

从源码安装 rJava 包,这样便可以利用系统安装的 Java 开发环境。

install.packages("rJava", type = "source")
install.packages("sparklyr", type = "source")
library(sparklyr)
## 
## Attaching package: 'sparklyr'
## The following object is masked from 'package:stats':
## 
##     filter
# Spark 配置
config = spark_config()
config
## $spark.env.SPARK_LOCAL_IP.local
## [1] "127.0.0.1"
## 
## $sparklyr.connect.csv.embedded
## [1] "^1.*"
## 
## $spark.sql.legacy.utcTimestampFunc.enabled
## [1] TRUE
## 
## $sparklyr.connect.cores.local
## [1] 10
## 
## $spark.sql.shuffle.partitions.local
## [1] 10

选择与 Spark 配套的 Hadoop 的版本,这个安装包几百兆,可以考虑先在 Spark 官网单独下载,再解压到本地目录 ~/spark/ 下面。

mkdir -p ~/spark
tar -xzf spark-4.1.1-bin-hadoop3.tgz -C ~/spark/
# Spark 安装目录
Sys.setenv(SPARK_HOME="~/spark/spark-4.1.1-bin-hadoop3")
sc <- spark_connect(
  master = "local", # spark_install() 本地安装的 Spark
  method = "shell"  # 对应 spark-submit
)
# 拷贝一个数据集到 Spark 环境中
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
# 查看已连接的表
dplyr::src_tbls(sc)
## [1] "iris"

鸢尾花数据集 iris 导入 Spark 后,可以用 Spark SQL 查询数据了。

library(DBI)
iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")
iris_preview
##    Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1           5.1         3.5          1.4         0.2  setosa
## 2           4.9         3.0          1.4         0.2  setosa
## 3           4.7         3.2          1.3         0.2  setosa
## 4           4.6         3.1          1.5         0.2  setosa
## 5           5.0         3.6          1.4         0.2  setosa
## 6           5.4         3.9          1.7         0.4  setosa
## 7           4.6         3.4          1.4         0.3  setosa
## 8           5.0         3.4          1.5         0.2  setosa
## 9           4.4         2.9          1.4         0.2  setosa
## 10          4.9         3.1          1.5         0.1  setosa

在 Spark 环境中,列名不能含有点号,因为点号在数据库中有特殊的含义。a.b a 常表示数据库,b 表示该数据库下的表。

1 分组聚合统计

按 Species 变量分组统计各类鸢尾花的数量,以及 Sepal_Length 的平均值。

dbGetQuery(sc, "SELECT count(1) as cnt, avg(Sepal_Length) as avg_Sepal_Length, Species FROM iris GROUP BY Species")
##   cnt avg_Sepal_Length    Species
## 1  50            6.588  virginica
## 2  50            5.936 versicolor
## 3  50            5.006     setosa

分组取 Sepal_Length最长的样本。

dbGetQuery(sc, "
           SELECT max(Sepal_Length) as max_Sepal_Length, Species 
           FROM iris GROUP BY Species")
##   max_Sepal_Length    Species
## 1              7.9  virginica
## 2              7.0 versicolor
## 3              5.8     setosa
# 等价于
aggregate(iris, Sepal.Length ~ Species, max)
##      Species Sepal.Length
## 1     setosa          5.8
## 2 versicolor          7.0
## 3  virginica          7.9

分组计算某列的中位数。

dbGetQuery(sc, "
           SELECT median(Sepal_Length) as med_Sepal_Length, Species 
           FROM iris GROUP BY Species")
##   med_Sepal_Length    Species
## 1              6.5  virginica
## 2              5.9 versicolor
## 3              5.0     setosa

计算某列的分布。

dbGetQuery(sc, "
           SELECT histogram_numeric(Sepal_Length, 5) as hn
           FROM iris")
##                                                                          hn
## 1 4.586, 14.000, 5.110, 40.000, 5.771, 38.000, 6.511, 47.000, 7.509, 11.000

返回值是(x,y) x 表示柱子的中心位置,y 表示柱子的高度(频数)

hist(iris$Sepal.Length, breaks = 6, plot = F)
## $breaks
## [1] 4.0 4.5 5.0 5.5 6.0 6.5 7.0 7.5 8.0
## 
## $counts
## [1]  5 27 27 30 31 18  6  6
## 
## $density
## [1] 0.06667 0.36000 0.36000 0.40000 0.41333 0.24000 0.08000 0.08000
## 
## $mids
## [1] 4.25 4.75 5.25 5.75 6.25 6.75 7.25 7.75
## 
## $xname
## [1] "iris$Sepal.Length"
## 
## $equidist
## [1] TRUE
## 
## attr(,"class")
## [1] "histogram"

分组计算某列的分布。

dbGetQuery(sc, "
           SELECT histogram_numeric(Sepal_Length, 5) as hn, Species 
           FROM iris GROUP BY Species")
##                                                                        hn
## 1 4.900, 1.000, 5.880, 10.000, 6.457, 23.000, 7.090, 10.000, 7.717, 6.000
## 2 5.040, 5.000, 5.588, 16.000, 5.969, 13.000, 6.380, 10.000, 6.800, 6.000
## 3  4.400, 5.000, 4.689, 9.000, 5.000, 22.000, 5.355, 11.000, 5.733, 3.000
##      Species
## 1  virginica
## 2 versicolor
## 3     setosa
aggregate(iris, Sepal.Length ~ Species, FUN = function(x) hist(x, breaks = 5, plot = F)$mids )
##      Species                             Sepal.Length
## 1     setosa                   4.25, 4.75, 5.25, 5.75
## 2 versicolor             4.75, 5.25, 5.75, 6.25, 6.75
## 3  virginica 4.75, 5.25, 5.75, 6.25, 6.75, 7.25, 7.75
aggregate(iris, Sepal.Length ~ Species, FUN = function(x) hist(x, breaks = 5, plot = F)$counts )
##      Species          Sepal.Length
## 1     setosa          5, 23, 19, 3
## 2 versicolor       3, 8, 19, 12, 8
## 3  virginica 1, 0, 8, 19, 10, 6, 6

分箱的算法不同,结果也不同。

2 窗口函数

窗口函数是为了应对复杂查询而设计的,下面以 RANK()row_number() 为例。

目标:每类鸢尾花中 Sepal.Length (萼片长度)最长的 5 个样本。

  1. 按 Sepal.Length (萼片长度)对样本排序。
dbGetQuery(sc, "
SELECT *, RANK() OVER (PARTITION BY Species ORDER BY Sepal_Length DESC) AS rank
FROM iris
LIMIT 5
")
##   Sepal_Length Sepal_Width Petal_Length Petal_Width Species rank
## 1          5.8         4.0          1.2         0.2  setosa    1
## 2          5.7         4.4          1.5         0.4  setosa    2
## 3          5.7         3.8          1.7         0.3  setosa    2
## 4          5.5         4.2          1.4         0.2  setosa    4
## 5          5.5         3.5          1.3         0.2  setosa    4
dbGetQuery(sc, "
SELECT *, row_number() OVER (PARTITION BY Species ORDER BY Sepal_Length DESC) AS rn
FROM iris
LIMIT 5
")
##   Sepal_Length Sepal_Width Petal_Length Petal_Width Species rn
## 1          5.8         4.0          1.2         0.2  setosa  1
## 2          5.7         4.4          1.5         0.4  setosa  2
## 3          5.7         3.8          1.7         0.3  setosa  3
## 4          5.5         4.2          1.4         0.2  setosa  4
## 5          5.5         3.5          1.3         0.2  setosa  5

RANK()row_number() 都是窗口函数,前者对某列排序,值相同的给相同的序号,后者排完序后,序号是唯一的。都是在原数据上新增一列记录排序结果。

  1. 筛选排序结果。
dbGetQuery(sc, "
SELECT * FROM
(
SELECT *, RANK() OVER (PARTITION BY Species ORDER BY Sepal_Length DESC) AS rank
FROM iris
) a
WHERE a.rank < 6
")
##    Sepal_Length Sepal_Width Petal_Length Petal_Width    Species rank
## 1           5.8         4.0          1.2         0.2     setosa    1
## 2           5.7         4.4          1.5         0.4     setosa    2
## 3           5.7         3.8          1.7         0.3     setosa    2
## 4           5.5         4.2          1.4         0.2     setosa    4
## 5           5.5         3.5          1.3         0.2     setosa    4
## 6           7.0         3.2          4.7         1.4 versicolor    1
## 7           6.9         3.1          4.9         1.5 versicolor    2
## 8           6.8         2.8          4.8         1.4 versicolor    3
## 9           6.7         3.1          4.4         1.4 versicolor    4
## 10          6.7         3.0          5.0         1.7 versicolor    4
## 11          6.7         3.1          4.7         1.5 versicolor    4
## 12          7.9         3.8          6.4         2.0  virginica    1
## 13          7.7         3.8          6.7         2.2  virginica    2
## 14          7.7         2.6          6.9         2.3  virginica    2
## 15          7.7         2.8          6.7         2.0  virginica    2
## 16          7.7         3.0          6.1         2.3  virginica    2
dbGetQuery(sc, "
SELECT * FROM
(
SELECT *, row_number() OVER (PARTITION BY Species ORDER BY Sepal_Length DESC) AS rn
FROM iris
) a
WHERE a.rn < 6
")
##    Sepal_Length Sepal_Width Petal_Length Petal_Width    Species rn
## 1           5.8         4.0          1.2         0.2     setosa  1
## 2           5.7         4.4          1.5         0.4     setosa  2
## 3           5.7         3.8          1.7         0.3     setosa  3
## 4           5.5         4.2          1.4         0.2     setosa  4
## 5           5.5         3.5          1.3         0.2     setosa  5
## 6           7.0         3.2          4.7         1.4 versicolor  1
## 7           6.9         3.1          4.9         1.5 versicolor  2
## 8           6.8         2.8          4.8         1.4 versicolor  3
## 9           6.7         3.1          4.4         1.4 versicolor  4
## 10          6.7         3.0          5.0         1.7 versicolor  5
## 11          7.9         3.8          6.4         2.0  virginica  1
## 12          7.7         3.8          6.7         2.2  virginica  2
## 13          7.7         2.6          6.9         2.3  virginica  3
## 14          7.7         2.8          6.7         2.0  virginica  4
## 15          7.7         3.0          6.1         2.3  virginica  5

最后,当不需要使用 Spark 的时候,要关闭清理连接。

spark_disconnect(sc)