{ "cells": [ { "cell_type": "markdown", "id": "12d609f1-5216-4a08-b205-1d8071e30ea6", "metadata": {}, "source": [ "# Functions\n", "\n", "PySpark 中最为复杂的要属内置的海量函数了." ] }, { "cell_type": "code", "execution_count": 1, "id": "f7460206-7d3e-4874-828b-5d66c77b4de8", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Current directory: /home/jovyan/docs/source/03-Functions\n", "Current Python version: 3.10.5\n", "Current Python interpreter: /opt/conda/bin/python\n", "Current site-packages: ['/opt/conda/lib/python3.10/site-packages']\n" ] } ], "source": [ "import os\n", "import sys\n", "import site\n", "\n", "cwd = os.getcwd()\n", "print(f\"Current directory: {cwd}\")\n", "print(f\"Current Python version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}\")\n", "print(f\"Current Python interpreter: {sys.executable}\")\n", "print(f\"Current site-packages: {site.getsitepackages()}\")\n", "\n", "sys.path.append(os.path.join(cwd, \"site-packages\"))" ] }, { "cell_type": "code", "execution_count": 2, "id": "9b8aa5fd-2d40-4cf9-85d8-c4df81ac4a0f", "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "

SparkSession - in-memory

\n", " \n", "
\n", "

SparkContext

\n", "\n", "

Spark UI

\n", "\n", "
\n", "
Version
\n", "
v3.3.0
\n", "
Master
\n", "
local[*]
\n", "
AppName
\n", "
pyspark-shell
\n", "
\n", "
\n", " \n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 2, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 首先创建一个 Spark Session\n", "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder.getOrCreate()\n", "spark" ] }, { "cell_type": "code", "execution_count": 3, "id": "d872c6dd-5d5f-4418-a41c-cc882eac7486", "metadata": {}, "outputs": [], "source": [ "# 然后我们先将这些函数统一导入, 以供后续使用\n", "\n", "import pyspark.sql.functions as func" ] }, { "cell_type": "markdown", "id": "8fb77541-8074-4996-854d-0360a3d4ecd8", "metadata": {}, "source": [ "## Convert Data Type\n", "\n", "PySpark 中对数据类型进行转化的关键函数是 [Column.cast](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Column.cast.html#pyspark.sql.Column.cast). 而你可以在 [这里](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/data_types.html) 找到所有支持的数据类型.\n", "\n", "- str -> int\n", "- int -> str\n", "- str -> date\n", "- date -> str\n", "- int -> datetime\n", "- str -> datetime\n", "\n", "- str: T.StringType, \"string\"\n", "- int: T.IntegerType, \"int\"\n", "- date: T.DateType, \"date\"\n", "- datetime: T.TimestampType, \"tmestamp\"" ] }, { "cell_type": "code", "execution_count": 29, "id": "cafb76de-bd63-4763-b757-0b4bc674aea9", "metadata": {}, "outputs": [], "source": [ "import pyspark.sql.types as T" ] }, { "cell_type": "markdown", "id": "ae791863-ef27-43c5-97ef-ad074af1d377", "metadata": {}, "source": [ "### Int and String\n", "\n", "整数和字符串的相互转化非常常见." ] }, { "cell_type": "code", "execution_count": 42, "id": "75154b97-b347-484f-8968-a6013100a72c", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(a_int=1, a_str='3'), Row(a_int=2, a_str='4')]" ] }, "execution_count": 42, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (1, \"3\"),\n", " (2, \"4\"),\n", " ],\n", " (\"a_int\", \"a_str\")\n", ")\n", "pdf.collect()" ] }, { "cell_type": "code", "execution_count": 44, "id": "97dc4bbe-211e-4ef2-80a5-bc6b4359c628", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new='1'), Row(new='2')]" ] }, "execution_count": 44, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 整数 -> 字符串\n", "pdf.select(\n", " pdf.a_int.astype(T.StringType()).alias(\"new\")\n", ").collect()" ] }, { "cell_type": "code", "execution_count": 45, "id": "7e215b58-2adc-4c9f-871b-8163cd6e2dee", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new=3), Row(new=4)]" ] }, "execution_count": 45, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 字符串 -> 整数\n", "pdf.select(\n", " pdf.a_str.astype(T.IntegerType()).alias(\"new\")\n", ").collect()" ] }, { "cell_type": "markdown", "id": "e7ec587f-b332-4d4a-afff-a13f4bfc11b5", "metadata": {}, "source": [ "如果有些字符串无法转化为整数, 那么就会返回 ``NULL``" ] }, { "cell_type": "code", "execution_count": 46, "id": "ad4e88c0-5e02-49a9-ae5d-c4441597f8f4", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(a_str='1'), Row(a_str='alice')]" ] }, "execution_count": 46, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (\"1\",),\n", " (\"alice\",),\n", " ],\n", " (\"a_str\",)\n", ")\n", "pdf.collect()" ] }, { "cell_type": "code", "execution_count": 48, "id": "23c2fb1a-6dbf-41b0-b611-37dddfe3211d", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new=1), Row(new=None)]" ] }, "execution_count": 48, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 字符串 -> 整数\n", 
"pdf.select(\n", " pdf.a_str.astype(\"int\").alias(\"new\")\n", ").collect()" ] }, { "cell_type": "markdown", "id": "f42b7dd3-3c25-474e-a425-0f3d1f2f7947", "metadata": {}, "source": [ "### Date and Str and Integer\n", "\n", "日期和字符串是可以来回转换的. 在 Python 中日期可以转换成从 1970-01-01 开始的天数, 以节约空间. 但是 PySpark 没有这个的内置支持." ] }, { "cell_type": "code", "execution_count": 75, "id": "22b62dd4-53a7-4b96-a57b-bb8593a8216a", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(a_str='2022-01-01', a_date=datetime.date(2022, 1, 1)),\n", " Row(a_str='2022-01-02', a_date=datetime.date(2022, 1, 2))]" ] }, "execution_count": 75, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from datetime import date\n", "\n", "pdf = spark.createDataFrame(\n", " [\n", " (\"2022-01-01\", date(2022, 1, 1)),\n", " (\"2022-01-02\", date(2022, 1, 2)),\n", " ],\n", " (\"a_str\", \"a_date\")\n", ")\n", "pdf.collect()" ] }, { "cell_type": "code", "execution_count": 76, "id": "ae861e36-f2af-4203-9e69-f7ba004aac52", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new='2022-01-01'), Row(new='2022-01-02')]" ] }, "execution_count": 76, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 日期 -> 字符串\n", "pdf.select(\n", " pdf.a_date.astype(T.StringType()).alias(\"new\")\n", ").collect()" ] }, { "cell_type": "code", "execution_count": 83, "id": "72ec2197-f009-47fd-9f2f-9c0abaf46e56", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new=None), Row(new=None)]" ] }, "execution_count": 83, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 日期 -> 字符串\n", "pdf.select(\n", " pdf.a_date.astype(T.IntegerType()).alias(\"new\")\n", ").collect()" ] }, { "cell_type": "code", "execution_count": 78, "id": "a6b0dad2-b9a7-41cd-baba-a555a0e274fb", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new=datetime.date(2022, 1, 1)), Row(new=datetime.date(2022, 1, 2))]" ] }, "execution_count": 78, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 字符串 -> 日期\n", "pdf.select(\n", " pdf.a_str.astype(T.DateType()).alias(\"new\")\n", ").collect()" ] }, { "cell_type": "markdown", "id": "d8fa79d7-4dfb-4e34-bb86-1ed4e60745f4", "metadata": {}, "source": [ "### Datetime and Str and Integer\n", "\n", "Python 中的时间对象是可以和字符串和数字相互转换的. 如果是整数则是精度为秒的时间戳. 如果是 Double (Float 不行) 则是精度为微秒 (10 负 6 次方) 的时间戳. PySpark 中的时间默认都是 UTC 时间." 
] }, { "cell_type": "code", "execution_count": 84, "id": "a4f35634-72e0-4ee0-a389-bb8f194e55ce", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(a_str='2022-01-01 08:00:00.123', a_dt=datetime.datetime(2022, 1, 1, 8, 0, 0, 123)),\n", " Row(a_str='2022-01-02 08:00:00.123456', a_dt=datetime.datetime(2022, 1, 2, 8, 0, 0, 123456))]" ] }, "execution_count": 84, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from datetime import datetime, timezone\n", "\n", "pdf = spark.createDataFrame(\n", " [\n", " (\"2022-01-01 08:00:00.123\", datetime(2022, 1, 1, 8, 0, 0, 123, tzinfo=timezone.utc)),\n", " (\"2022-01-02 08:00:00.123456\", datetime(2022, 1, 2, 8, 0, 0, 123456, tzinfo=timezone.utc)),\n", " ],\n", " (\"a_str\", \"a_dt\")\n", ")\n", "pdf.collect()" ] }, { "cell_type": "code", "execution_count": 85, "id": "e26ab454-5b89-4500-b2af-f50443775b26", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new='2022-01-01 08:00:00.000123'), Row(new='2022-01-02 08:00:00.123456')]" ] }, "execution_count": 85, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 时间 -> 字符串\n", "pdf.select(\n", " pdf.a_dt.astype(T.StringType()).alias(\"new\")\n", ").collect()" ] }, { "cell_type": "code", "execution_count": 86, "id": "f36322cd-b983-454c-b684-4757156da591", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new=1641024000), Row(new=1641110400)]" ] }, "execution_count": 86, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 时间 -> 整数, 精度为秒\n", "pdf.select(\n", " pdf.a_dt.astype(T.IntegerType()).alias(\"new\")\n", ").collect()" ] }, { "cell_type": "code", "execution_count": 87, "id": "b401686f-fdcf-4b8f-851e-7c90fca4a6b8", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new=1641024000.000123), Row(new=1641110400.123456)]" ] }, "execution_count": 87, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 时间 -> 整数, 精度为微秒\n", "pdf.select(\n", " pdf.a_dt.astype(T.DoubleType()).alias(\"new\")\n", ").collect()" ] }, { "cell_type": "code", "execution_count": 88, "id": "ec8c9779-4f2d-4a5e-af3d-a72d21bebca1", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new=datetime.datetime(2022, 1, 1, 8, 0, 0, 123000)),\n", " Row(new=datetime.datetime(2022, 1, 2, 8, 0, 0, 123456))]" ] }, "execution_count": 88, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 字符串 -> 时间\n", "pdf.select(\n", " pdf.a_str.astype(T.TimestampType()).alias(\"new\")\n", ").collect()" ] }, { "cell_type": "markdown", "id": "786c04a9-a3b8-48f2-868b-d95be5fd3718", "metadata": {}, "source": [ "### Int and Boolean\n", "\n", "在 Python 中整数是可以有布尔值含义的. 0 是 False, 非 0 数都是 True. 反过来 True 是 1, False 是 0." 
] }, { "cell_type": "code", "execution_count": 57, "id": "ac22e57a-714c-4aab-b01b-87e96d4bc443", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(a_int=0, a_bool=False),\n", " Row(a_int=1, a_bool=True),\n", " Row(a_int=2, a_bool=None),\n", " Row(a_int=-1, a_bool=None)]" ] }, "execution_count": 57, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (0, False,),\n", " (1, True,),\n", " (2, None),\n", " (-1, None),\n", " ],\n", " (\"a_int\", \"a_bool\")\n", ")\n", "pdf.collect()" ] }, { "cell_type": "code", "execution_count": 58, "id": "892ed3fd-5cd5-4fb8-981c-67583f0576f2", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new=False), Row(new=True), Row(new=True), Row(new=True)]" ] }, "execution_count": 58, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 整数 -> 布尔值, 跟 Python 一样, 0 是 False, 其他都是 True\n", "pdf.select(\n", " pdf.a_int.astype(T.BooleanType()).alias(\"new\")\n", ").collect()" ] }, { "cell_type": "code", "execution_count": 60, "id": "a2864145-0ffb-4d09-9ca5-e53ad8343251", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new=0), Row(new=1), Row(new=None), Row(new=None)]" ] }, "execution_count": 60, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 布尔值 -> 整数, 跟 Python 一样, True 是 1, False 是 0\n", "pdf.select(\n", " pdf.a_bool.astype(T.IntegerType()).alias(\"new\")\n", ").collect()" ] }, { "cell_type": "markdown", "id": "63e283b5-0a87-4ab2-ac93-0aa1ee3b24f2", "metadata": {}, "source": [ "### Int and Float\n", "\n", "整数和小数之间的转换跟 Python 中的逻辑一致. 整数 -> 小数 后面加 ``.0``. 小数 -> 整数 则是向下取整." ] }, { "cell_type": "code", "execution_count": 95, "id": "9ad0a308-25fb-44f7-9581-117a93e9f0d4", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(a_int=1, a_float=3.14),\n", " Row(a_int=2, a_float=2.0),\n", " Row(a_int=3, a_float=2.718),\n", " Row(a_int=0, a_float=9.999999)]" ] }, "execution_count": 95, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (1, 3.14),\n", " (2, 2.0),\n", " (3, 2.718),\n", " (0, 9.999999),\n", " ],\n", " (\"a_int\", \"a_float\")\n", ")\n", "pdf.collect()" ] }, { "cell_type": "code", "execution_count": 96, "id": "8f547b9c-0c33-4902-bc5a-a1da81e23d9e", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new=1.0), Row(new=2.0), Row(new=3.0), Row(new=0.0)]" ] }, "execution_count": 96, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 布尔值 -> 整数, 跟 Python 一样, True 是 1, False 是 0\n", "pdf.select(\n", " pdf.a_int.astype(T.FloatType()).alias(\"new\")\n", ").collect()" ] }, { "cell_type": "code", "execution_count": 97, "id": "bb10a3eb-5568-4713-92a2-f903790e0f87", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(new=3), Row(new=2), Row(new=2), Row(new=9)]" ] }, "execution_count": 97, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 布尔值 -> 整数, 跟 Python 一样, True 是 1, False 是 0\n", "pdf.select(\n", " pdf.a_float.astype(T.IntegerType()).alias(\"new\")\n", ").collect()" ] }, { "cell_type": "markdown", "id": "7837bcdb-a5e2-44ae-b222-290fe2061f9b", "metadata": {}, "source": [ "## String Functions\n", "\n", "PySpark 内置了许多对字符串的处理函数. 和 Python 标准库里的 str 类似, 基本上都能找到一一对应的函数." ] }, { "cell_type": "markdown", "id": "9c5cd702-b7b4-4cb4-8748-443fbd717f86", "metadata": {}, "source": [ "### Concatenant\n", "\n", "拼接字符串. 对于数据类型不是字符串的 Column 会先转化成字符串." 
] }, { "cell_type": "code", "execution_count": 129, "id": "2f68990e-e720-4c94-ba52-d222a23614ca", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----------------+\n", "| new|\n", "+----------------+\n", "|a-b-1-2022-01-01|\n", "+----------------+\n", "\n" ] } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (\"a\", \"b\", 1, date(2022, 1, 1)),\n", " ],\n", " (\"col1\", \"col2\", \"col3\", \"col4\")\n", ")\n", "pdf.select(\n", " func.concat_ws(\"-\", pdf.col1, pdf.col2, pdf.col3, pdf.col4).alias(\"new\")\n", ").show()\n" ] }, { "cell_type": "markdown", "id": "19d7ba06-8711-476c-abf5-ad5aa5ffa426", "metadata": {}, "source": [ "### Base64 Encoding\n", "\n", "对数据进行 Base64 编码. 常用于把被序列化后的数据编码成字符串通过网络传输" ] }, { "cell_type": "code", "execution_count": 131, "id": "f597b666-15dc-439b-9f5e-c603050200ba", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------------+------------+------------------------+\n", "|base64(col1)|base64(col2)|base64(col3) |\n", "+------------+------------+------------------------+\n", "|aGVsbG8= |aGVsbG8= |eyJrZXkiOiAidmFsdWUifQ==|\n", "+------------+------------+------------------------+\n", "\n" ] } ], "source": [ "import json\n", "\n", "pdf = spark.createDataFrame(\n", " [\n", " (b\"hello\", \"hello\", json.dumps({\"key\": \"value\"})),\n", " ],\n", " (\"col1\", \"col2\", \"col3\")\n", ")\n", "pdf.select(\n", " func.base64(pdf.col1),\n", " func.base64(pdf.col2),\n", " func.base64(pdf.col3),\n", ").show(truncate=False)\n" ] }, { "cell_type": "markdown", "id": "c1e4d06f-aa88-44f6-8841-80b0124c662a", "metadata": {}, "source": [ "### Format String\n", "\n", "和 Python 中的 F-String Template 类似, 使用格式化字符串输出." ] }, { "cell_type": "code", "execution_count": 120, "id": "b0c7840e-a551-4d43-9134-ddc6b6c27845", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "|new|\n", "+---+\n", "|a-1|\n", "+---+\n", "\n" ] } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (\"a\", 1),\n", " ],\n", " (\"key\", \"value\")\n", ")\n", "pdf.select(\n", " func.format_string(\"%s-%s\", pdf.key, pdf.value).alias(\"new\"),\n", ").show()\n" ] }, { "cell_type": "markdown", "id": "a8fc0457-e6e7-47f8-9953-cada6b9f20c4", "metadata": {}, "source": [ "### SubString Replace\n", "\n", "对子字符串进行替换. 和 Python 中的 ``str.replace(...)`` 类似" ] }, { "cell_type": "code", "execution_count": 123, "id": "84c33cd9-74e6-4608-838a-6f0b1a96b46b", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------------------+\n", "|new |\n", "+---------------------+\n", "|/home/bob/config.json|\n", "+---------------------+\n", "\n" ] } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (\"/home/alice/config.json\",),\n", " ],\n", " (\"col\", )\n", ")\n", "pdf.select(\n", " func.regexp_replace(pdf.col, \"/home/alice\", \"/home/bob\").alias(\"new\"),\n", ").show(truncate=False)" ] }, { "cell_type": "markdown", "id": "0cc5009d-89fa-4dd8-b5ae-8751bd8b4461", "metadata": {}, "source": [ "### Padding\n", "\n", "对字符串左边或者右边填补. 通常用于确保数字拥有同样的字符串长度, 以用于排序." 
] }, { "cell_type": "code", "execution_count": 125, "id": "91872e84-e26f-47f7-9299-37785dd81695", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------+\n", "| new|\n", "+------+\n", "|000001|\n", "|000002|\n", "+------+\n", "\n" ] } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (1,),\n", " (2,),\n", " ],\n", " (\"col\", )\n", ")\n", "pdf.select(\n", " func.lpad(pdf.col, 6, \"0\").alias(\"new\"),\n", ").show()" ] }, { "cell_type": "markdown", "id": "4ffb2969-8424-4138-8570-72200456aa87", "metadata": {}, "source": [ "## Collection / Array / Map Functions" ] }, { "cell_type": "code", "execution_count": 135, "id": "aeb8881f-1ba5-4912-98e9-2fdbe5130f83", "metadata": {}, "outputs": [], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (1, [11, 12], [\"a\", \"b\"]),\n", " (2, [21, 22], [\"c\", \"d\"]),\n", " ],\n", " (\"id\", \"int_arr\", \"str_arr\")\n", ")" ] }, { "cell_type": "code", "execution_count": 138, "id": "ca894261-7cd3-45a1-bd3d-2ccf4a6a4005", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-----+\n", "| new|\n", "+-----+\n", "| true|\n", "|false|\n", "+-----+\n", "\n", "+-----+\n", "| new|\n", "+-----+\n", "|false|\n", "| true|\n", "+-----+\n", "\n" ] } ], "source": [ "pdf.select(\n", " func.array_contains(pdf.int_arr, 11).alias(\"new\"),\n", ").show()\n", "\n", "pdf.select(\n", " func.array_contains(pdf.str_arr, \"d\").alias(\"new\"),\n", ").show()" ] }, { "cell_type": "code", "execution_count": 142, "id": "766f3909-181e-41fa-9d88-91a8ff59178e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+\n", "| new|\n", "+----+\n", "|true|\n", "+----+\n", "\n" ] } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " ([1, 2], [2, 3])\n", " ],\n", " (\"c1\", \"c2\")\n", ")\n", "pdf.select(\n", " func.arrays_overlap(\n", " pdf.c1, \n", " pdf.c2,\n", " ).alias(\"new\"),\n", ").show()\n" ] }, { "cell_type": "code", "execution_count": 145, "id": "7afe6314-f3dc-40e2-a989-caef63ea5faf", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+\n", "| new|\n", "+---------+\n", "|[1, 2, 3]|\n", "+---------+\n", "\n", "+---------+\n", "| new|\n", "+---------+\n", "|[3, 4, 5]|\n", "+---------+\n", "\n" ] } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " ([1, 2, 3, 4, 5],)\n", " ],\n", " (\"c\",)\n", ")\n", "\n", "pdf.select(\n", " func.slice(\n", " pdf.c, \n", " 1,\n", " 3,\n", " ).alias(\"new\"),\n", ").show()\n", "\n", "pdf.select(\n", " func.slice(\n", " pdf.c, \n", " 3,\n", " 5,\n", " ).alias(\"new\"),\n", ").show()\n" ] }, { "cell_type": "markdown", "id": "cf9adfcc-7683-485f-87c2-1a547e9099de", "metadata": {}, "source": [ "### array_join" ] }, { "cell_type": "code", "execution_count": 149, "id": "416e8c1e-4532-40f7-9edd-0e004f0ca081", "metadata": {}, "outputs": [], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " ([\"a\", None, \"c\"],)\n", " ],\n", " (\"c\",)\n", ")" ] }, { "cell_type": "code", "execution_count": 150, "id": "6a8a0d30-5fa7-4d61-ae31-9cbe1912df55", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "|new|\n", "+---+\n", "|a-c|\n", "+---+\n", "\n" ] } ], "source": [ "pdf.select(\n", " func.array_join(\n", " pdf.c, \n", " \"-\",\n", " ).alias(\"new\"),\n", ").show()" ] }, { "cell_type": "code", "execution_count": 151, "id": "f131246a-74ca-4e55-a0e7-65d36c685e9a", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": 
[ "+--------+\n", "| new|\n", "+--------+\n", "|a-none-c|\n", "+--------+\n", "\n" ] } ], "source": [ "pdf.select(\n", " func.array_join(\n", " pdf.c, \n", " \"-\",\n", " \"none\",\n", " ).alias(\"new\"),\n", ").show()\n" ] }, { "cell_type": "markdown", "id": "cd14e062-90f1-4ddc-ad79-465f752f498a", "metadata": {}, "source": [ "### Set Operations\n", "\n", "对集合的操作有:\n", "\n", "- 交 ([Intersect](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.array_union.html#pyspark.sql.functions.array_intersect)): 两者的共同部分\n", "- 并 ([Union](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.array_union.html#pyspark.sql.functions.array_union)): 两者合起来\n", "- 补 ([Except](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.array_union.html#pyspark.sql.functions.array_except)): 从一个集合里移除另一个集合的元素\n", "\n", "**首先我们来看两个列都是 Array 的情况**.\n", "\n", "在 Spark 中并没有专门为 array 和 set 设计数据结构. 而在用 Spark 集合操作函数的时候, 会自动先对其去重." ] }, { "cell_type": "code", "execution_count": 172, "id": "f7379faf-2c7f-4fd4-9567-69b69fca6b33", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+---------+\n", "| c1| c2|\n", "+---------+---------+\n", "|[a, b, b]|[b, b, c]|\n", "+---------+---------+\n", "\n" ] } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " ([\"a\", \"b\", \"b\"], [\"b\", \"b\", \"c\"])\n", " ],\n", " (\"c1\", \"c2\"),\n", ")\n", "pdf.show()" ] }, { "cell_type": "code", "execution_count": 164, "id": "73a1ce37-0d2b-4b5a-a5be-405517c09337", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---------+\n", "| new|\n", "+---------+\n", "|[a, b, c]|\n", "+---------+\n", "\n" ] } ], "source": [ "pdf.select(\n", " func.array_union(\n", " pdf.c1,\n", " pdf.c2,\n", " ).alias(\"new\"),\n", ").show()" ] }, { "cell_type": "code", "execution_count": 165, "id": "66d56f63-b397-4336-88a9-06474b41bacc", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "|new|\n", "+---+\n", "|[b]|\n", "+---+\n", "\n" ] } ], "source": [ "pdf.select(\n", " func.array_intersect(\n", " pdf.c1,\n", " pdf.c2,\n", " ).alias(\"new\"),\n", ").show()" ] }, { "cell_type": "code", "execution_count": 166, "id": "7d7b1b73-7c25-439b-ae32-833f035ba34c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "|new|\n", "+---+\n", "|[a]|\n", "+---+\n", "\n" ] } ], "source": [ "pdf.select(\n", " func.array_except(\n", " pdf.c1,\n", " pdf.c2,\n", " ).alias(\"new\"),\n", ").show()" ] }, { "cell_type": "code", "execution_count": 167, "id": "4e7c6184-b10d-4c13-9156-7fecbde1773a", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "|new|\n", "+---+\n", "|[c]|\n", "+---+\n", "\n" ] } ], "source": [ "pdf.select(\n", " func.array_except(\n", " pdf.c2,\n", " pdf.c1,\n", " ).alias(\"new\"),\n", ").show()" ] }, { "cell_type": "markdown", "id": "116b8b25-e6d6-41ce-b0c6-c492f2c73d8e", "metadata": {}, "source": [ "**然后我们来看两个列都是字符串, 把列中的所有元素看成一个 Array 进行集合运算**\n", "\n", "对于这种情况有两种方法.\n", "\n", "**方法 1** 是先用 [collect_set](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.collect_set.html#pyspark.sql.functions.collect_set) 函数将列转化成 Array 并去重, 于是一个 M 行 2 列的 DataFrame 就变成了一个 1 行 2 列的 DataFrame, 也就是之前的情况. 接下来用 ``array_union, array_intersect, array_except`` 函数就可以了." 
] }, { "cell_type": "code", "execution_count": 193, "id": "90631827-38e5-4475-a71c-aae5290dfc8c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+---+\n", "| c1| c2|\n", "+---+---+\n", "| a| b|\n", "| b| b|\n", "| b| c|\n", "+---+---+\n", "\n" ] } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (\"a\", \"b\"),\n", " (\"b\", \"b\"),\n", " (\"b\", \"c\"),\n", " ],\n", " (\"c1\", \"c2\"),\n", ")\n", "pdf.show()" ] }, { "cell_type": "code", "execution_count": 196, "id": "eff38f9f-2028-4589-ba6a-551ca739d7b0", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------+------+\n", "|c1_arr|c2_arr|\n", "+------+------+\n", "|[b, a]|[c, b]|\n", "+------+------+\n", "\n" ] } ], "source": [ "pdf1 = pdf.select(\n", " func.collect_set(pdf.c1).alias(\"c1_arr\"),\n", " func.collect_set(pdf.c2).alias(\"c2_arr\"),\n", ").show()" ] }, { "cell_type": "markdown", "id": "f18a7067-0920-4afc-b816-59b410b015b1", "metadata": {}, "source": [ "**方法 2** 是利把一个 DataFrame 中的两列变成两个 DataFrame, 然后用 JOIN 进行计算." ] }, { "cell_type": "code", "execution_count": 197, "id": "72813579-cfe0-4b61-b6ff-b6637bb57665", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "| c1|\n", "+---+\n", "| a|\n", "| b|\n", "| b|\n", "+---+\n", "\n", "+---+\n", "| c2|\n", "+---+\n", "| b|\n", "| b|\n", "| c|\n", "+---+\n", "\n" ] } ], "source": [ "pdf1 = spark.createDataFrame(\n", " [\n", " (\"a\", ),\n", " (\"b\", ),\n", " (\"b\", ),\n", " ],\n", " (\"c1\", ),\n", ")\n", "pdf1.show()\n", "\n", "pdf2 = spark.createDataFrame(\n", " [\n", " (\"b\", ),\n", " (\"b\", ),\n", " (\"c\", ),\n", " ],\n", " (\"c2\", ),\n", ")\n", "pdf2.show()" ] }, { "cell_type": "code", "execution_count": 198, "id": "1beba078-e0ac-48d3-ad2a-134b14b304f9", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "| c1|\n", "+---+\n", "| a|\n", "| b|\n", "+---+\n", "\n", "+---+\n", "| c2|\n", "+---+\n", "| b|\n", "| c|\n", "+---+\n", "\n" ] } ], "source": [ "# 先对行进行去重\n", "pdf11 = pdf1.drop_duplicates([\"c1\",])\n", "pdf11.show()\n", "\n", "pdf22 = pdf2.drop_duplicates([\"c2\",])\n", "pdf22.show()" ] }, { "cell_type": "code", "execution_count": 199, "id": "c01eb954-0832-4052-9f97-20c2780ed088", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "| c1|\n", "+---+\n", "| b|\n", "+---+\n", "\n" ] } ], "source": [ "# inner join 等于 intersection\n", "(\n", " pdf11.join(\n", " pdf22,\n", " on=pdf11.c1==pdf22.c2,\n", " how=\"inner\"\n", " ).select(\"c1\").show()\n", ")" ] }, { "cell_type": "code", "execution_count": 205, "id": "40825015-d83b-4eea-964a-b80b9bc817fe", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+----+----+\n", "| c1| c2|\n", "+----+----+\n", "| a|null|\n", "| b| b|\n", "|null| c|\n", "+----+----+\n", "\n" ] } ], "source": [ "# outer join 等于 union, 但是值会分布在两个列中, 你需要 collect_list 然后再集合\n", "(\n", " pdf11.join(\n", " pdf22,\n", " on=pdf11.c1==pdf22.c2,\n", " how=\"outer\"\n", " ).select(\"c1\", \"c2\").show()\n", ")" ] }, { "cell_type": "code", "execution_count": 203, "id": "59ca21eb-cd63-42fd-ad7d-b10fe7eea9e1", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "| c1|\n", "+---+\n", "| a|\n", "+---+\n", "\n" ] } ], "source": [ "# left_anti 相当于 except\n", "# 从 c1 里删除 c2 中的元素\n", "(\n", " pdf11.join(\n", " pdf22,\n", " on=pdf11.c1==pdf22.c2,\n", " how=\"left_anti\"\n", " 
).select(\"c1\").show()\n", ")" ] }, { "cell_type": "code", "execution_count": 204, "id": "0a991a2f-487a-4c49-b74d-3674ddaca070", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "| c2|\n", "+---+\n", "| c|\n", "+---+\n", "\n" ] } ], "source": [ "# 从 c2 里删除 c1 中的元素\n", "(\n", " pdf22.join(\n", " pdf11,\n", " on=pdf22.c2==pdf11.c1,\n", " how=\"left_anti\"\n", " ).select(\"c2\").show()\n", ")" ] }, { "cell_type": "markdown", "id": "e5f44de8-0465-4d4c-bb11-f973451081fa", "metadata": {}, "source": [ "### Deduplicate / Distinct" ] }, { "cell_type": "code", "execution_count": 229, "id": "0f9e83f3-59b3-4b33-a331-6eb25fed4a9e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "|col|\n", "+---+\n", "| a|\n", "| b|\n", "| b|\n", "| c|\n", "| c|\n", "| c|\n", "| d|\n", "| d|\n", "| e|\n", "+---+\n", "\n" ] } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (\"a\", ),\n", " (\"b\", ),\n", " (\"b\", ),\n", " (\"c\", ),\n", " (\"c\", ),\n", " (\"c\", ),\n", " (\"d\", ),\n", " (\"d\", ),\n", " (\"e\", ),\n", " ],\n", " (\"col\", ),\n", ")\n", "pdf.show()" ] }, { "cell_type": "code", "execution_count": 230, "id": "271483ba-a2fd-4cbc-8352-e7a8b9633a04", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+\n", "|col|\n", "+---+\n", "| b|\n", "| a|\n", "| c|\n", "| e|\n", "| d|\n", "+---+\n", "\n" ] } ], "source": [ "pdf.drop_duplicates([\"col\"]).show()" ] }, { "cell_type": "code", "execution_count": 231, "id": "70d555ef-40be-424f-aa9b-faccfd9bb904", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+-------------------+\n", "|count(DISTINCT col)|\n", "+-------------------+\n", "| 5|\n", "+-------------------+\n", "\n" ] } ], "source": [ "pdf.select(\n", " func.count_distinct(pdf.col),\n", ").show()" ] }, { "cell_type": "code", "execution_count": 226, "id": "d326fe4b-b33a-4924-8189-0470d550019e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-----+\n", "|key|value|\n", "+---+-----+\n", "| a| 1|\n", "| b| 21|\n", "| b| 22|\n", "| c| 3|\n", "+---+-----+\n", "\n" ] } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (\"a\", 1),\n", " (\"b\", 21),\n", " (\"b\", 22),\n", " (\"c\", 3),\n", " ],\n", " (\"key\", \"value\"),\n", ")\n", "pdf.show()" ] }, { "cell_type": "code", "execution_count": 227, "id": "3d8b42d6-64aa-45dc-af0e-13585cf95414", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-----+\n", "|key|value|\n", "+---+-----+\n", "| a| 1|\n", "| b| 21|\n", "| b| 22|\n", "| c| 3|\n", "+---+-----+\n", "\n" ] } ], "source": [ "# 默认情况下所有列中的数据都一样的时候才会被视为重复数据\n", "pdf.drop_duplicates().show()" ] }, { "cell_type": "code", "execution_count": 228, "id": "9be61db8-c1e2-43a1-a695-18e9f69b666c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+-----+\n", "|key|value|\n", "+---+-----+\n", "| a| 1|\n", "| b| 21|\n", "| c| 3|\n", "+---+-----+\n", "\n" ] } ], "source": [ "# 可以只基于某个列去重, 但是由于 Partition 的机制, 如果基于的列不是 Partition Key \n", "# 那么 DataFrame 中的 Row 的顺序将无法保证得到保留\n", "pdf.drop_duplicates([\"key\"]).show()" ] }, { "cell_type": "code", "execution_count": null, "id": "66c320a6-5e00-448d-843c-6fad30f51aab", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": 
"ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.5" } }, "nbformat": 4, "nbformat_minor": 5 }