Functions

PySpark 中最为复杂的要属内置的海量函数了.

[1]:
import os
import sys
import site

cwd = os.getcwd()
print(f"Current directory: {cwd}")
print(f"Current Python version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print(f"Current Python interpreter: {sys.executable}")
print(f"Current site-packages: {site.getsitepackages()}")

sys.path.append(os.path.join(cwd, "site-packages"))
Current directory: /home/jovyan/docs/source/03-Functions
Current Python version: 3.10.5
Current Python interpreter: /opt/conda/bin/python
Current site-packages: ['/opt/conda/lib/python3.10/site-packages']
[2]:
# 首先创建一个 Spark Session
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark
[2]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.3.0
Master
local[*]
AppName
pyspark-shell
[3]:
# 然后我们先将这些函数统一导入, 以供后续使用

import pyspark.sql.functions as func

Convert Data Type

PySpark 中对数据类型进行转化的关键函数是 Column.cast. 而你可以在 这里 找到所有支持的数据类型.

  • str -> int

  • int -> str

  • str -> date

  • date -> str

  • int -> datetime

  • str -> datetime

  • str: T.StringType, “string”

  • int: T.IntegerType, “int”

  • date: T.DateType, “date”

  • datetime: T.TimestampType, “tmestamp”

[29]:
import pyspark.sql.types as T

Int and String

整数和字符串的相互转化非常常见.

[42]:
pdf = spark.createDataFrame(
    [
        (1, "3"),
        (2, "4"),
    ],
    ("a_int", "a_str")
)
pdf.collect()
[42]:
[Row(a_int=1, a_str='3'), Row(a_int=2, a_str='4')]
[44]:
# 整数 -> 字符串
pdf.select(
    pdf.a_int.astype(T.StringType()).alias("new")
).collect()
[44]:
[Row(new='1'), Row(new='2')]
[45]:
# 字符串 -> 整数
pdf.select(
    pdf.a_str.astype(T.IntegerType()).alias("new")
).collect()
[45]:
[Row(new=3), Row(new=4)]

如果有些字符串无法转化为整数, 那么就会返回 NULL

[46]:
pdf = spark.createDataFrame(
    [
        ("1",),
        ("alice",),
    ],
    ("a_str",)
)
pdf.collect()
[46]:
[Row(a_str='1'), Row(a_str='alice')]
[48]:
# 字符串 -> 整数
pdf.select(
    pdf.a_str.astype("int").alias("new")
).collect()
[48]:
[Row(new=1), Row(new=None)]

Date and Str and Integer

日期和字符串是可以来回转换的. 在 Python 中日期可以转换成从 1970-01-01 开始的天数, 以节约空间. 但是 PySpark 没有这个的内置支持.

[75]:
from datetime import date

pdf = spark.createDataFrame(
    [
        ("2022-01-01", date(2022, 1, 1)),
        ("2022-01-02", date(2022, 1, 2)),
    ],
    ("a_str", "a_date")
)
pdf.collect()
[75]:
[Row(a_str='2022-01-01', a_date=datetime.date(2022, 1, 1)),
 Row(a_str='2022-01-02', a_date=datetime.date(2022, 1, 2))]
[76]:
# 日期 -> 字符串
pdf.select(
    pdf.a_date.astype(T.StringType()).alias("new")
).collect()
[76]:
[Row(new='2022-01-01'), Row(new='2022-01-02')]
[83]:
# 日期 -> 字符串
pdf.select(
    pdf.a_date.astype(T.IntegerType()).alias("new")
).collect()
[83]:
[Row(new=None), Row(new=None)]
[78]:
# 字符串 -> 日期
pdf.select(
    pdf.a_str.astype(T.DateType()).alias("new")
).collect()
[78]:
[Row(new=datetime.date(2022, 1, 1)), Row(new=datetime.date(2022, 1, 2))]

Datetime and Str and Integer

Python 中的时间对象是可以和字符串和数字相互转换的. 如果是整数则是精度为秒的时间戳. 如果是 Double (Float 不行) 则是精度为微秒 (10 负 6 次方) 的时间戳. PySpark 中的时间默认都是 UTC 时间.

[84]:
from datetime import datetime, timezone

pdf = spark.createDataFrame(
    [
        ("2022-01-01 08:00:00.123", datetime(2022, 1, 1, 8, 0, 0, 123, tzinfo=timezone.utc)),
        ("2022-01-02 08:00:00.123456", datetime(2022, 1, 2, 8, 0, 0, 123456, tzinfo=timezone.utc)),
    ],
    ("a_str", "a_dt")
)
pdf.collect()
[84]:
[Row(a_str='2022-01-01 08:00:00.123', a_dt=datetime.datetime(2022, 1, 1, 8, 0, 0, 123)),
 Row(a_str='2022-01-02 08:00:00.123456', a_dt=datetime.datetime(2022, 1, 2, 8, 0, 0, 123456))]
[85]:
# 时间 -> 字符串
pdf.select(
    pdf.a_dt.astype(T.StringType()).alias("new")
).collect()
[85]:
[Row(new='2022-01-01 08:00:00.000123'), Row(new='2022-01-02 08:00:00.123456')]
[86]:
# 时间 -> 整数, 精度为秒
pdf.select(
    pdf.a_dt.astype(T.IntegerType()).alias("new")
).collect()
[86]:
[Row(new=1641024000), Row(new=1641110400)]
[87]:
# 时间 -> 整数, 精度为微秒
pdf.select(
    pdf.a_dt.astype(T.DoubleType()).alias("new")
).collect()
[87]:
[Row(new=1641024000.000123), Row(new=1641110400.123456)]
[88]:
# 字符串 -> 时间
pdf.select(
    pdf.a_str.astype(T.TimestampType()).alias("new")
).collect()
[88]:
[Row(new=datetime.datetime(2022, 1, 1, 8, 0, 0, 123000)),
 Row(new=datetime.datetime(2022, 1, 2, 8, 0, 0, 123456))]

Int and Boolean

在 Python 中整数是可以有布尔值含义的. 0 是 False, 非 0 数都是 True. 反过来 True 是 1, False 是 0.

[57]:
pdf = spark.createDataFrame(
    [
        (0, False,),
        (1, True,),
        (2, None),
        (-1, None),
    ],
    ("a_int", "a_bool")
)
pdf.collect()
[57]:
[Row(a_int=0, a_bool=False),
 Row(a_int=1, a_bool=True),
 Row(a_int=2, a_bool=None),
 Row(a_int=-1, a_bool=None)]
[58]:
# 整数 -> 布尔值, 跟 Python 一样, 0 是 False, 其他都是 True
pdf.select(
    pdf.a_int.astype(T.BooleanType()).alias("new")
).collect()
[58]:
[Row(new=False), Row(new=True), Row(new=True), Row(new=True)]
[60]:
# 布尔值 -> 整数, 跟 Python 一样, True 是 1, False 是 0
pdf.select(
    pdf.a_bool.astype(T.IntegerType()).alias("new")
).collect()
[60]:
[Row(new=0), Row(new=1), Row(new=None), Row(new=None)]

Int and Float

整数和小数之间的转换跟 Python 中的逻辑一致. 整数 -> 小数 后面加 .0. 小数 -> 整数 则是向下取整.

[95]:
pdf = spark.createDataFrame(
    [
        (1, 3.14),
        (2, 2.0),
        (3, 2.718),
        (0, 9.999999),
    ],
    ("a_int", "a_float")
)
pdf.collect()
[95]:
[Row(a_int=1, a_float=3.14),
 Row(a_int=2, a_float=2.0),
 Row(a_int=3, a_float=2.718),
 Row(a_int=0, a_float=9.999999)]
[96]:
# 布尔值 -> 整数, 跟 Python 一样, True 是 1, False 是 0
pdf.select(
    pdf.a_int.astype(T.FloatType()).alias("new")
).collect()
[96]:
[Row(new=1.0), Row(new=2.0), Row(new=3.0), Row(new=0.0)]
[97]:
# 布尔值 -> 整数, 跟 Python 一样, True 是 1, False 是 0
pdf.select(
    pdf.a_float.astype(T.IntegerType()).alias("new")
).collect()
[97]:
[Row(new=3), Row(new=2), Row(new=2), Row(new=9)]

String Functions

PySpark 内置了许多对字符串的处理函数. 和 Python 标准库里的 str 类似, 基本上都能找到一一对应的函数.

Concatenant

拼接字符串. 对于数据类型不是字符串的 Column 会先转化成字符串.

[129]:
pdf = spark.createDataFrame(
    [
        ("a", "b", 1, date(2022, 1, 1)),
    ],
    ("col1", "col2", "col3", "col4")
)
pdf.select(
    func.concat_ws("-", pdf.col1, pdf.col2, pdf.col3, pdf.col4).alias("new")
).show()

+----------------+
|             new|
+----------------+
|a-b-1-2022-01-01|
+----------------+

Base64 Encoding

对数据进行 Base64 编码. 常用于把被序列化后的数据编码成字符串通过网络传输

[131]:
import json

pdf = spark.createDataFrame(
    [
        (b"hello", "hello", json.dumps({"key": "value"})),
    ],
    ("col1", "col2", "col3")
)
pdf.select(
    func.base64(pdf.col1),
    func.base64(pdf.col2),
    func.base64(pdf.col3),
).show(truncate=False)

+------------+------------+------------------------+
|base64(col1)|base64(col2)|base64(col3)            |
+------------+------------+------------------------+
|aGVsbG8=    |aGVsbG8=    |eyJrZXkiOiAidmFsdWUifQ==|
+------------+------------+------------------------+

Format String

和 Python 中的 F-String Template 类似, 使用格式化字符串输出.

[120]:
pdf = spark.createDataFrame(
    [
        ("a", 1),
    ],
    ("key", "value")
)
pdf.select(
    func.format_string("%s-%s", pdf.key, pdf.value).alias("new"),
).show()

+---+
|new|
+---+
|a-1|
+---+

SubString Replace

对子字符串进行替换. 和 Python 中的 str.replace(...) 类似

[123]:
pdf = spark.createDataFrame(
    [
        ("/home/alice/config.json",),
    ],
    ("col", )
)
pdf.select(
    func.regexp_replace(pdf.col, "/home/alice", "/home/bob").alias("new"),
).show(truncate=False)
+---------------------+
|new                  |
+---------------------+
|/home/bob/config.json|
+---------------------+

Padding

对字符串左边或者右边填补. 通常用于确保数字拥有同样的字符串长度, 以用于排序.

[125]:
pdf = spark.createDataFrame(
    [
        (1,),
        (2,),
    ],
    ("col", )
)
pdf.select(
    func.lpad(pdf.col, 6, "0").alias("new"),
).show()
+------+
|   new|
+------+
|000001|
|000002|
+------+

Collection / Array / Map Functions

[135]:
pdf = spark.createDataFrame(
    [
        (1, [11, 12], ["a", "b"]),
        (2, [21, 22], ["c", "d"]),
    ],
    ("id", "int_arr", "str_arr")
)
[138]:
pdf.select(
    func.array_contains(pdf.int_arr, 11).alias("new"),
).show()

pdf.select(
    func.array_contains(pdf.str_arr, "d").alias("new"),
).show()
+-----+
|  new|
+-----+
| true|
|false|
+-----+

+-----+
|  new|
+-----+
|false|
| true|
+-----+

[142]:
pdf = spark.createDataFrame(
    [
        ([1, 2], [2, 3])
    ],
    ("c1", "c2")
)
pdf.select(
    func.arrays_overlap(
        pdf.c1,
        pdf.c2,
    ).alias("new"),
).show()

+----+
| new|
+----+
|true|
+----+

[145]:
pdf = spark.createDataFrame(
    [
        ([1, 2, 3, 4, 5],)
    ],
    ("c",)
)

pdf.select(
    func.slice(
        pdf.c,
        1,
        3,
    ).alias("new"),
).show()

pdf.select(
    func.slice(
        pdf.c,
        3,
        5,
    ).alias("new"),
).show()

+---------+
|      new|
+---------+
|[1, 2, 3]|
+---------+

+---------+
|      new|
+---------+
|[3, 4, 5]|
+---------+

array_join

[149]:
pdf = spark.createDataFrame(
    [
        (["a", None, "c"],)
    ],
    ("c",)
)
[150]:
pdf.select(
    func.array_join(
        pdf.c,
        "-",
    ).alias("new"),
).show()
+---+
|new|
+---+
|a-c|
+---+

[151]:
pdf.select(
    func.array_join(
        pdf.c,
        "-",
        "none",
    ).alias("new"),
).show()

+--------+
|     new|
+--------+
|a-none-c|
+--------+

Set Operations

对集合的操作有:

  • 交 (Intersect): 两者的共同部分

  • 并 (Union): 两者合起来

  • 补 (Except): 从一个集合里移除另一个集合的元素

首先我们来看两个列都是 Array 的情况.

在 Spark 中并没有专门为 array 和 set 设计数据结构. 而在用 Spark 集合操作函数的时候, 会自动先对其去重.

[172]:
pdf = spark.createDataFrame(
    [
        (["a", "b", "b"], ["b", "b", "c"])
    ],
    ("c1", "c2"),
)
pdf.show()
+---------+---------+
|       c1|       c2|
+---------+---------+
|[a, b, b]|[b, b, c]|
+---------+---------+

[164]:
pdf.select(
    func.array_union(
        pdf.c1,
        pdf.c2,
    ).alias("new"),
).show()
+---------+
|      new|
+---------+
|[a, b, c]|
+---------+

[165]:
pdf.select(
    func.array_intersect(
        pdf.c1,
        pdf.c2,
    ).alias("new"),
).show()
+---+
|new|
+---+
|[b]|
+---+

[166]:
pdf.select(
    func.array_except(
        pdf.c1,
        pdf.c2,
    ).alias("new"),
).show()
+---+
|new|
+---+
|[a]|
+---+

[167]:
pdf.select(
    func.array_except(
        pdf.c2,
        pdf.c1,
    ).alias("new"),
).show()
+---+
|new|
+---+
|[c]|
+---+

然后我们来看两个列都是字符串, 把列中的所有元素看成一个 Array 进行集合运算

对于这种情况有两种方法.

方法 1 是先用 collect_set 函数将列转化成 Array 并去重, 于是一个 M 行 2 列的 DataFrame 就变成了一个 1 行 2 列的 DataFrame, 也就是之前的情况. 接下来用 array_union, array_intersect, array_except 函数就可以了.

[193]:
pdf = spark.createDataFrame(
    [
        ("a", "b"),
        ("b", "b"),
        ("b", "c"),
    ],
    ("c1", "c2"),
)
pdf.show()
+---+---+
| c1| c2|
+---+---+
|  a|  b|
|  b|  b|
|  b|  c|
+---+---+

[196]:
pdf1 = pdf.select(
    func.collect_set(pdf.c1).alias("c1_arr"),
    func.collect_set(pdf.c2).alias("c2_arr"),
).show()
+------+------+
|c1_arr|c2_arr|
+------+------+
|[b, a]|[c, b]|
+------+------+

方法 2 是利把一个 DataFrame 中的两列变成两个 DataFrame, 然后用 JOIN 进行计算.

[197]:
pdf1 = spark.createDataFrame(
    [
        ("a", ),
        ("b", ),
        ("b", ),
    ],
    ("c1", ),
)
pdf1.show()

pdf2 = spark.createDataFrame(
    [
        ("b", ),
        ("b", ),
        ("c", ),
    ],
    ("c2", ),
)
pdf2.show()
+---+
| c1|
+---+
|  a|
|  b|
|  b|
+---+

+---+
| c2|
+---+
|  b|
|  b|
|  c|
+---+

[198]:
# 先对行进行去重
pdf11 = pdf1.drop_duplicates(["c1",])
pdf11.show()

pdf22 = pdf2.drop_duplicates(["c2",])
pdf22.show()
+---+
| c1|
+---+
|  a|
|  b|
+---+

+---+
| c2|
+---+
|  b|
|  c|
+---+

[199]:
# inner join 等于 intersection
(
    pdf11.join(
        pdf22,
        on=pdf11.c1==pdf22.c2,
        how="inner"
    ).select("c1").show()
)
+---+
| c1|
+---+
|  b|
+---+

[205]:
# outer join 等于 union, 但是值会分布在两个列中, 你需要 collect_list 然后再集合
(
    pdf11.join(
        pdf22,
        on=pdf11.c1==pdf22.c2,
        how="outer"
    ).select("c1", "c2").show()
)
+----+----+
|  c1|  c2|
+----+----+
|   a|null|
|   b|   b|
|null|   c|
+----+----+

[203]:
# left_anti 相当于 except
# 从 c1 里删除 c2 中的元素
(
    pdf11.join(
        pdf22,
        on=pdf11.c1==pdf22.c2,
        how="left_anti"
    ).select("c1").show()
)
+---+
| c1|
+---+
|  a|
+---+

[204]:
# 从 c2 里删除 c1 中的元素
(
    pdf22.join(
        pdf11,
        on=pdf22.c2==pdf11.c1,
        how="left_anti"
    ).select("c2").show()
)
+---+
| c2|
+---+
|  c|
+---+

Deduplicate / Distinct

[229]:
pdf = spark.createDataFrame(
    [
        ("a", ),
        ("b", ),
        ("b", ),
        ("c", ),
        ("c", ),
        ("c", ),
        ("d", ),
        ("d", ),
        ("e", ),
    ],
    ("col", ),
)
pdf.show()
+---+
|col|
+---+
|  a|
|  b|
|  b|
|  c|
|  c|
|  c|
|  d|
|  d|
|  e|
+---+

[230]:
pdf.drop_duplicates(["col"]).show()
+---+
|col|
+---+
|  b|
|  a|
|  c|
|  e|
|  d|
+---+

[231]:
pdf.select(
    func.count_distinct(pdf.col),
).show()
+-------------------+
|count(DISTINCT col)|
+-------------------+
|                  5|
+-------------------+

[226]:
pdf = spark.createDataFrame(
    [
        ("a", 1),
        ("b", 21),
        ("b", 22),
        ("c", 3),
    ],
    ("key", "value"),
)
pdf.show()
+---+-----+
|key|value|
+---+-----+
|  a|    1|
|  b|   21|
|  b|   22|
|  c|    3|
+---+-----+

[227]:
# 默认情况下所有列中的数据都一样的时候才会被视为重复数据
pdf.drop_duplicates().show()
+---+-----+
|key|value|
+---+-----+
|  a|    1|
|  b|   21|
|  b|   22|
|  c|    3|
+---+-----+

[228]:
# 可以只基于某个列去重, 但是由于 Partition 的机制, 如果基于的列不是 Partition Key
# 那么 DataFrame 中的 Row 的顺序将无法保证得到保留
pdf.drop_duplicates(["key"]).show()
+---+-----+
|key|value|
+---+-----+
|  a|    1|
|  b|   21|
|  c|    3|
+---+-----+

[ ]: