[27]:
# First, create a Spark Session (reuses an existing one if already running).
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark
[27]:
SparkSession - in-memory
Filter¶
本章重点介绍用 PySpark 对数据进行筛选, 过滤的方法. 我们首先创建了一个简单的 IOT 物联网传感器数据集.
[28]:
# Build the sample IOT sensor dataset used throughout this chapter.
# Each row is (event_id, timestamp, device_id, measurement_type, value).
sensor_rows = [
    (1, "2022-01-01 08:00:00", "c3d79cd0", "temperature", 76.0),
    (2, "2022-01-01 09:00:00", "c3d79cd0", "temperature", 77.0),
    (3, "2022-01-01 10:00:00", "c3d79cd0", "temperature", 78.0),
    (4, "2022-01-01 11:00:00", "c3d79cd0", "temperature", 82.0),
    (5, "2022-01-01 12:00:00", "c3d79cd0", "temperature", 88.0),
    (6, "2022-01-01 08:00:00", "c3d79cd0", "humidity", 0.45),
    (7, "2022-01-01 09:00:00", "c3d79cd0", "humidity", 0.43),
    (8, "2022-01-01 10:00:00", "c3d79cd0", "humidity", 0.48),
    (9, "2022-01-01 11:00:00", "c3d79cd0", "humidity", 0.37),
    (10, "2022-01-01 12:00:00", "c3d79cd0", "humidity", 0.33),
    (11, "2022-01-01 08:00:00", "a5a8e0c2", "temperature", 66.0),
    (12, "2022-01-01 09:00:00", "a5a8e0c2", "temperature", 68.0),
    (13, "2022-01-01 10:00:00", "a5a8e0c2", "temperature", 70.0),
    (14, "2022-01-01 11:00:00", "a5a8e0c2", "temperature", 73.0),
    (15, "2022-01-01 12:00:00", "a5a8e0c2", "temperature", 77.0),
    (16, "2022-01-01 08:00:00", "a5a8e0c2", "humidity", 0.67),
    (17, "2022-01-01 09:00:00", "a5a8e0c2", "humidity", 0.53),
    (18, "2022-01-01 10:00:00", "a5a8e0c2", "humidity", 0.45),
    (19, "2022-01-01 11:00:00", "a5a8e0c2", "humidity", 0.44),
    (20, "2022-01-01 12:00:00", "a5a8e0c2", "humidity", 0.45),
]
sensor_columns = ("event_id", "date", "device_id", "measurement_type", "value")
pdf = spark.createDataFrame(sensor_rows, sensor_columns)
pdf.show()
+--------+-------------------+---------+----------------+-----+
|event_id| date|device_id|measurement_type|value|
+--------+-------------------+---------+----------------+-----+
| 1|2022-01-01 08:00:00| c3d79cd0| temperature| 76.0|
| 2|2022-01-01 09:00:00| c3d79cd0| temperature| 77.0|
| 3|2022-01-01 10:00:00| c3d79cd0| temperature| 78.0|
| 4|2022-01-01 11:00:00| c3d79cd0| temperature| 82.0|
| 5|2022-01-01 12:00:00| c3d79cd0| temperature| 88.0|
| 6|2022-01-01 08:00:00| c3d79cd0| humidity| 0.45|
| 7|2022-01-01 09:00:00| c3d79cd0| humidity| 0.43|
| 8|2022-01-01 10:00:00| c3d79cd0| humidity| 0.48|
| 9|2022-01-01 11:00:00| c3d79cd0| humidity| 0.37|
| 10|2022-01-01 12:00:00| c3d79cd0| humidity| 0.33|
| 11|2022-01-01 08:00:00| a5a8e0c2| temperature| 66.0|
| 12|2022-01-01 09:00:00| a5a8e0c2| temperature| 68.0|
| 13|2022-01-01 10:00:00| a5a8e0c2| temperature| 70.0|
| 14|2022-01-01 11:00:00| a5a8e0c2| temperature| 73.0|
| 15|2022-01-01 12:00:00| a5a8e0c2| temperature| 77.0|
| 16|2022-01-01 08:00:00| a5a8e0c2| humidity| 0.67|
| 17|2022-01-01 09:00:00| a5a8e0c2| humidity| 0.53|
| 18|2022-01-01 10:00:00| a5a8e0c2| humidity| 0.45|
| 19|2022-01-01 11:00:00| a5a8e0c2| humidity| 0.44|
| 20|2022-01-01 12:00:00| a5a8e0c2| humidity| 0.45|
+--------+-------------------+---------+----------------+-----+
Filter on Column Value¶
基于 Column 中的值对数据进行过滤在数据分析中非常常见. 通常在 SQL 中我们会用 WHERE + 逻辑 AND, OR, NOT 来实现. 在 PySpark SQL 中我们可以用 filter 或是 where 来进行.
而过滤条件则可以用 Column 与数值或是其他 Column 的比较, 然后加上 PySpark 中的 &
(and), |
(or), ~
(not) 逻辑运算符来对条件进行排列组合. 注意: PySpark 的 Column 不能使用 Python 的 and/or/not 关键字, 而是重载了 &, |, ~ 这三个位运算符; 这里要注意单个的条件要先用括号括起来再参与逻辑运算. 和 Python 一样, 多个条件进行逻辑运算时引擎可以使用短路求值 (short-circuit), 比如你的条件是 A and B and C, 那么如果 B 为 False, 就已经知道这个计算结果一定是 False, 就没有必要再计算 C 了.
[29]:
# Filter rows with boolean conditions on a column; `where` is an alias of `filter`.
# Each sub-condition is parenthesized before it joins the `&` combination.
above_lower_bound = pdf.event_id > 10
within_upper_bound = pdf.event_id <= 15
pdf.where(above_lower_bound & within_upper_bound).show()
+--------+-------------------+---------+----------------+-----+
|event_id| date|device_id|measurement_type|value|
+--------+-------------------+---------+----------------+-----+
| 11|2022-01-01 08:00:00| a5a8e0c2| temperature| 66.0|
| 12|2022-01-01 09:00:00| a5a8e0c2| temperature| 68.0|
| 13|2022-01-01 10:00:00| a5a8e0c2| temperature| 70.0|
| 14|2022-01-01 11:00:00| a5a8e0c2| temperature| 73.0|
| 15|2022-01-01 12:00:00| a5a8e0c2| temperature| 77.0|
+--------+-------------------+---------+----------------+-----+
PySpark 还支持将 DataFrame 注册成一个虚拟的 Table, 然后用 SQL 对数据进行分析
[30]:
# First register the DataFrame as a temporary view (table).
pdf.createOrReplaceTempView("t")
# Then analyze it with a regular SQL query.
spark.sql("""
SELECT *
FROM t
WHERE
t.event_id > 10
AND t.event_id <= 15
""").show()
+--------+-------------------+---------+----------------+-----+
|event_id| date|device_id|measurement_type|value|
+--------+-------------------+---------+----------------+-----+
| 11|2022-01-01 08:00:00| a5a8e0c2| temperature| 66.0|
| 12|2022-01-01 09:00:00| a5a8e0c2| temperature| 68.0|
| 13|2022-01-01 10:00:00| a5a8e0c2| temperature| 70.0|
| 14|2022-01-01 11:00:00| a5a8e0c2| temperature| 73.0|
| 15|2022-01-01 12:00:00| a5a8e0c2| temperature| 77.0|
+--------+-------------------+---------+----------------+-----+
[31]:
# Since Spark 3.3.0, spark.sql supports injecting variables into the query
# via **kwargs (here the DataFrame is bound to the {t} placeholder).
# NOTE(review): this feature is still evolving and may change between releases.
spark.sql(
"""
SELECT *
FROM {t}
WHERE
{t}.event_id > 10
AND {t}.event_id <= 15
""",
t=pdf,
).show()
+--------+-------------------+---------+----------------+-----+
|event_id| date|device_id|measurement_type|value|
+--------+-------------------+---------+----------------+-----+
| 11|2022-01-01 08:00:00| a5a8e0c2| temperature| 66.0|
| 12|2022-01-01 09:00:00| a5a8e0c2| temperature| 68.0|
| 13|2022-01-01 10:00:00| a5a8e0c2| temperature| 70.0|
| 14|2022-01-01 11:00:00| a5a8e0c2| temperature| 73.0|
| 15|2022-01-01 12:00:00| a5a8e0c2| temperature| 77.0|
+--------+-------------------+---------+----------------+-----+
Use UDF in Filter Condition¶
有的时候我们的条件判断逻辑很复杂, 需要自己实现. 这时我们就可以用 UDF 来自定义一个函数. 这个函数接受一个单个值作为输入, 返回 bool 值. 而我们用 udf
函数将其注册后, 该函数就变成了一个接受 Column 对象的函数. 注意我们最好显式声明该函数返回的值的类型, 以便于 Spark 计算引擎基于此进行优化.
[32]:
from typing import Union
import pyspark.sql.functions as F
import pyspark.sql.types as T


