{ "cells": [ { "cell_type": "markdown", "id": "12d609f1-5216-4a08-b205-1d8071e30ea6", "metadata": {}, "source": [ "# Aggregation\n", "\n", "所谓聚合计算就是将数据按照某种方式聚合成组, 在每一组内部进行某种计算, 然后再根据组进行汇总. 例如: 把订单数据按照日期汇总, 计算每天的总销售额. 简单来说就是 SQL 中的 GROUP BY.\n", "\n", "在 Spark 中聚合计算的语法非常灵活以及强大." ] }, { "cell_type": "code", "execution_count": 1, "id": "f7460206-7d3e-4874-828b-5d66c77b4de8", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Current directory: /home/jovyan/docs/source/02-Aggregation\n", "Current Python version: 3.10.5\n", "Current Python interpreter: /opt/conda/bin/python\n", "Current site-packages: ['/opt/conda/lib/python3.10/site-packages']\n" ] } ], "source": [ "import os\n", "import sys\n", "import site\n", "\n", "cwd = os.getcwd()\n", "print(f\"Current directory: {cwd}\")\n", "print(f\"Current Python version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}\")\n", "print(f\"Current Python interpreter: {sys.executable}\")\n", "print(f\"Current site-packages: {site.getsitepackages()}\")\n", "\n", "sys.path.append(os.path.join(cwd, \"site-packages\"))" ] }, { "cell_type": "code", "execution_count": 38, "id": "9b8aa5fd-2d40-4cf9-85d8-c4df81ac4a0f", "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "

SparkSession - in-memory

\n", " \n", "
\n", "

SparkContext

\n", "\n", "

Spark UI

\n", "\n", "
\n", "
Version
\n", "
v3.3.0
\n", "
Master
\n", "
local[*]
\n", "
AppName
\n", "
pyspark-shell
\n", "
\n", "
\n", " \n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 38, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 首先创建一个 Spark Session\n", "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder.getOrCreate()\n", "spark" ] }, { "cell_type": "markdown", "id": "6083eaaf-9b09-4d8a-9bee-cdc1dae3fa5f", "metadata": {}, "source": [ "## Basic Syntax" ] }, { "cell_type": "markdown", "id": "1f394d72-6443-467a-ae23-cc27de074b54", "metadata": {}, "source": [ "[pyspark.sql.functions](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/functions.html) 这个模块下有非常多用于计算的函数, 这里我们先将它导入, 以便后续使用." ] }, { "cell_type": "code", "execution_count": 39, "id": "d872c6dd-5d5f-4418-a41c-cc882eac7486", "metadata": {}, "outputs": [], "source": [ "import pyspark.sql.functions as func" ] }, { "cell_type": "code", "execution_count": 40, "id": "75154b97-b347-484f-8968-a6013100a72c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+----------+------+\n", "|order_id| date|amount|\n", "+--------+----------+------+\n", "| 1|2022-01-01| 5|\n", "| 2|2022-01-01| 8|\n", "| 3|2022-01-01| 2|\n", "| 4|2022-01-02| 9|\n", "| 5|2022-01-02| 10|\n", "| 6|2022-01-02| 7|\n", "| 7|2022-01-03| 1|\n", "| 8|2022-01-03| 4|\n", "| 9|2022-01-03| 6|\n", "| 10|2022-01-03| 3|\n", "+--------+----------+------+\n", "\n" ] } ], "source": [ "# Create from Python list of tuple\n", "pdf = spark.createDataFrame(\n", " [\n", " (1, \"2022-01-01\", 5),\n", " (2, \"2022-01-01\", 8),\n", " (3, \"2022-01-01\", 2),\n", " (4, \"2022-01-02\", 9),\n", " (5, \"2022-01-02\", 10),\n", " (6, \"2022-01-02\", 7),\n", " (7, \"2022-01-03\", 1),\n", " (8, \"2022-01-03\", 4),\n", " (9, \"2022-01-03\", 6),\n", " (10, \"2022-01-03\", 3),\n", " ],\n", " (\"order_id\", \"date\", \"amount\")\n", ")\n", "pdf.show()" ] }, { "cell_type": "markdown", "id": "d36e07e6-7427-4354-8912-9e77bf92a47d", "metadata": {}, "source": [ "聚合计算实际上有两个步骤, 先是 **聚合**, 然后才是 **计算**. 在 Spark 中由于 Lazy Load 的特性, 我们可以先对数据进行聚合, 生成一个 ``pyspark.sql.group.GroupedData`` 聚合对象, 此时计算没有实际发生, 所以没有开销, 而且我们可以复用这个聚合对象进行不同的计算." ] }, { "cell_type": "code", "execution_count": 70, "id": "aff1de56-1a1e-4cab-8853-353d18d0e962", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "" ] }, "execution_count": 70, "metadata": {}, "output_type": "execute_result" } ], "source": [ "gdf = pdf.groupBy(pdf.date)\n", "gdf" ] }, { "cell_type": "markdown", "id": "f5420196-dc41-4c77-8849-53cecd9d9f1d", "metadata": {}, "source": [ "下面的例子用到了 ``GroupedData`` 对象自带的一些聚合函数. 这只适用于聚合计算针对单一列, 比较简单的情况." ] }, { "cell_type": "code", "execution_count": 71, "id": "fc2aa99e-af41-4bc6-831f-82e236d45206", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+-----+\n", "| date|count|\n", "+----------+-----+\n", "|2022-01-01| 3|\n", "|2022-01-02| 3|\n", "|2022-01-03| 4|\n", "+----------+-----+\n", "\n" ] } ], "source": [ "# ( ... 
) provides better readability\n", "# 统计每个组的行数\n", "(\n", " gdf.count().alias(\"total_orders\")\n", ").show()" ] }, { "cell_type": "code", "execution_count": 72, "id": "236f15c4-ca68-4193-ad49-6877b35e9c38", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+-----------+\n", "| date|sum(amount)|\n", "+----------+-----------+\n", "|2022-01-01| 15|\n", "|2022-01-02| 26|\n", "|2022-01-03| 14|\n", "+----------+-----------+\n", "\n" ] } ], "source": [ "# 计算每个组的销售额总额\n", "(\n", " gdf.sum(\"amount\").alias(\"total_amount\")\n", ").show()" ] }, { "cell_type": "markdown", "id": "f663c08a-d61d-414a-8475-0c617c084ca0", "metadata": {}, "source": [ "如果要对多个列甚至综合起来进行计算, 那么就要用到 [agg](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.GroupedData.agg.html#pyspark.sql.GroupedData.agg) 方法. 该方法和 select 类似, 支持更复杂的计算." ] }, { "cell_type": "code", "execution_count": 73, "id": "6c0d0695-cdd6-46d6-83f9-d0910ffd2e82", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+---------------+\n", "| date|count(order_id)|\n", "+----------+---------------+\n", "|2022-01-01| 3|\n", "|2022-01-02| 3|\n", "|2022-01-03| 4|\n", "+----------+---------------+\n", "\n", "+----------+-----------+\n", "| date|sum(amount)|\n", "+----------+-----------+\n", "|2022-01-01| 15|\n", "|2022-01-02| 26|\n", "|2022-01-03| 14|\n", "+----------+-----------+\n", "\n" ] } ], "source": [ "gdf.agg(func.count(pdf.order_id)).show()\n", "gdf.agg(func.sum(pdf.amount)).show()" ] }, { "cell_type": "code", "execution_count": 74, "id": "8bd3d253-8b1a-4b00-81b2-255490120721", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+------------+-------------+\n", "| date|total_orders|total_amounts|\n", "+----------+------------+-------------+\n", "|2022-01-01| 3| 15|\n", "|2022-01-02| 3| 26|\n", "|2022-01-03| 4| 14|\n", "+----------+------------+-------------+\n", "\n" ] } ], "source": [ "# 把两个统计数据放在一起\n", "(\n", " gdf.agg(\n", " func.count(pdf.order_id).alias(\"total_orders\"),\n", " func.sum(pdf.amount).alias(\"total_amounts\")\n", " )\n", ").show()" ] }, { "cell_type": "code", "execution_count": 75, "id": "5ec50137-23af-44fd-bed8-713ce802c91e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+-------------------------------+\n", "| date|(count(order_id) * sum(amount))|\n", "+----------+-------------------------------+\n", "|2022-01-01| 45|\n", "|2022-01-02| 78|\n", "|2022-01-03| 56|\n", "+----------+-------------------------------+\n", "\n" ] } ], "source": [ "# 可以对两个列先单独计算, 再联合起来计算\n", "# 先对两个列分别进行统计行数和求和, 最后再求乘积\n", "gdf.agg(\n", " func.count(pdf.order_id) * func.sum(pdf.amount)\n", ").show()" ] }, { "cell_type": "code", "execution_count": 76, "id": "a74c432a-7c0f-45e0-81e9-141a32d9c40b", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+------------------------+\n", "| date|sum((order_id * amount))|\n", "+----------+------------------------+\n", "|2022-01-01| 27|\n", "|2022-01-02| 128|\n", "|2022-01-03| 123|\n", "+----------+------------------------+\n", "\n" ] } ], "source": [ "# 也可以对两个列先联合起来计算, 再汇总计算\n", "# 先对每个 pair 求乘积, 最后把乘积加起来\n", "gdf.agg(\n", " func.sum(pdf.order_id * pdf.amount)\n", ").show()" ] }, { "cell_type": "markdown", "id": "8a3fb2e9-1001-4841-98aa-51aaae380393", "metadata": {}, "source": [ "我们还能使用自定义 Python 函数对聚合后的数据进行计算. 由于数据是已经被聚合的了, 那么这个函数的输入则是一个类似列表的结构. 
\n", "\n", "PySpark 支持两种 UDF:\n", "\n", "- 纯 Python ``udf``, 接受的参数是一个单个值. \n", "- ``pandas_udf``: 接受的参数是一个 ``pandas.Series``, 可以理解为一个带有 index 的列表.\n", "\n", "对于聚合计算我们通常使用 ``pandas_udf``. \n", "\n", "在下面的例子里我们先定义了一个 Python 函数, 接受 ``pandas.Series`` 参数. 具体的逻辑是把里面的值乘以 10 再相加. 然后我们用 ``pyspark.sql.functions.pandas_udf`` 将这个纯 Python 函数注册成一个 ``pandas_udf``, 并且我们要显式告诉 Spark 他的返回对象是一个整数类型, 而函数的类型是用于 Aggregation 聚合计算的. 此时这个 Python 函数就已经变成一个对 column 进行计算的算子 (Operator) 了." ] }, { "cell_type": "code", "execution_count": 77, "id": "70aac052-70b4-4589-867d-d43d5b54ef53", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+----------------------------+\n", "| date|time_ten_and_sum_udf(amount)|\n", "+----------+----------------------------+\n", "|2022-01-01| 150|\n", "|2022-01-02| 260|\n", "|2022-01-03| 140|\n", "+----------+----------------------------+\n", "\n" ] } ], "source": [ "import pandas as pd\n", "from pyspark.sql.types import IntegerType\n", "\n", "@func.pandas_udf(\n", " returnType=\"int\", \n", " functionType=func.PandasUDFType.GROUPED_AGG,\n", ")\n", "def time_ten_and_sum_udf(values: pd.Series):\n", " total = 0\n", " for v in values:\n", " total += v * 10\n", " return total\n", "\n", "gdf.agg(\n", " time_ten_and_sum_udf(pdf.amount)\n", ").show()" ] }, { "cell_type": "markdown", "id": "83edc467-90f8-4e8e-a11a-930badafc06c", "metadata": {}, "source": [ "## Group By Multiple Columns\n", "\n", "有时候我们需要对多个列进行聚合.\n", "\n", "在下面的例子里我们有一个超市的销售数据. 有订单号, 日期, 商品, 以及在订单内卖出去的数量.\n", "\n", "现在我们想知道在每一天里, 每个商品一共卖出去了多少个, 以及有多少个订单购买了这件商品." ] }, { "cell_type": "code", "execution_count": 78, "id": "be697220-da4d-493f-8b7d-de4bad9790a1", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+----------+------+--------+\n", "|order_id| date| item|quantity|\n", "+--------+----------+------+--------+\n", "| 1|2022-01-01| apple| 6|\n", "| 1|2022-01-01|banana| 12|\n", "| 1|2022-01-01| apple| 3|\n", "| 1|2022-01-01|banana| 7|\n", "| 2|2022-01-02| apple| 12|\n", "| 2|2022-01-02| apple| 24|\n", "| 2|2022-01-02|banana| 8|\n", "+--------+----------+------+--------+\n", "\n" ] } ], "source": [ "# Create from Python list of tuple\n", "pdf = spark.createDataFrame(\n", " [\n", " (1, \"2022-01-01\", \"apple\", 6),\n", " (1, \"2022-01-01\", \"banana\", 12),\n", " (1, \"2022-01-01\", \"apple\", 3),\n", " (1, \"2022-01-01\", \"banana\", 7),\n", " (2, \"2022-01-02\", \"apple\", 12),\n", " (2, \"2022-01-02\", \"apple\", 24),\n", " (2, \"2022-01-02\", \"banana\", 8),\n", " ],\n", " (\"order_id\", \"date\", \"item\", \"quantity\")\n", ")\n", "pdf.show()" ] }, { "cell_type": "code", "execution_count": 79, "id": "58755265-6bc4-4c28-89f0-c05eebdf200b", "metadata": {}, "outputs": [], "source": [ "gdf = pdf.groupBy(\"date\", \"item\")" ] }, { "cell_type": "code", "execution_count": 84, "id": "455ce611-8ff9-4bac-8fbb-a21d8b25b47f", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+------+----------------------+-------------+\n", "| date| item|n_orders_has_this_item|sale_quantity|\n", "+----------+------+----------------------+-------------+\n", "|2022-01-01| apple| 2| 9|\n", "|2022-01-01|banana| 2| 19|\n", "|2022-01-02| apple| 2| 36|\n", "|2022-01-02|banana| 1| 8|\n", "+----------+------+----------------------+-------------+\n", "\n" ] } ], "source": [ "gdf.agg(\n", " func.count(pdf.order_id).alias(\"n_orders_has_this_item\"),\n", " func.sum(pdf.quantity).alias(\"sale_quantity\"),\n", ").show()" ] }, { 
"cell_type": "markdown", "id": "e85b461c-0d37-492e-8bba-f57b32b72c33", "metadata": {}, "source": [ "## Window Function\n", "\n", "在 SQL 中的聚合查询里, 窗口函数是非常重要的功能.\n", "\n", "在下面的例子里, 我们有一个各个部门的雇员的工资数据. 我们希望知道每个部门里工资最高的雇员是谁. 这里的 Window 就是根据 department 分组, 然后我们在 Window 内根据 salary 进行排序, 然后给每一行加上序号, 最后我们只需要取出序号等于 1 的数据即可." ] }, { "cell_type": "code", "execution_count": 85, "id": "886dc958-ef23-4457-ac6c-b3467ae490dc", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+--------+------+\n", "|department|employee|salary|\n", "+----------+--------+------+\n", "| HR| alice| 70000|\n", "| HR| bob| 56000|\n", "| IT| cathy| 68000|\n", "| IT| david| 83000|\n", "+----------+--------+------+\n", "\n" ] } ], "source": [ "# Create from Python list of tuple\n", "pdf = spark.createDataFrame(\n", " [\n", " (\"HR\", \"alice\", 70000),\n", " (\"HR\", \"bob\", 56000),\n", " (\"IT\", \"cathy\", 68000),\n", " (\"IT\", \"david\", 83000),\n", " ],\n", " (\"department\", \"employee\", \"salary\")\n", ")\n", "pdf.show()" ] }, { "cell_type": "code", "execution_count": 93, "id": "508ce7dc-6915-42f4-b8ca-eeaf22c7e97d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+--------+-------------------+\n", "|department|employee|in_dept_salary_rank|\n", "+----------+--------+-------------------+\n", "| HR| alice| 1|\n", "| HR| bob| 2|\n", "| IT| david| 1|\n", "| IT| cathy| 2|\n", "+----------+--------+-------------------+\n", "\n" ] } ], "source": [ "# 先看看给每一行标上按工资排序的行号是什么样子\n", "from pyspark.sql.window import Window\n", "\n", "pdf.select(\n", " pdf.department,\n", " pdf.employee,\n", " func.row_number().over(\n", " Window.partitionBy(\"department\").orderBy(func.col(\"salary\").desc())\n", " ).alias(\"in_dept_salary_rank\")\n", ").show()" ] }, { "cell_type": "code", "execution_count": 94, "id": "5bac34ae-5a27-42d5-8b9e-df4e356902f1", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------+--------+-------------------+\n", "|department|employee|in_dept_salary_rank|\n", "+----------+--------+-------------------+\n", "| HR| alice| 1|\n", "| IT| david| 1|\n", "+----------+--------+-------------------+\n", "\n" ] } ], "source": [ "# 最后输出数据. 和 SQL 中我们需要用 Sub Query 不同, 我们可以直接用 .filter 来对中间状态的表筛选数据\n", "(\n", " pdf.select(\n", " pdf.department,\n", " pdf.employee,\n", " func.row_number().over(\n", " Window.partitionBy(\"department\").orderBy(func.col(\"salary\").desc())\n", " ).alias(\"in_dept_salary_rank\")\n", " )\n", " .filter(func.col(\"in_dept_salary_rank\") == 1)\n", " .show()\n", ")" ] }, { "cell_type": "code", "execution_count": null, "id": "ae861e36-f2af-4203-9e69-f7ba004aac52", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.5" } }, "nbformat": 4, "nbformat_minor": 5 }