In production, the data warehouse team builds tables to meet business needs (detail tables, dimension/dim tables, topic tables, aggregation/aggr tables, and application/app tables), which land in a Hive-managed data warehouse. Spark is a large-scale distributed computing engine that loads data into memory for computation. Large, complex jobs are written in Hive SQL or Spark SQL: the former needs little memory but runs slowly, while the latter is memory-hungry but computes faster.
1 Configuring Spark
First, install a Java development environment.
# Install a Java development kit (macOS, Homebrew)
brew install microsoft-openjdk@17
Next, configure R to locate the Java installation; R packages that depend on Java, such as rJava, require this.
sudo R CMD javareconf
Install rJava from source so that it links against the system Java installation.
install.packages("rJava", type = "source")
install.packages(c("sparklyr", "apache.sedona"), type = "source")
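To confirm that rJava can find the newly configured JDK, a quick smoke test can help; this is a sketch that starts a JVM in the current R session and asks it for its Java version:

```r
# Start the JVM and query the Java version via rJava (sketch)
library(rJava)
.jinit()
.jcall("java/lang/System", "S", "getProperty", "java.version")
```

If this prints a 17.x version string, R is using the JDK installed above.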
library(sparklyr)
##
## Attaching package: 'sparklyr'
## The following object is masked from 'package:stats':
##
## filter
# Sedona is covered later
library(apache.sedona)
# Spark configuration
config <- spark_config()
config
## $spark.kryo.registrator
## [1] "org.apache.sedona.viz.core.Serde.SedonaVizKryoRegistrator"
##
## $spark.serializer
## [1] "org.apache.spark.serializer.KryoSerializer"
##
## $spark.env.SPARK_LOCAL_IP.local
## [1] "127.0.0.1"
##
## $sparklyr.connect.csv.embedded
## [1] "^1.*"
##
## $spark.sql.legacy.utcTimestampFunc.enabled
## [1] TRUE
##
## $sparklyr.connect.cores.local
## [1] 10
##
## $spark.sql.shuffle.partitions.local
## [1] 10
Choose the Spark build that matches your Hadoop version. The tarball is several hundred megabytes, so consider downloading it separately from the Spark website first, then extracting it into the local directory ~/spark/.
mkdir -p ~/spark
tar -xzf spark-3.5.8-bin-hadoop3.tgz -C ~/spark/
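Alternatively, sparklyr can download and install a matching Spark distribution for you; a sketch (the exact version string is an assumption):

```r
# Let sparklyr manage the Spark installation instead of a manual download
sparklyr::spark_install(version = "3.5")
# List the Spark versions installed locally
sparklyr::spark_installed_versions()
```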
2 Apache Spark
# Spark installation directory
Sys.setenv(SPARK_HOME = "~/spark/spark-3.5.8-bin-hadoop3")
# The first connection downloads some Apache Sedona related JAR packages
sc <- spark_connect(
master = "local", # locally installed Spark, see spark_install()
method = "shell" # corresponds to spark-submit
)
## ℹ Using Sedona jar version: org.apache.sedona:sedona-spark-shaded-3.5_2.12:1.9.0
# Copy a dataset into the Spark session
iris_tbl <- copy_to(sc, iris, overwrite = TRUE)
# List the tables in the connection
dplyr::src_tbls(sc)
## [1] "iris"
Once the iris dataset has been imported into Spark, it can be queried with Spark SQL.
library(DBI)
iris_preview <- dbGetQuery(sc, "SELECT * FROM iris LIMIT 10")
iris_preview
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species
## 1 5.1 3.5 1.4 0.2 setosa
## 2 4.9 3.0 1.4 0.2 setosa
## 3 4.7 3.2 1.3 0.2 setosa
## 4 4.6 3.1 1.5 0.2 setosa
## 5 5.0 3.6 1.4 0.2 setosa
## 6 5.4 3.9 1.7 0.4 setosa
## 7 4.6 3.4 1.4 0.3 setosa
## 8 5.0 3.4 1.5 0.2 setosa
## 9 4.4 2.9 1.4 0.2 setosa
## 10 4.9 3.1 1.5 0.1 setosa
In Spark, column names must not contain dots, because the dot has a special meaning in databases: in a table name a.b, a usually denotes a database and b a table within it. This is why the columns above appear as Sepal_Length rather than Sepal.Length.
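sparklyr's copy_to rewrites such names automatically, but you can also normalize column names yourself before importing; a minimal base R sketch:

```r
# Replace dots in column names with underscores before copying to Spark
iris2 <- iris
names(iris2) <- gsub(".", "_", names(iris2), fixed = TRUE)
names(iris2)
# [1] "Sepal_Length" "Sepal_Width"  "Petal_Length" "Petal_Width"  "Species"
```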
2.1 Grouped aggregation
Group by the Species variable to count the samples of each species and compute the mean Sepal_Length.
dbGetQuery(sc, "
SELECT count(1) as cnt, avg(Sepal_Length) as avg_Sepal_Length, Species
FROM iris GROUP BY Species")
## cnt avg_Sepal_Length Species
## 1 50 6.588 virginica
## 2 50 5.936 versicolor
## 3 50 5.006 setosa
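The same aggregation can also be written with dplyr verbs, which dbplyr translates into Spark SQL behind the scenes; a sketch using the iris_tbl created earlier:

```r
# Grouped count and mean; the pipeline is translated to Spark SQL
iris_tbl |>
  dplyr::group_by(Species) |>
  dplyr::summarise(cnt = dplyr::n(), avg_Sepal_Length = mean(Sepal_Length))
```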
Take the largest Sepal_Length within each group.
dbGetQuery(sc, "
SELECT max(Sepal_Length) as max_Sepal_Length, Species
FROM iris GROUP BY Species")
## max_Sepal_Length Species
## 1 7.9 virginica
## 2 7.0 versicolor
## 3 5.8 setosa
# equivalent in base R
aggregate(iris, Sepal.Length ~ Species, max)
## Species Sepal.Length
## 1 setosa 5.8
## 2 versicolor 7.0
## 3 virginica 7.9
Compute the median of a column per group.
dbGetQuery(sc, "
SELECT median(Sepal_Length) as med_Sepal_Length, Species
FROM iris GROUP BY Species")
## med_Sepal_Length Species
## 1 6.5 virginica
## 2 5.9 versicolor
## 3 5.0 setosa
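For comparison, the same grouped medians in base R, which match the Spark results above:

```r
# Grouped median with base R
aggregate(Sepal.Length ~ Species, data = iris, FUN = median)
#      Species Sepal.Length
# 1     setosa          5.0
# 2 versicolor          5.9
# 3  virginica          6.5
```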
Compute the distribution of a column.
dbGetQuery(sc, "
SELECT histogram_numeric(Sepal_Length, 5) as hn
FROM iris")
## hn
## 1 4.586, 14.000, 5.110, 40.000, 5.771, 38.000, 6.511, 47.000, 7.509, 11.000
The return value is a list of (x, y) pairs, where x is the center of a bar and y is its height (the frequency). Compare with base R's hist():
hist(iris$Sepal.Length, breaks = 6, plot = F)
## $breaks
## [1] 4.0 4.5 5.0 5.5 6.0 6.5 7.0 7.5 8.0
##
## $counts
## [1] 5 27 27 30 31 18 6 6
##
## $density
## [1] 0.06667 0.36000 0.36000 0.40000 0.41333 0.24000 0.08000 0.08000
##
## $mids
## [1] 4.25 4.75 5.25 5.75 6.25 6.75 7.25 7.75
##
## $xname
## [1] "iris$Sepal.Length"
##
## $equidist
## [1] TRUE
##
## attr(,"class")
## [1] "histogram"
Compute the distribution of a column per group.
dbGetQuery(sc, "
SELECT histogram_numeric(Sepal_Length, 5) as hn, Species
FROM iris GROUP BY Species")
## hn
## 1 4.900, 1.000, 5.880, 10.000, 6.457, 23.000, 7.090, 10.000, 7.717, 6.000
## 2 5.040, 5.000, 5.588, 16.000, 5.969, 13.000, 6.380, 10.000, 6.800, 6.000
## 3 4.400, 5.000, 4.689, 9.000, 5.000, 22.000, 5.355, 11.000, 5.733, 3.000
## Species
## 1 virginica
## 2 versicolor
## 3 setosa
aggregate(iris, Sepal.Length ~ Species, FUN = function(x) hist(x, breaks = 5, plot = F)$mids)
## Species Sepal.Length
## 1 setosa 4.25, 4.75, 5.25, 5.75
## 2 versicolor 4.75, 5.25, 5.75, 6.25, 6.75
## 3 virginica 4.75, 5.25, 5.75, 6.25, 6.75, 7.25, 7.75
aggregate(iris, Sepal.Length ~ Species, FUN = function(x) hist(x, breaks = 5, plot = F)$counts)
## Species Sepal.Length
## 1 setosa 5, 23, 19, 3
## 2 versicolor 3, 8, 19, 12, 8
## 3 virginica 1, 0, 8, 19, 10, 6, 6
The binning algorithms differ (histogram_numeric chooses adaptive bin centers, while hist() uses equal-width breaks), so the results differ.
2.2 Window functions
Window functions are designed for complex queries; RANK() and row_number() serve as examples below.
Goal: find the 5 samples with the longest Sepal.Length (sepal length) within each species.
- Sort the samples by Sepal.Length (sepal length).
dbGetQuery(sc, "
SELECT *, RANK() OVER (PARTITION BY Species ORDER BY Sepal_Length DESC) AS rank
FROM iris
LIMIT 5
")
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species rank
## 1 5.8 4.0 1.2 0.2 setosa 1
## 2 5.7 4.4 1.5 0.4 setosa 2
## 3 5.7 3.8 1.7 0.3 setosa 2
## 4 5.5 4.2 1.4 0.2 setosa 4
## 5 5.5 3.5 1.3 0.2 setosa 4
dbGetQuery(sc, "
SELECT *, row_number() OVER (PARTITION BY Species ORDER BY Sepal_Length DESC) AS rn
FROM iris
LIMIT 5
")
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species rn
## 1 5.8 4.0 1.2 0.2 setosa 1
## 2 5.7 4.4 1.5 0.4 setosa 2
## 3 5.7 3.8 1.7 0.3 setosa 3
## 4 5.5 4.2 1.4 0.2 setosa 4
## 5 5.5 3.5 1.3 0.2 setosa 5
RANK() and row_number() are both window functions. The former sorts a column and assigns tied values the same rank, skipping the ranks that follow; the latter assigns a unique sequence number after sorting. Both append a new column recording the ordering to the original data.
- Filter on the ranking.
# RANK()
dbGetQuery(sc, "
SELECT * FROM
(
SELECT *, RANK() OVER (PARTITION BY Species ORDER BY Sepal_Length DESC) AS rank
FROM iris
) a
WHERE a.rank < 6
")
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species rank
## 1 5.8 4.0 1.2 0.2 setosa 1
## 2 5.7 4.4 1.5 0.4 setosa 2
## 3 5.7 3.8 1.7 0.3 setosa 2
## 4 5.5 4.2 1.4 0.2 setosa 4
## 5 5.5 3.5 1.3 0.2 setosa 4
## 6 7.0 3.2 4.7 1.4 versicolor 1
## 7 6.9 3.1 4.9 1.5 versicolor 2
## 8 6.8 2.8 4.8 1.4 versicolor 3
## 9 6.7 3.1 4.4 1.4 versicolor 4
## 10 6.7 3.0 5.0 1.7 versicolor 4
## 11 6.7 3.1 4.7 1.5 versicolor 4
## 12 7.9 3.8 6.4 2.0 virginica 1
## 13 7.7 3.8 6.7 2.2 virginica 2
## 14 7.7 2.6 6.9 2.3 virginica 2
## 15 7.7 2.8 6.7 2.0 virginica 2
## 16 7.7 3.0 6.1 2.3 virginica 2
# row_number()
dbGetQuery(sc, "
SELECT * FROM
(
SELECT *, row_number() OVER (PARTITION BY Species ORDER BY Sepal_Length DESC) AS rn
FROM iris
) a
WHERE a.rn < 6
")
## Sepal_Length Sepal_Width Petal_Length Petal_Width Species rn
## 1 5.8 4.0 1.2 0.2 setosa 1
## 2 5.7 4.4 1.5 0.4 setosa 2
## 3 5.7 3.8 1.7 0.3 setosa 3
## 4 5.5 4.2 1.4 0.2 setosa 4
## 5 5.5 3.5 1.3 0.2 setosa 5
## 6 7.0 3.2 4.7 1.4 versicolor 1
## 7 6.9 3.1 4.9 1.5 versicolor 2
## 8 6.8 2.8 4.8 1.4 versicolor 3
## 9 6.7 3.1 4.4 1.4 versicolor 4
## 10 6.7 3.0 5.0 1.7 versicolor 5
## 11 7.9 3.8 6.4 2.0 virginica 1
## 12 7.7 3.8 6.7 2.2 virginica 2
## 13 7.7 2.6 6.9 2.3 virginica 3
## 14 7.7 2.8 6.7 2.0 virginica 4
## 15 7.7 3.0 6.1 2.3 virginica 5
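The same top-5-per-group query can be expressed with dplyr: min_rank() translates to RANK() and row_number() to ROW_NUMBER() in the generated Spark SQL. A sketch using the iris_tbl from earlier:

```r
# Top 5 Sepal_Length per species, RANK() semantics
iris_tbl |>
  dplyr::group_by(Species) |>
  dplyr::mutate(rank = dplyr::min_rank(dplyr::desc(Sepal_Length))) |>
  dplyr::filter(rank < 6)
```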
3 Apache Sedona
Apache Sedona is a cluster computing system for processing large-scale spatial data. Standing on the shoulders of Apache Spark and Apache Flink, it provides an out-of-the-box set of distributed spatial operations and queries for loading, processing, and analyzing large spatial datasets efficiently.
# Sedona 1.8.1 supports Spark 3.x and JDK 17
library(apache.sedona)
The example and data below come from the geospark package. Only its datasets are used here; the geospark package itself has been superseded by apache.sedona.
remotes::install_github("harryprince/geospark")
# spatial polygon data
polygons <- read.table(
system.file(package = "geospark", "examples/polygons.txt"),
sep = "|",
col.names = c("area", "geom")
)
# spatial point data
points <- read.table(
system.file(package = "geospark", "examples/points.txt"),
sep = "|",
col.names = c("city", "state", "geom")
)
# import the datasets into Spark
polygons_wkt <- copy_to(sc, polygons)
points_wkt <- copy_to(sc, points)
Check the classes of the polygons_wkt and points_wkt datasets.
class(polygons_wkt)
## [1] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
class(points_wkt)
## [1] "tbl_spark" "tbl_sql" "tbl_lazy" "tbl"
Inspect the datasets.
print(polygons_wkt)
## # Source: table<`polygons`> [?? x 2]
## # Database: spark_connection
## area geom
## <chr> <chr>
## 1 california area POLYGON ((-126.4746 32.99024, -126.4746 42.55308, -115.4004 4…
## 2 new york area POLYGON ((-80.50781 36.24427, -80.50781 41.96766, -70.75195 4…
## 3 texas area POLYGON ((-106.5234 25.40358, -106.5234 36.66842, -91.14258 3…
## 4 dakota area POLYGON ((-106.084 44.21371, -106.084 49.66763, -95.71289 49.…
print(points_wkt)
## # Source: table<`points`> [?? x 3]
## # Database: spark_connection
## city state geom
## <chr> <chr> <chr>
## 1 New York NY POINT (-73.97759 40.74618)
## 2 New York NY POINT (-73.97231 40.75216)
## 3 New York NY POINT (-73.99337 40.7551)
## 4 West Nyack NY POINT (-74.06083 41.16094)
## 5 West Point NY POINT (-73.9788 41.37611)
## 6 West Point NY POINT (-74.3547 41.38782)
## 7 Westtown NY POINT (-74.54593 41.33403)
## 8 Floral Park NY POINT (-73.70475 40.7232)
## 9 Floral Park NY POINT (-73.60177 40.75476)
## 10 Elmira NY POINT (-76.79217 42.09192)
## # ℹ more rows
SedonaSQL supports spatial SQL queries and spatial operations such as ST_GeomFromWKT / ST_Contains; see the SedonaSQL documentation for details.
# count the points of interest inside each polygon, grouped by area and state
DBI::dbGetQuery(sc, "
SELECT area, state, count(*) cnt FROM
(SELECT area, ST_GeomFromWKT(polygons.geom) as y FROM polygons) polygons
INNER JOIN
(SELECT ST_GeomFromWKT(points.geom) as x, state, city FROM points) points
WHERE ST_Contains(polygons.y, points.x) GROUP BY area, state")
## area state cnt
## 1 dakota area SD 1
## 2 new york area NY 9
## 3 california area CA 10
## 4 texas area TX 10
## 5 dakota area ND 10
The advantage of dplyr is that it offers a grammar of data manipulation that translates R code into SQL, which helps R users transition to SQL. Still, I recommend writing SQL directly: develop and test on a small dataset, then deploy to production. Compared with R code, SQL is more widely used and easier to share, and during debugging there is no need to worry about translation errors in the dplyr layer, which lowers the cost of troubleshooting.
# ST_GeomFromWKT may also be written st_geomfromwkt
polygons_wkt <- dplyr::mutate(polygons_wkt, y = ST_GeomFromWKT(geom))
points_wkt <- dplyr::mutate(points_wkt, x = ST_GeomFromWKT(geom))
# count the spatial points contained in each polygon, grouped by area and state
sc_res <- dplyr::inner_join(
polygons_wkt, points_wkt,
sql_on = dplyr::sql("ST_Contains(y,x)")
) |>
dplyr::group_by(area, state) |>
dplyr::summarise(cnt = n())
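The pipeline above is only a lazy query at this point; before anything runs, you can inspect the SQL that dbplyr generates:

```r
# Render the translated SQL without executing the query
dplyr::show_query(sc_res)
```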
View the result. Incidentally, dplyr evaluates lazily: the actual query runs only when the following command executes.
dplyr::collect(sc_res)
## `summarise()` has grouped output by "area". You can override using the
## `.groups` argument.
## # A tibble: 5 × 3
## area state cnt
## <chr> <chr> <dbl>
## 1 dakota area SD 1
## 2 new york area NY 9
## 3 california area CA 10
## 4 texas area TX 10
## 5 dakota area ND 10
The gglite package can visualize the query result; it suits conventional statistical graphics.
library(gglite)
# bar chart of the result
dplyr::collect(sc_res) |>
g2(cnt ~ area) |>
mark_interval()
After large-scale spatial operations and queries in Sedona, convert the result to a simple feature object and manipulate it with the sf package; R then handles further exploration, analysis, modeling, visualization, and reporting.
# convert the query result to an sf object
tb_sf <- merge(
x = dplyr::collect(sc_res),
y = polygons, by = "area", all.y = TRUE,
sort = FALSE # do not sort the result
) |>
sf::st_as_sf(wkt = "geom")
tb_sf
## Simple feature collection with 5 features and 3 fields
## Geometry type: POLYGON
## Dimension: XY
## Bounding box: xmin: -126.5 ymin: 25.4 xmax: -70.75 ymax: 49.67
## CRS: NA
## area state cnt geom
## 1 dakota area SD 1 POLYGON ((-106.1 44.21, -10...
## 2 dakota area ND 10 POLYGON ((-106.1 44.21, -10...
## 3 new york area NY 9 POLYGON ((-80.51 36.24, -80...
## 4 california area CA 10 POLYGON ((-126.5 32.99, -12...
## 5 texas area TX 10 POLYGON ((-106.5 25.4, -106...
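Since tb_sf is now an ordinary sf object, base plotting works on it directly; a minimal sketch:

```r
# Choropleth-style plot of the polygons, colored by point count
plot(tb_sf["cnt"])
```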
Finally, when Spark is no longer needed, close and clean up the connection.
# close the connection
spark_disconnect(sc)
This document was run in the following environment:
sessionInfo()
## R version 4.6.0 (2026-04-24)
## Platform: aarch64-apple-darwin23
## Running under: macOS Tahoe 26.4.1
##
## Matrix products: default
## BLAS: /Library/Frameworks/R.framework/Versions/4.6/Resources/lib/libRblas.0.dylib
## LAPACK: /Library/Frameworks/R.framework/Versions/4.6/Resources/lib/libRlapack.dylib; LAPACK version 3.12.1
##
## locale:
## [1] en_US.UTF-8/en_US.UTF-8/en_US.UTF-8/C/en_US.UTF-8/en_US.UTF-8
##
## time zone: Asia/Shanghai
## tzcode source: internal
##
## attached base packages:
## [1] stats graphics grDevices utils datasets methods base
##
## other attached packages:
## [1] gglite_0.0.31 DBI_1.3.0 apache.sedona_1.9.0
## [4] sparklyr_1.9.4
##
## loaded via a namespace (and not attached):
## [1] config_0.3.2 sass_0.4.10 utf8_1.2.6 generics_0.1.4
## [5] tidyr_1.3.2 class_7.3-23 KernSmooth_2.23-26 blogdown_1.23
## [9] digest_0.6.39 magrittr_2.0.5 evaluate_1.0.5 grid_4.6.0
## [13] bookdown_0.46 fastmap_1.2.0 blob_1.3.0 jsonlite_2.0.0
## [17] e1071_1.7-17 httr_1.4.8 purrr_1.2.2 jquerylib_0.1.4
## [21] cli_3.6.6 rlang_1.2.0 units_1.0-1 dbplyr_2.5.2
## [25] withr_3.0.2 cachem_1.1.0 yaml_2.3.12 otel_0.2.0
## [29] tools_4.6.0 parallel_4.6.0 dplyr_1.2.1 vctrs_0.7.3
## [33] R6_2.6.1 proxy_0.4-29 lifecycle_1.0.5 classInt_0.4-11
## [37] pkgconfig_2.0.3 pillar_1.11.1 bslib_0.10.0 Rcpp_1.1.1-1.1
## [41] glue_1.8.1 sf_1.1-0 xfun_0.57 tibble_3.3.1
## [45] tidyselect_1.2.1 rstudioapi_0.18.0 knitr_1.51 htmltools_0.5.9
## [49] rmarkdown_2.31 compiler_4.6.0 askpass_1.2.1 openssl_2.4.0