{ "cells": [ { "cell_type": "code", "execution_count": 27, "id": "83b55789-8f71-4827-9ca1-4a4d265d1dcd", "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "

SparkSession - in-memory

\n", " \n", "
\n", "

SparkContext

\n", "\n", "

Spark UI

\n", "\n", "
\n", "
Version
\n", "
v3.3.0
\n", "
Master
\n", "
local[*]
\n", "
AppName
\n", "
pyspark-shell
\n", "
\n", "
\n", " \n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 首先创建一个 Spark Session\n", "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder.getOrCreate()\n", "spark" ] }, { "cell_type": "markdown", "id": "4c861b90-7755-46bf-82b1-8d701812623b", "metadata": {}, "source": [ "# Filter\n", "\n", "本章重点介绍用 PySpark 对数据进行筛选, 过滤的方法. 我们首先创建了一个简单的 IOT 物联网传感器数据集." ] }, { "cell_type": "code", "execution_count": 28, "id": "e7d60157-698e-4b52-95e5-abfc2c2a9849", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+-------------------+---------+----------------+-----+\n", "|event_id| date|device_id|measurement_type|value|\n", "+--------+-------------------+---------+----------------+-----+\n", "| 1|2022-01-01 08:00:00| c3d79cd0| temperature| 76.0|\n", "| 2|2022-01-01 09:00:00| c3d79cd0| temperature| 77.0|\n", "| 3|2022-01-01 10:00:00| c3d79cd0| temperature| 78.0|\n", "| 4|2022-01-01 11:00:00| c3d79cd0| temperature| 82.0|\n", "| 5|2022-01-01 12:00:00| c3d79cd0| temperature| 88.0|\n", "| 6|2022-01-01 08:00:00| c3d79cd0| humidity| 0.45|\n", "| 7|2022-01-01 09:00:00| c3d79cd0| humidity| 0.43|\n", "| 8|2022-01-01 10:00:00| c3d79cd0| humidity| 0.48|\n", "| 9|2022-01-01 11:00:00| c3d79cd0| humidity| 0.37|\n", "| 10|2022-01-01 12:00:00| c3d79cd0| humidity| 0.33|\n", "| 11|2022-01-01 08:00:00| a5a8e0c2| temperature| 66.0|\n", "| 12|2022-01-01 09:00:00| a5a8e0c2| temperature| 68.0|\n", "| 13|2022-01-01 10:00:00| a5a8e0c2| temperature| 70.0|\n", "| 14|2022-01-01 11:00:00| a5a8e0c2| temperature| 73.0|\n", "| 15|2022-01-01 12:00:00| a5a8e0c2| temperature| 77.0|\n", "| 16|2022-01-01 08:00:00| a5a8e0c2| humidity| 0.67|\n", "| 17|2022-01-01 09:00:00| a5a8e0c2| humidity| 0.53|\n", "| 18|2022-01-01 10:00:00| a5a8e0c2| humidity| 0.45|\n", "| 19|2022-01-01 11:00:00| a5a8e0c2| humidity| 0.44|\n", "| 20|2022-01-01 12:00:00| a5a8e0c2| humidity| 0.45|\n", "+--------+-------------------+---------+----------------+-----+\n", "\n" ] } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (1, \"2022-01-01 08:00:00\", \"c3d79cd0\", \"temperature\", 76.0),\n", " (2, \"2022-01-01 09:00:00\", \"c3d79cd0\", \"temperature\", 77.0),\n", " (3, \"2022-01-01 10:00:00\", \"c3d79cd0\", \"temperature\", 78.0),\n", " (4, \"2022-01-01 11:00:00\", \"c3d79cd0\", \"temperature\", 82.0),\n", " (5, \"2022-01-01 12:00:00\", \"c3d79cd0\", \"temperature\", 88.0),\n", " (6, \"2022-01-01 08:00:00\", \"c3d79cd0\", \"humidity\", 0.45),\n", " (7, \"2022-01-01 09:00:00\", \"c3d79cd0\", \"humidity\", 0.43),\n", " (8, \"2022-01-01 10:00:00\", \"c3d79cd0\", \"humidity\", 0.48),\n", " (9, \"2022-01-01 11:00:00\", \"c3d79cd0\", \"humidity\", 0.37),\n", " (10, \"2022-01-01 12:00:00\", \"c3d79cd0\", \"humidity\", 0.33),\n", " (11, \"2022-01-01 08:00:00\", \"a5a8e0c2\", \"temperature\", 66.0),\n", " (12, \"2022-01-01 09:00:00\", \"a5a8e0c2\", \"temperature\", 68.0),\n", " (13, \"2022-01-01 10:00:00\", \"a5a8e0c2\", \"temperature\", 70.0),\n", " (14, \"2022-01-01 11:00:00\", \"a5a8e0c2\", \"temperature\", 73.0),\n", " (15, \"2022-01-01 12:00:00\", \"a5a8e0c2\", \"temperature\", 77.0),\n", " (16, \"2022-01-01 08:00:00\", \"a5a8e0c2\", \"humidity\", 0.67),\n", " (17, \"2022-01-01 09:00:00\", \"a5a8e0c2\", \"humidity\", 0.53),\n", " (18, \"2022-01-01 10:00:00\", \"a5a8e0c2\", \"humidity\", 0.45),\n", " (19, \"2022-01-01 11:00:00\", \"a5a8e0c2\", \"humidity\", 0.44),\n", " (20, \"2022-01-01 12:00:00\", \"a5a8e0c2\", \"humidity\", 0.45),\n", 
" ],\n", " (\"event_id\", \"date\", \"device_id\", \"measurement_type\", \"value\"),\n", ")\n", "pdf.show()" ] }, { "cell_type": "markdown", "id": "214ef1f3-28a8-4795-9883-60c66721896e", "metadata": {}, "source": [ "## Filter on Column Value\n", "\n", "基于 Column 中的值对数据进行过滤在数据分析中非常常见. 通常在 SQL 中我们会用 WHERE + 逻辑 AND, OR, NOT 来实现. 在 PySpark SQL 中我们可以用 [filter](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.filter.html#pyspark.sql.DataFrame.filter) 或是 [where](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.where.html#pyspark.sql.DataFrame.where) 来进行.\n", "\n", "而过滤条件则可以用 Column 与数值或是其他 Column 的比较, 然后加上 Python 中的 ``&`` (and), ``|`` (or), ``!`` (not) 逻辑运算法来对条件进行排列组合. 这里要注意单个的条件要先用括号括起来再参与逻辑运算. 和 Python 一样, 多个条件进行逻辑运算会使用 fast fail, 比如你的条件是 A and B and C, 那么如果 B 为 False, 就已经知道这个计算结果一定是 False, 就没有比较计算 C 了.\n" ] }, { "cell_type": "code", "execution_count": 29, "id": "90434df6-fe5a-43e8-8568-1be0d6428a25", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+-------------------+---------+----------------+-----+\n", "|event_id| date|device_id|measurement_type|value|\n", "+--------+-------------------+---------+----------------+-----+\n", "| 11|2022-01-01 08:00:00| a5a8e0c2| temperature| 66.0|\n", "| 12|2022-01-01 09:00:00| a5a8e0c2| temperature| 68.0|\n", "| 13|2022-01-01 10:00:00| a5a8e0c2| temperature| 70.0|\n", "| 14|2022-01-01 11:00:00| a5a8e0c2| temperature| 73.0|\n", "| 15|2022-01-01 12:00:00| a5a8e0c2| temperature| 77.0|\n", "+--------+-------------------+---------+----------------+-----+\n", "\n" ] } ], "source": [ "# 使用 filter 以及 条件过滤\n", "pdf.filter(\n", " (pdf.event_id > 10)\n", " & (pdf.event_id <= 15)\n", ").show()" ] }, { "cell_type": "markdown", "id": "698b1e3d-d1b3-4afe-97c9-cce96a70bbb7", "metadata": {}, "source": [ "PySpark 还支持将 DataFrame 注册成一个虚拟的 Table, 然后用 SQL 对数据进行分析" ] }, { "cell_type": "code", "execution_count": 30, "id": "6b1a1261-e452-4d7a-aafc-03856397da13", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+-------------------+---------+----------------+-----+\n", "|event_id| date|device_id|measurement_type|value|\n", "+--------+-------------------+---------+----------------+-----+\n", "| 11|2022-01-01 08:00:00| a5a8e0c2| temperature| 66.0|\n", "| 12|2022-01-01 09:00:00| a5a8e0c2| temperature| 68.0|\n", "| 13|2022-01-01 10:00:00| a5a8e0c2| temperature| 70.0|\n", "| 14|2022-01-01 11:00:00| a5a8e0c2| temperature| 73.0|\n", "| 15|2022-01-01 12:00:00| a5a8e0c2| temperature| 77.0|\n", "+--------+-------------------+---------+----------------+-----+\n", "\n" ] } ], "source": [ "# 先将 DataFrame 注册成一个 View (Table)\n", "pdf.createOrReplaceTempView(\"t\")\n", "\n", "# 然后再使用 SQL query\n", "spark.sql(\"\"\"\n", "SELECT * \n", "FROM t\n", "WHERE \n", " t.event_id > 10\n", " AND t.event_id <= 15\n", "\"\"\").show()" ] }, { "cell_type": "code", "execution_count": 31, "id": "7e8a94a1-37a3-471d-a4f2-a37974b123b5", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+-------------------+---------+----------------+-----+\n", "|event_id| date|device_id|measurement_type|value|\n", "+--------+-------------------+---------+----------------+-----+\n", "| 11|2022-01-01 08:00:00| a5a8e0c2| temperature| 66.0|\n", "| 12|2022-01-01 09:00:00| a5a8e0c2| temperature| 68.0|\n", "| 13|2022-01-01 10:00:00| a5a8e0c2| temperature| 70.0|\n", "| 14|2022-01-01 11:00:00| a5a8e0c2| temperature| 
73.0|\n", "| 15|2022-01-01 12:00:00| a5a8e0c2| temperature| 77.0|\n", "+--------+-------------------+---------+----------------+-----+\n", "\n" ] } ], "source": [ "# 在 3.3.0 版本后增加了 SQL 变量的语法, 支持使用 **kwargs 注入变量\n", "# 但这个功能还不稳定, 可能随时会变化\n", "spark.sql(\n", " \"\"\"\n", " SELECT * \n", " FROM {t}\n", " WHERE \n", " {t}.event_id > 10\n", " AND {t}.event_id <= 15\n", " \"\"\",\n", " t=pdf,\n", ").show()" ] }, { "cell_type": "markdown", "id": "22782f53-8ea5-4953-b4c5-66e157434802", "metadata": {}, "source": [ "## Use UDF in Filter Condition\n", "\n", "有的时候我们的条件判断逻辑很复杂, 需要自己实现. 这时我们就可以用 UDF 来自定义一个函数. 这个函数接受一个单个值作为输入, 返回 bool 值. 而我们用 ``udf`` 函数将其注册后, 该函数就变成了一个接受 Column 对象的函数. 注意我们最好显式声明该函数返回的值的类型, 以便于 Spark 计算引擎基于此进行优化.\n" ] }, { "cell_type": "code", "execution_count": 32, "id": "e0260078-738b-4cbe-b8a6-618d6d519b77", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+-------------------+---------+----------------+-----+\n", "|event_id| date|device_id|measurement_type|value|\n", "+--------+-------------------+---------+----------------+-----+\n", "| 1|2022-01-01 08:00:00| c3d79cd0| temperature| 76.0|\n", "| 12|2022-01-01 09:00:00| a5a8e0c2| temperature| 68.0|\n", "| 13|2022-01-01 10:00:00| a5a8e0c2| temperature| 70.0|\n", "| 14|2022-01-01 11:00:00| a5a8e0c2| temperature| 73.0|\n", "+--------+-------------------+---------+----------------+-----+\n", "\n" ] } ], "source": [ "from typing import Union\n", "import pyspark.sql.functions as F\n", "import pyspark.sql.types as T\n", "\n", "# 你可以针对每一个 Column 定义一个函数\n", "@F.udf(T.BooleanType())\n", "def is_temperature(measurement_type: str) -> bool:\n", " return measurement_type == \"temperature\"\n", "\n", "\n", "@F.udf(T.BooleanType())\n", "def is_comfortable_temperature(v: [int, float]) -> bool:\n", " return 68 <= v <= 76\n", "\n", "\n", "pdf.filter(\n", " is_temperature(pdf.measurement_type)\n", " & (is_comfortable_temperature(pdf.value))\n", ").show()" ] }, { "cell_type": "code", "execution_count": 33, "id": "85879254-1d82-4ec7-9d5c-cad256bd1495", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+-------------------+---------+----------------+-----+\n", "|event_id| date|device_id|measurement_type|value|\n", "+--------+-------------------+---------+----------------+-----+\n", "| 1|2022-01-01 08:00:00| c3d79cd0| temperature| 76.0|\n", "| 12|2022-01-01 09:00:00| a5a8e0c2| temperature| 68.0|\n", "| 13|2022-01-01 10:00:00| a5a8e0c2| temperature| 70.0|\n", "| 14|2022-01-01 11:00:00| a5a8e0c2| temperature| 73.0|\n", "+--------+-------------------+---------+----------------+-----+\n", "\n" ] } ], "source": [ "# 也可以针对多个 Column 只定义一个函数\n", "# 只不过这个函数接受 N 个固定参数 (N 等于针对的 Column 的数量)\n", "@F.udf(T.BooleanType())\n", "def is_comfortable_temperature_v2(measure_type: str, value: [int, float]) -> bool:\n", " return (measure_type == \"temperature\") and (68 <= value <= 76)\n", "\n", "pdf.filter(\n", " is_comfortable_temperature_v2(pdf.measurement_type, pdf.value)\n", ").show()" ] }, { "cell_type": "code", "execution_count": null, "id": "79d2d4a0-806a-47a4-8fe8-759949fc9d0f", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.5" } }, "nbformat": 4, 
"nbformat_minor": 5 }