# You can define one UDF per column. Declaring the return type explicitly
# (BooleanType) lets the Spark engine optimize based on it.
@F.udf(T.BooleanType())
def is_temperature(measurement_type: str) -> bool:
    """Return True if the measurement type is "temperature"."""
    return measurement_type == "temperature"


# Fix: the original annotation was `v: [int, float]`, which is a list
# literal, not a type — use typing.Union (already imported, was unused).
@F.udf(T.BooleanType())
def is_comfortable_temperature(v: Union[int, float]) -> bool:
    """Return True if the value lies in the comfortable range [68, 76]."""
    return 68 <= v <= 76


pdf.filter(
    is_temperature(pdf.measurement_type)
    & (is_comfortable_temperature(pdf.value))
).show()
+--------+-------------------+---------+----------------+-----+
|event_id| date|device_id|measurement_type|value|
+--------+-------------------+---------+----------------+-----+
| 1|2022-01-01 08:00:00| c3d79cd0| temperature| 76.0|
| 12|2022-01-01 09:00:00| a5a8e0c2| temperature| 68.0|
| 13|2022-01-01 10:00:00| a5a8e0c2| temperature| 70.0|
| 14|2022-01-01 11:00:00| a5a8e0c2| temperature| 73.0|
+--------+-------------------+---------+----------------+-----+
[33]:
# You can also define a single UDF over multiple columns; the function then
# takes N positional arguments (N = the number of columns it is applied to).
# Fix: the original annotation was `value: [int, float]` — a list literal,
# not a type — replaced with typing.Union (imported earlier in the file).
@F.udf(T.BooleanType())
def is_comfortable_temperature_v2(measure_type: str, value: Union[int, float]) -> bool:
    """Return True for temperature readings in the comfortable range [68, 76]."""
    return (measure_type == "temperature") and (68 <= value <= 76)


pdf.filter(
    is_comfortable_temperature_v2(pdf.measurement_type, pdf.value)
).show()
+--------+-------------------+---------+----------------+-----+
|event_id| date|device_id|measurement_type|value|
+--------+-------------------+---------+----------------+-----+
| 1|2022-01-01 08:00:00| c3d79cd0| temperature| 76.0|
| 12|2022-01-01 09:00:00| a5a8e0c2| temperature| 68.0|
| 13|2022-01-01 10:00:00| a5a8e0c2| temperature| 70.0|
| 14|2022-01-01 11:00:00| a5a8e0c2| temperature| 73.0|
+--------+-------------------+---------+----------------+-----+
[ ]: