Functions¶
PySpark 中最为复杂的要属内置的海量函数了.
[1]:
import os
import sys
import site
cwd = os.getcwd()
print(f"Current directory: {cwd}")
print(f"Current Python version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print(f"Current Python interpreter: {sys.executable}")
print(f"Current site-packages: {site.getsitepackages()}")
sys.path.append(os.path.join(cwd, "site-packages"))
Current directory: /home/jovyan/docs/source/03-Functions
Current Python version: 3.10.5
Current Python interpreter: /opt/conda/bin/python
Current site-packages: ['/opt/conda/lib/python3.10/site-packages']
[2]:
# 首先创建一个 Spark Session
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark
[2]:
SparkSession - in-memory
[3]:
# 然后我们先将这些函数统一导入, 以供后续使用
import pyspark.sql.functions as func
Convert Data Type¶
PySpark 中对数据类型进行转化的关键函数是 Column.cast. 而你可以在 这里 找到所有支持的数据类型.
str -> int
int -> str
str -> date
date -> str
int -> datetime
str -> datetime
str: T.StringType, “string”
int: T.IntegerType, “int”
date: T.DateType, “date”
datetime: T.TimestampType, “tmestamp”
[29]:
import pyspark.sql.types as T
Int and String¶
整数和字符串的相互转化非常常见.
[42]:
pdf = spark.createDataFrame(
[
(1, "3"),
(2, "4"),
],
("a_int", "a_str")
)
pdf.collect()
[42]:
[Row(a_int=1, a_str='3'), Row(a_int=2, a_str='4')]
[44]:
# 整数 -> 字符串
pdf.select(
pdf.a_int.astype(T.StringType()).alias("new")
).collect()
[44]:
[Row(new='1'), Row(new='2')]
[45]:
# 字符串 -> 整数
pdf.select(
pdf.a_str.astype(T.IntegerType()).alias("new")
).collect()
[45]:
[Row(new=3), Row(new=4)]
如果有些字符串无法转化为整数, 那么就会返回 NULL
[46]:
pdf = spark.createDataFrame(
[
("1",),
("alice",),
],
("a_str",)
)
pdf.collect()
[46]:
[Row(a_str='1'), Row(a_str='alice')]
[48]:
# 字符串 -> 整数
pdf.select(
pdf.a_str.astype("int").alias("new")
).collect()
[48]:
[Row(new=1), Row(new=None)]
Date and Str and Integer¶
日期和字符串是可以来回转换的. 在 Python 中日期可以转换成从 1970-01-01 开始的天数, 以节约空间. 但是 PySpark 没有这个的内置支持.
[75]:
from datetime import date
pdf = spark.createDataFrame(
[
("2022-01-01", date(2022, 1, 1)),
("2022-01-02", date(2022, 1, 2)),
],
("a_str", "a_date")
)
pdf.collect()
[75]:
[Row(a_str='2022-01-01', a_date=datetime.date(2022, 1, 1)),
Row(a_str='2022-01-02', a_date=datetime.date(2022, 1, 2))]
[76]:
# 日期 -> 字符串
pdf.select(
pdf.a_date.astype(T.StringType()).alias("new")
).collect()
[76]:
[Row(new='2022-01-01'), Row(new='2022-01-02')]
[83]:
# 日期 -> 字符串
pdf.select(
pdf.a_date.astype(T.IntegerType()).alias("new")
).collect()
[83]:
[Row(new=None), Row(new=None)]
[78]:
# 字符串 -> 日期
pdf.select(
pdf.a_str.astype(T.DateType()).alias("new")
).collect()
[78]:
[Row(new=datetime.date(2022, 1, 1)), Row(new=datetime.date(2022, 1, 2))]
Datetime and Str and Integer¶
Python 中的时间对象是可以和字符串和数字相互转换的. 如果是整数则是精度为秒的时间戳. 如果是 Double (Float 不行) 则是精度为微秒 (10 负 6 次方) 的时间戳. PySpark 中的时间默认都是 UTC 时间.
[84]:
from datetime import datetime, timezone
pdf = spark.createDataFrame(
[
("2022-01-01 08:00:00.123", datetime(2022, 1, 1, 8, 0, 0, 123, tzinfo=timezone.utc)),
("2022-01-02 08:00:00.123456", datetime(2022, 1, 2, 8, 0, 0, 123456, tzinfo=timezone.utc)),
],
("a_str", "a_dt")
)
pdf.collect()
[84]:
[Row(a_str='2022-01-01 08:00:00.123', a_dt=datetime.datetime(2022, 1, 1, 8, 0, 0, 123)),
Row(a_str='2022-01-02 08:00:00.123456', a_dt=datetime.datetime(2022, 1, 2, 8, 0, 0, 123456))]
[85]:
# 时间 -> 字符串
pdf.select(
pdf.a_dt.astype(T.StringType()).alias("new")
).collect()
[85]:
[Row(new='2022-01-01 08:00:00.000123'), Row(new='2022-01-02 08:00:00.123456')]
[86]:
# 时间 -> 整数, 精度为秒
pdf.select(
pdf.a_dt.astype(T.IntegerType()).alias("new")
).collect()
[86]:
[Row(new=1641024000), Row(new=1641110400)]
[87]:
# 时间 -> 整数, 精度为微秒
pdf.select(
pdf.a_dt.astype(T.DoubleType()).alias("new")
).collect()
[87]:
[Row(new=1641024000.000123), Row(new=1641110400.123456)]
[88]:
# 字符串 -> 时间
pdf.select(
pdf.a_str.astype(T.TimestampType()).alias("new")
).collect()
[88]:
[Row(new=datetime.datetime(2022, 1, 1, 8, 0, 0, 123000)),
Row(new=datetime.datetime(2022, 1, 2, 8, 0, 0, 123456))]
Int and Boolean¶
在 Python 中整数是可以有布尔值含义的. 0 是 False, 非 0 数都是 True. 反过来 True 是 1, False 是 0.
[57]:
pdf = spark.createDataFrame(
[
(0, False,),
(1, True,),
(2, None),
(-1, None),
],
("a_int", "a_bool")
)
pdf.collect()
[57]:
[Row(a_int=0, a_bool=False),
Row(a_int=1, a_bool=True),
Row(a_int=2, a_bool=None),
Row(a_int=-1, a_bool=None)]
[58]:
# 整数 -> 布尔值, 跟 Python 一样, 0 是 False, 其他都是 True
pdf.select(
pdf.a_int.astype(T.BooleanType()).alias("new")
).collect()
[58]:
[Row(new=False), Row(new=True), Row(new=True), Row(new=True)]
[60]:
# 布尔值 -> 整数, 跟 Python 一样, True 是 1, False 是 0
pdf.select(
pdf.a_bool.astype(T.IntegerType()).alias("new")
).collect()
[60]:
[Row(new=0), Row(new=1), Row(new=None), Row(new=None)]
Int and Float¶
整数和小数之间的转换跟 Python 中的逻辑一致. 整数 -> 小数 后面加 .0
. 小数 -> 整数 则是向下取整.
[95]:
pdf = spark.createDataFrame(
[
(1, 3.14),
(2, 2.0),
(3, 2.718),
(0, 9.999999),
],
("a_int", "a_float")
)
pdf.collect()
[95]:
[Row(a_int=1, a_float=3.14),
Row(a_int=2, a_float=2.0),
Row(a_int=3, a_float=2.718),
Row(a_int=0, a_float=9.999999)]
[96]:
# 布尔值 -> 整数, 跟 Python 一样, True 是 1, False 是 0
pdf.select(
pdf.a_int.astype(T.FloatType()).alias("new")
).collect()
[96]:
[Row(new=1.0), Row(new=2.0), Row(new=3.0), Row(new=0.0)]
[97]:
# 布尔值 -> 整数, 跟 Python 一样, True 是 1, False 是 0
pdf.select(
pdf.a_float.astype(T.IntegerType()).alias("new")
).collect()
[97]:
[Row(new=3), Row(new=2), Row(new=2), Row(new=9)]
String Functions¶
PySpark 内置了许多对字符串的处理函数. 和 Python 标准库里的 str 类似, 基本上都能找到一一对应的函数.
Concatenant¶
拼接字符串. 对于数据类型不是字符串的 Column 会先转化成字符串.
[129]:
pdf = spark.createDataFrame(
[
("a", "b", 1, date(2022, 1, 1)),
],
("col1", "col2", "col3", "col4")
)
pdf.select(
func.concat_ws("-", pdf.col1, pdf.col2, pdf.col3, pdf.col4).alias("new")
).show()
+----------------+
| new|
+----------------+
|a-b-1-2022-01-01|
+----------------+
Base64 Encoding¶
对数据进行 Base64 编码. 常用于把被序列化后的数据编码成字符串通过网络传输
[131]:
import json
pdf = spark.createDataFrame(
[
(b"hello", "hello", json.dumps({"key": "value"})),
],
("col1", "col2", "col3")
)
pdf.select(
func.base64(pdf.col1),
func.base64(pdf.col2),
func.base64(pdf.col3),
).show(truncate=False)
+------------+------------+------------------------+
|base64(col1)|base64(col2)|base64(col3) |
+------------+------------+------------------------+
|aGVsbG8= |aGVsbG8= |eyJrZXkiOiAidmFsdWUifQ==|
+------------+------------+------------------------+
Format String¶
和 Python 中的 F-String Template 类似, 使用格式化字符串输出.
[120]:
pdf = spark.createDataFrame(
[
("a", 1),
],
("key", "value")
)
pdf.select(
func.format_string("%s-%s", pdf.key, pdf.value).alias("new"),
).show()
+---+
|new|
+---+
|a-1|
+---+
SubString Replace¶
对子字符串进行替换. 和 Python 中的 str.replace(...)
类似
[123]:
pdf = spark.createDataFrame(
[
("/home/alice/config.json",),
],
("col", )
)
pdf.select(
func.regexp_replace(pdf.col, "/home/alice", "/home/bob").alias("new"),
).show(truncate=False)
+---------------------+
|new |
+---------------------+
|/home/bob/config.json|
+---------------------+
Padding¶
对字符串左边或者右边填补. 通常用于确保数字拥有同样的字符串长度, 以用于排序.
[125]:
pdf = spark.createDataFrame(
[
(1,),
(2,),
],
("col", )
)
pdf.select(
func.lpad(pdf.col, 6, "0").alias("new"),
).show()
+------+
| new|
+------+
|000001|
|000002|
+------+
Collection / Array / Map Functions¶
[135]:
pdf = spark.createDataFrame(
[
(1, [11, 12], ["a", "b"]),
(2, [21, 22], ["c", "d"]),
],
("id", "int_arr", "str_arr")
)
[138]:
pdf.select(
func.array_contains(pdf.int_arr, 11).alias("new"),
).show()
pdf.select(
func.array_contains(pdf.str_arr, "d").alias("new"),
).show()
+-----+
| new|
+-----+
| true|
|false|
+-----+
+-----+
| new|
+-----+
|false|
| true|
+-----+
[142]:
pdf = spark.createDataFrame(
[
([1, 2], [2, 3])
],
("c1", "c2")
)
pdf.select(
func.arrays_overlap(
pdf.c1,
pdf.c2,
).alias("new"),
).show()
+----+
| new|
+----+
|true|
+----+
[145]:
pdf = spark.createDataFrame(
[
([1, 2, 3, 4, 5],)
],
("c",)
)
pdf.select(
func.slice(
pdf.c,
1,
3,
).alias("new"),
).show()
pdf.select(
func.slice(
pdf.c,
3,
5,
).alias("new"),
).show()
+---------+
| new|
+---------+
|[1, 2, 3]|
+---------+
+---------+
| new|
+---------+
|[3, 4, 5]|
+---------+
array_join¶
[149]:
pdf = spark.createDataFrame(
[
(["a", None, "c"],)
],
("c",)
)
[150]:
pdf.select(
func.array_join(
pdf.c,
"-",
).alias("new"),
).show()
+---+
|new|
+---+
|a-c|
+---+
[151]:
pdf.select(
func.array_join(
pdf.c,
"-",
"none",
).alias("new"),
).show()
+--------+
| new|
+--------+
|a-none-c|
+--------+
Set Operations¶
对集合的操作有:
首先我们来看两个列都是 Array 的情况.
在 Spark 中并没有专门为 array 和 set 设计数据结构. 而在用 Spark 集合操作函数的时候, 会自动先对其去重.
[172]:
pdf = spark.createDataFrame(
[
(["a", "b", "b"], ["b", "b", "c"])
],
("c1", "c2"),
)
pdf.show()
+---------+---------+
| c1| c2|
+---------+---------+
|[a, b, b]|[b, b, c]|
+---------+---------+
[164]:
pdf.select(
func.array_union(
pdf.c1,
pdf.c2,
).alias("new"),
).show()
+---------+
| new|
+---------+
|[a, b, c]|
+---------+
[165]:
pdf.select(
func.array_intersect(
pdf.c1,
pdf.c2,
).alias("new"),
).show()
+---+
|new|
+---+
|[b]|
+---+
[166]:
pdf.select(
func.array_except(
pdf.c1,
pdf.c2,
).alias("new"),
).show()
+---+
|new|
+---+
|[a]|
+---+
[167]:
pdf.select(
func.array_except(
pdf.c2,
pdf.c1,
).alias("new"),
).show()
+---+
|new|
+---+
|[c]|
+---+
然后我们来看两个列都是字符串, 把列中的所有元素看成一个 Array 进行集合运算
对于这种情况有两种方法.
方法 1 是先用 collect_set 函数将列转化成 Array 并去重, 于是一个 M 行 2 列的 DataFrame 就变成了一个 1 行 2 列的 DataFrame, 也就是之前的情况. 接下来用 array_union, array_intersect, array_except
函数就可以了.
[193]:
pdf = spark.createDataFrame(
[
("a", "b"),
("b", "b"),
("b", "c"),
],
("c1", "c2"),
)
pdf.show()
+---+---+
| c1| c2|
+---+---+
| a| b|
| b| b|
| b| c|
+---+---+
[196]:
pdf1 = pdf.select(
func.collect_set(pdf.c1).alias("c1_arr"),
func.collect_set(pdf.c2).alias("c2_arr"),
).show()
+------+------+
|c1_arr|c2_arr|
+------+------+
|[b, a]|[c, b]|
+------+------+
方法 2 是利把一个 DataFrame 中的两列变成两个 DataFrame, 然后用 JOIN 进行计算.
[197]:
pdf1 = spark.createDataFrame(
[
("a", ),
("b", ),
("b", ),
],
("c1", ),
)
pdf1.show()
pdf2 = spark.createDataFrame(
[
("b", ),
("b", ),
("c", ),
],
("c2", ),
)
pdf2.show()
+---+
| c1|
+---+
| a|
| b|
| b|
+---+
+---+
| c2|
+---+
| b|
| b|
| c|
+---+
[198]:
# 先对行进行去重
pdf11 = pdf1.drop_duplicates(["c1",])
pdf11.show()
pdf22 = pdf2.drop_duplicates(["c2",])
pdf22.show()
+---+
| c1|
+---+
| a|
| b|
+---+
+---+
| c2|
+---+
| b|
| c|
+---+
[199]:
# inner join 等于 intersection
(
pdf11.join(
pdf22,
on=pdf11.c1==pdf22.c2,
how="inner"
).select("c1").show()
)
+---+
| c1|
+---+
| b|
+---+
[205]:
# outer join 等于 union, 但是值会分布在两个列中, 你需要 collect_list 然后再集合
(
pdf11.join(
pdf22,
on=pdf11.c1==pdf22.c2,
how="outer"
).select("c1", "c2").show()
)
+----+----+
| c1| c2|
+----+----+
| a|null|
| b| b|
|null| c|
+----+----+
[203]:
# left_anti 相当于 except
# 从 c1 里删除 c2 中的元素
(
pdf11.join(
pdf22,
on=pdf11.c1==pdf22.c2,
how="left_anti"
).select("c1").show()
)
+---+
| c1|
+---+
| a|
+---+
[204]:
# 从 c2 里删除 c1 中的元素
(
pdf22.join(
pdf11,
on=pdf22.c2==pdf11.c1,
how="left_anti"
).select("c2").show()
)
+---+
| c2|
+---+
| c|
+---+
Deduplicate / Distinct¶
[229]:
pdf = spark.createDataFrame(
[
("a", ),
("b", ),
("b", ),
("c", ),
("c", ),
("c", ),
("d", ),
("d", ),
("e", ),
],
("col", ),
)
pdf.show()
+---+
|col|
+---+
| a|
| b|
| b|
| c|
| c|
| c|
| d|
| d|
| e|
+---+
[230]:
pdf.drop_duplicates(["col"]).show()
+---+
|col|
+---+
| b|
| a|
| c|
| e|
| d|
+---+
[231]:
pdf.select(
func.count_distinct(pdf.col),
).show()
+-------------------+
|count(DISTINCT col)|
+-------------------+
| 5|
+-------------------+
[226]:
pdf = spark.createDataFrame(
[
("a", 1),
("b", 21),
("b", 22),
("c", 3),
],
("key", "value"),
)
pdf.show()
+---+-----+
|key|value|
+---+-----+
| a| 1|
| b| 21|
| b| 22|
| c| 3|
+---+-----+
[227]:
# 默认情况下所有列中的数据都一样的时候才会被视为重复数据
pdf.drop_duplicates().show()
+---+-----+
|key|value|
+---+-----+
| a| 1|
| b| 21|
| b| 22|
| c| 3|
+---+-----+
[228]:
# 可以只基于某个列去重, 但是由于 Partition 的机制, 如果基于的列不是 Partition Key
# 那么 DataFrame 中的 Row 的顺序将无法保证得到保留
pdf.drop_duplicates(["key"]).show()
+---+-----+
|key|value|
+---+-----+
| a| 1|
| b| 21|
| c| 3|
+---+-----+
[ ]: