[27]:
# First, create a SparkSession
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark
[27]:

SparkSession - in-memory

SparkContext

Spark UI

Version: v3.3.0
Master: local[*]
AppName: pyspark-shell

Filter

This chapter focuses on how to filter data with PySpark. We start by creating a simple IoT (Internet of Things) sensor dataset.

[28]:
pdf = spark.createDataFrame(
    [
        (1, "2022-01-01 08:00:00", "c3d79cd0", "temperature", 76.0),
        (2, "2022-01-01 09:00:00", "c3d79cd0", "temperature", 77.0),
        (3, "2022-01-01 10:00:00", "c3d79cd0", "temperature", 78.0),
        (4, "2022-01-01 11:00:00", "c3d79cd0", "temperature", 82.0),
        (5, "2022-01-01 12:00:00", "c3d79cd0", "temperature", 88.0),
        (6, "2022-01-01 08:00:00", "c3d79cd0", "humidity", 0.45),
        (7, "2022-01-01 09:00:00", "c3d79cd0", "humidity", 0.43),
        (8, "2022-01-01 10:00:00", "c3d79cd0", "humidity", 0.48),
        (9, "2022-01-01 11:00:00", "c3d79cd0", "humidity", 0.37),
        (10, "2022-01-01 12:00:00", "c3d79cd0", "humidity", 0.33),
        (11, "2022-01-01 08:00:00", "a5a8e0c2", "temperature", 66.0),
        (12, "2022-01-01 09:00:00", "a5a8e0c2", "temperature", 68.0),
        (13, "2022-01-01 10:00:00", "a5a8e0c2", "temperature", 70.0),
        (14, "2022-01-01 11:00:00", "a5a8e0c2", "temperature", 73.0),
        (15, "2022-01-01 12:00:00", "a5a8e0c2", "temperature", 77.0),
        (16, "2022-01-01 08:00:00", "a5a8e0c2", "humidity", 0.67),
        (17, "2022-01-01 09:00:00", "a5a8e0c2", "humidity", 0.53),
        (18, "2022-01-01 10:00:00", "a5a8e0c2", "humidity", 0.45),
        (19, "2022-01-01 11:00:00", "a5a8e0c2", "humidity", 0.44),
        (20, "2022-01-01 12:00:00", "a5a8e0c2", "humidity", 0.45),
    ],
    ("event_id", "date", "device_id", "measurement_type", "value"),
)
pdf.show()
+--------+-------------------+---------+----------------+-----+
|event_id|               date|device_id|measurement_type|value|
+--------+-------------------+---------+----------------+-----+
|       1|2022-01-01 08:00:00| c3d79cd0|     temperature| 76.0|
|       2|2022-01-01 09:00:00| c3d79cd0|     temperature| 77.0|
|       3|2022-01-01 10:00:00| c3d79cd0|     temperature| 78.0|
|       4|2022-01-01 11:00:00| c3d79cd0|     temperature| 82.0|
|       5|2022-01-01 12:00:00| c3d79cd0|     temperature| 88.0|
|       6|2022-01-01 08:00:00| c3d79cd0|        humidity| 0.45|
|       7|2022-01-01 09:00:00| c3d79cd0|        humidity| 0.43|
|       8|2022-01-01 10:00:00| c3d79cd0|        humidity| 0.48|
|       9|2022-01-01 11:00:00| c3d79cd0|        humidity| 0.37|
|      10|2022-01-01 12:00:00| c3d79cd0|        humidity| 0.33|
|      11|2022-01-01 08:00:00| a5a8e0c2|     temperature| 66.0|
|      12|2022-01-01 09:00:00| a5a8e0c2|     temperature| 68.0|
|      13|2022-01-01 10:00:00| a5a8e0c2|     temperature| 70.0|
|      14|2022-01-01 11:00:00| a5a8e0c2|     temperature| 73.0|
|      15|2022-01-01 12:00:00| a5a8e0c2|     temperature| 77.0|
|      16|2022-01-01 08:00:00| a5a8e0c2|        humidity| 0.67|
|      17|2022-01-01 09:00:00| a5a8e0c2|        humidity| 0.53|
|      18|2022-01-01 10:00:00| a5a8e0c2|        humidity| 0.45|
|      19|2022-01-01 11:00:00| a5a8e0c2|        humidity| 0.44|
|      20|2022-01-01 12:00:00| a5a8e0c2|        humidity| 0.45|
+--------+-------------------+---------+----------------+-----+
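
createDataFrame infers the schema from the Python values: event_id becomes a long, value a double, and date remains a plain string rather than a timestamp. Below is a minimal sketch to inspect the schema and, if needed, cast date to a real timestamp (to_timestamp assumes the default yyyy-MM-dd HH:mm:ss format; the pdf_with_ts name is just illustrative):

import pyspark.sql.functions as F

pdf.printSchema()

# optional: turn the string date column into a proper TimestampType column
pdf_with_ts = pdf.withColumn("date", F.to_timestamp(pdf.date))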

Filter on Column Value

Filtering rows based on column values is very common in data analysis. In SQL this is usually done with WHERE plus the logical operators AND, OR, NOT. In PySpark SQL we can use either filter or where.

A filter condition compares a Column with a literal value or with another Column; the individual conditions are then combined with the Python operators & (and), | (or), and ~ (not). Note that each individual condition must be wrapped in parentheses before it is combined, because &, | and ~ bind more tightly than the comparison operators. As in Python, combined conditions are evaluated with short-circuiting ("fast fail"): if the condition is A and B and C and B turns out to be False, the overall result is already known to be False, so C does not need to be evaluated.
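
For example, a minimal sketch (against the pdf DataFrame above) that combines | and ~:

# keep all humidity rows, plus any row whose value is NOT below 80.0
pdf.filter(
    (pdf.measurement_type == "humidity")
    | ~(pdf.value < 80.0)
).show()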

[29]:
# Use filter with a compound condition
pdf.filter(
    (pdf.event_id > 10)
    & (pdf.event_id <= 15)
).show()
+--------+-------------------+---------+----------------+-----+
|event_id|               date|device_id|measurement_type|value|
+--------+-------------------+---------+----------------+-----+
|      11|2022-01-01 08:00:00| a5a8e0c2|     temperature| 66.0|
|      12|2022-01-01 09:00:00| a5a8e0c2|     temperature| 68.0|
|      13|2022-01-01 10:00:00| a5a8e0c2|     temperature| 70.0|
|      14|2022-01-01 11:00:00| a5a8e0c2|     temperature| 73.0|
|      15|2022-01-01 12:00:00| a5a8e0c2|     temperature| 77.0|
+--------+-------------------+---------+----------------+-----+
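
where is an alias of filter, and the condition can also be passed as a SQL expression string, so the same query could be written in either of the following ways (a sketch, outputs omitted):

# where is simply an alias of filter
pdf.where((pdf.event_id > 10) & (pdf.event_id <= 15)).show()

# the condition can also be a SQL expression string
pdf.filter("event_id > 10 AND event_id <= 15").show()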

PySpark also supports registering a DataFrame as a temporary view (table) and then analyzing the data with SQL.

[30]:
# First, register the DataFrame as a temporary view (table)
pdf.createOrReplaceTempView("t")

# Then query it with SQL
spark.sql("""
SELECT *
FROM t
WHERE
    t.event_id > 10
    AND t.event_id <= 15
""").show()
+--------+-------------------+---------+----------------+-----+
|event_id|               date|device_id|measurement_type|value|
+--------+-------------------+---------+----------------+-----+
|      11|2022-01-01 08:00:00| a5a8e0c2|     temperature| 66.0|
|      12|2022-01-01 09:00:00| a5a8e0c2|     temperature| 68.0|
|      13|2022-01-01 10:00:00| a5a8e0c2|     temperature| 70.0|
|      14|2022-01-01 11:00:00| a5a8e0c2|     temperature| 73.0|
|      15|2022-01-01 12:00:00| a5a8e0c2|     temperature| 77.0|
+--------+-------------------+---------+----------------+-----+
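
The result of spark.sql is an ordinary DataFrame, so SQL and DataFrame operations can be mixed freely, and the temporary view can be dropped once it is no longer needed. A minimal sketch:

# the query result is a regular DataFrame and can be chained further
spark.sql("SELECT * FROM t WHERE t.event_id > 10").filter("event_id <= 15").show()

# drop the temporary view when done with it
spark.catalog.dropTempView("t")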

[31]:
# Since version 3.3.0, spark.sql supports injecting variables into the query
# via **kwargs. This feature is still experimental and may change at any time.
spark.sql(
    """
    SELECT *
    FROM {t}
    WHERE
        {t}.event_id > 10
        AND {t}.event_id <= 15
    """,
    t=pdf,
).show()
+--------+-------------------+---------+----------------+-----+
|event_id|               date|device_id|measurement_type|value|
+--------+-------------------+---------+----------------+-----+
|      11|2022-01-01 08:00:00| a5a8e0c2|     temperature| 66.0|
|      12|2022-01-01 09:00:00| a5a8e0c2|     temperature| 68.0|
|      13|2022-01-01 10:00:00| a5a8e0c2|     temperature| 70.0|
|      14|2022-01-01 11:00:00| a5a8e0c2|     temperature| 73.0|
|      15|2022-01-01 12:00:00| a5a8e0c2|     temperature| 77.0|
+--------+-------------------+---------+----------------+-----+

Use UDF in Filter Condition

Sometimes the filtering logic is too complex for the built-in operators and has to be implemented ourselves. In that case we can define a UDF (user-defined function). The Python function takes plain values as input (one per Column it is applied to) and returns a bool; once registered with udf, it becomes a function that accepts Column objects. It is best to declare the return type explicitly so that the Spark engine can use this information when planning the query.
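
Besides the decorator form used below, udf can also be applied directly to an existing function or lambda; a minimal sketch (the is_temperature_udf name is just illustrative):

import pyspark.sql.functions as F
import pyspark.sql.types as T

# equivalent non-decorator registration of a boolean UDF
is_temperature_udf = F.udf(lambda s: s == "temperature", T.BooleanType())
pdf.filter(is_temperature_udf(pdf.measurement_type)).show()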

[32]:
from typing import Union
import pyspark.sql.functions as F
import pyspark.sql.types as T

# You can define a separate function for each Column
@F.udf(T.BooleanType())
def is_temperature(measurement_type: str) -> bool:
    return measurement_type == "temperature"


@F.udf(T.BooleanType())
def is_comfortable_temperature(v: Union[int, float]) -> bool:
    return 68 <= v <= 76


pdf.filter(
    is_temperature(pdf.measurement_type)
    & (is_comfortable_temperature(pdf.value))
).show()
+--------+-------------------+---------+----------------+-----+
|event_id|               date|device_id|measurement_type|value|
+--------+-------------------+---------+----------------+-----+
|       1|2022-01-01 08:00:00| c3d79cd0|     temperature| 76.0|
|      12|2022-01-01 09:00:00| a5a8e0c2|     temperature| 68.0|
|      13|2022-01-01 10:00:00| a5a8e0c2|     temperature| 70.0|
|      14|2022-01-01 11:00:00| a5a8e0c2|     temperature| 73.0|
+--------+-------------------+---------+----------------+-----+

[33]:
# You can also define a single function that covers multiple Columns; the
# function then takes N positional arguments (N = the number of Columns used)
@F.udf(T.BooleanType())
def is_comfortable_temperature_v2(measure_type: str, value: Union[int, float]) -> bool:
    return (measure_type == "temperature") and (68 <= value <= 76)

pdf.filter(
    is_comfortable_temperature_v2(pdf.measurement_type, pdf.value)
).show()
+--------+-------------------+---------+----------------+-----+
|event_id|               date|device_id|measurement_type|value|
+--------+-------------------+---------+----------------+-----+
|       1|2022-01-01 08:00:00| c3d79cd0|     temperature| 76.0|
|      12|2022-01-01 09:00:00| a5a8e0c2|     temperature| 68.0|
|      13|2022-01-01 10:00:00| a5a8e0c2|     temperature| 70.0|
|      14|2022-01-01 11:00:00| a5a8e0c2|     temperature| 73.0|
+--------+-------------------+---------+----------------+-----+
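
For predicates as simple as these, the same filter can also be expressed with built-in Column operations (a sketch); built-in expressions are generally preferable when they suffice, since a Python UDF is opaque to Spark's optimizer:

# equivalent filter using only built-in Column operations
pdf.filter(
    (pdf.measurement_type == "temperature")
    & pdf.value.between(68, 76)  # between is inclusive on both ends
).show()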
