Quick Start

[18]:
import os
import sys
import site

cwd = os.getcwd()
print(f"Current directory: {cwd}")
print(f"Current Python version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print(f"Current Python interpreter: {sys.executable}")
print(f"Current site-packages: {site.getsitepackages()}")

sys.path.append(os.path.join(cwd, "site-packages"))
Current directory: /home/jovyan/docs/source/01-Quick-Start
Current Python version: 3.10.5
Current Python interpreter: /opt/conda/bin/python
Current site-packages: ['/opt/conda/lib/python3.10/site-packages']

Spark Session

Spark 集群通常是跑在服务器上的, 而 Jupyter Notebook 则是提供了一个可以和 Spark 集群交互的界面. Livy 是一个能提供 Spark API 的软件. 你的 Jupyter Notebook 每次运行 Spark API, 实际上则是向 Livy 发送了一个命令, 并把返回的响应展现在 Notebook 中. 无论你是使用 Jupyter Notebook 还是 spark submit, 你都要创建一个 Session 跟 Spark 集群建立连接, 这个连接就叫做 Spark Session. 当然连接长期没有动作就会失效.

下面是一个创建 Spark Session 的极简例子. 其实还有很多参数可以用, 详情请参考 pyspark.sql.SparkSession.

[19]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
spark
[19]:

SparkSession - in-memory

SparkContext

Spark UI

Version
v3.3.0
Master
local[*]
AppName
pyspark-shell

DataFrame

DataFrame 是 Spark 中的核心 API. 代表的是分布在集群上的数据的一个二维表虚拟结构.

Create DataFrame

创建 DataFrame 的方式有很多种, 大体上可以分为两类:

  1. 通过 IO 从 数据库, 文件系统 中读取.

  2. 从内存中的数据创建而来, 例如从 Python 的 list, pandas 的 DataFrame 中转换而来.

[20]:
# Create from Python list of tuple
pdf_from_list_of_tuple = spark.createDataFrame(
    [
        (1, "a"),
        (2, "b"),
    ],
    ("id", "name")
)
pdf_from_list_of_tuple.show()
+---+----+
| id|name|
+---+----+
|  1|   a|
|  2|   b|
+---+----+

[21]:
# Create from Python list of dict
pdf_from_list_of_dict = spark.createDataFrame(
    [
        {"id": 1, "name": "a"},
        {"id": 2, "name": "b"},
    ],
)
pdf_from_list_of_dict.show()
+---+----+
| id|name|
+---+----+
|  1|   a|
|  2|   b|
+---+----+

[22]:
# Create from pandas.DataFrame
import pandas as pd

df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
pdf_from_pandas = spark.createDataFrame(df)
pdf_from_pandas.show()
+---+----+
| id|name|
+---+----+
|  1|   a|
|  2|   b|
+---+----+

[23]:
# Create from CSV file
from pathlib import Path

dir_here = Path(os.getcwd())
path_csv = dir_here / "tmp.csv"

# bsm
# cwd = os.getcwd()
# p_csv = Path(cwd) / "tmp.csv"
csv = """
id,name
1,a
2,b
""".strip()
path_csv.write_text(csv)

pdf_from_csv = spark.read.csv(f"{path_csv}", header=True)
pdf_from_csv.show()
+---+----+
| id|name|
+---+----+
|  1|   a|
|  2|   b|
+---+----+

DataFrame Attributes and Methods

[24]:
# Create from Python list of tuple
pdf = spark.createDataFrame(
    [
        (1, "a"),
        (2, "b"),
        (3, "c"),
        (4, "d"),
        (5, "e"),
        (6, "f"),
        (7, "g"),
    ],
    ("id", "name")
)
pdf.show()
+---+----+
| id|name|
+---+----+
|  1|   a|
|  2|   b|
|  3|   c|
|  4|   d|
|  5|   e|
|  6|   f|
|  7|   g|
+---+----+

[25]:
# 打印 Schema
pdf.printSchema()
root
 |-- id: long (nullable = true)
 |-- name: string (nullable = true)

[26]:
# 返回 data type 的列表
pdf.dtypes
[26]:
[('id', 'bigint'), ('name', 'string')]
[27]:
# 返回 columns header
pdf.columns
[27]:
['id', 'name']
[28]:
# 返回一共有多少行
pdf.count()
[28]:
7
[29]:
# 返回一共有多少列
len(pdf.columns)
[29]:
2
[30]:
# 返回前面几行, 以 Row 列表的形式
pdf.head(3)
[30]:
[Row(id=1, name='a'), Row(id=2, name='b'), Row(id=3, name='c')]
[31]:
# 返回后面几行, 以 Row 列表的形式
pdf.tail(3)
[31]:
[Row(id=5, name='e'), Row(id=6, name='f'), Row(id=7, name='g')]
[32]:
# 预览前几行的数据
pdf.show(3)
+---+----+
| id|name|
+---+----+
|  1|   a|
|  2|   b|
|  3|   c|
+---+----+
only showing top 3 rows

[33]:
# 同样是预览
# 有的时候有的值很长, pyspark 默认会 truncate 掉部分信息, 你可以关掉这个选项从而看到全部数据
# 有的时候有很多列, 水平放置对人类阅读很不友好, 你可以设置为垂直放置, 从而增加可读性
pdf.show(3, truncate=False, vertical=True)
-RECORD 0---
 id   | 1
 name | a
-RECORD 1---
 id   | 2
 name | b
-RECORD 2---
 id   | 3
 name | c
only showing top 3 rows

Row

Row 代表着 DataFrame 中的一行. Spark 本质上仍然是跟 Hadoop 一行, 是以行为基本单位的并行计算引擎, 并不是一个列式计算的引擎. 每一行会根据 Partition Key 分配到不同的节点上计算后汇总.

Create Row

Row 不支持直接的 IO, 只支持从 Python 字典创建而来

[34]:
from pyspark.sql import Row

data = {"id": 1, "name": "a"}
row = Row(**data)
row
[34]:
Row(id=1, name='a')

Iterate Row

一个 DataFrame 是由许多 Row 组成的. 那么自然的我们希望从 DataFrame 提取出这些 Row 进行后续处理.

[35]:
# Create from Python list of Row
rows = [
    Row(id=1, name="a", metadata=Row(value=1)),
    Row(id=2, name="b", metadata=Row(value=2)),
    Row(id=3, name="c", metadata=Row(value=3)),
]
pdf = spark.createDataFrame(rows)
pdf.show()
+---+----+--------+
| id|name|metadata|
+---+----+--------+
|  1|   a|     {1}|
|  2|   b|     {2}|
|  3|   c|     {3}|
+---+----+--------+

[36]:
# 以 Row 列表的形式返回 DataFrame 中的所有行,
pdf.collect()
[36]:
[Row(id=1, name='a', metadata=Row(value=1)),
 Row(id=2, name='b', metadata=Row(value=2)),
 Row(id=3, name='c', metadata=Row(value=3))]
[37]:
for row in pdf.toLocalIterator():
    print(row)
Row(id=1, name='a', metadata=Row(value=1))
Row(id=2, name='b', metadata=Row(value=2))
Row(id=3, name='c', metadata=Row(value=3))

Row to Python Dict

[38]:
# 将 Row 对象转化为 Python 字典
# Row 只支持转化为字典, 不支持对 Key 和 Value 的循环
rows[0].asDict()
[38]:
{'id': 1, 'name': 'a', 'metadata': Row(value=1)}
[39]:
# 有的时候 Row 里会有 nested Row, 这个时候你就需要用 recursive=True 选项
rows[0].asDict(recursive=True)
[39]:
{'id': 1, 'name': 'a', 'metadata': {'value': 1}}

Column

Column 本身是 DataFrame 中的一个抽象, 和 Row 不同, 它并不是一个 data container 的实体. 所以你只能先创建 DataFrame 然后访问 Column. 和 Pandas 不同的是, 你无法对 Column 进行循环, 访问里面的值.

[40]:
# Create from Python list of Row
from pyspark.sql import Row, Column

rows = [
    Row(id=1, name="a", metadata=Row(value=1)),
    Row(id=2, name="b", metadata=Row(value=2)),
    Row(id=3, name="c", metadata=Row(value=3)),
]
pdf = spark.createDataFrame(rows)
pdf.show()
+---+----+--------+
| id|name|metadata|
+---+----+--------+
|  1|   a|     {1}|
|  2|   b|     {2}|
|  3|   c|     {3}|
+---+----+--------+

[41]:
pdf.id
[41]:
Column<'id'>
[42]:
pdf["id"]
[42]:
Column<'id'>
[43]:
# 遍历所有的 column 名称
for column_name in pdf.columns:
    print(column_name)
id
name
metadata
[44]:
# 遍历所有的 column 实体 (不是名称)
for column_name in pdf.columns:
    column = pdf[column_name]
    print(column)
Column<'id'>
Column<'name'>
Column<'metadata'>

DataFram and Columns

这一部分我们重点介绍在 DataFrame 中对列进行操作

[45]:
# 首先初始化一个 DataFrame
rows = [
    Row(id=1, name="a", metadata=Row(id=1, name="a")),
    Row(id=2, name="b", metadata=Row(id=2, name="b")),
    Row(id=3, name="c", metadata=Row(id=3, name="c")),
]
pdf = spark.createDataFrame(rows)
pdf.show()
+---+----+--------+
| id|name|metadata|
+---+----+--------+
|  1|   a|  {1, a}|
|  2|   b|  {2, b}|
|  3|   c|  {3, c}|
+---+----+--------+

Select Columns in DataFrame

PySpark SQL 中最核心的语法就是 DataFrame.select, 功能是基于已有的 DataFrame 构建新的 DataFrame. 新的 DataFrame 可以包含全新的 Column, 之前 DataFrame 中已有的 Column, 或是基于已有的列计算得来的 Column. select 的语法非常灵活, 是 ETL 的重中之重.

[46]:
# 选择其中的一列, 返回的不是 "一个 Column", 而是 "只有一个 Column 的 DataFrame"
pdf.select(
    pdf.metadata,
).show()
+--------+
|metadata|
+--------+
|  {1, a}|
|  {2, b}|
|  {3, c}|
+--------+

[47]:
# 选择其中的多列, 返回一个 DataFrame
pdf.select(
    pdf.id,
    pdf.name,
).show()
+---+----+
| id|name|
+---+----+
|  1|   a|
|  2|   b|
|  3|   c|
+---+----+

Create New Columns

创建新的列的方法主要有两类:

  1. 直接用 DataFrame.select 方法选择你需要的列然后进行一些计算. 该方法适用于 你不太需要已经存在的 Column, 而只想关注于创建新的 Column.

  2. withColumn 或是 withColumns 方法在已有的 DataFrame 的基础上增加新的列. 该方法适用于 你需要全部已有的 Column, 只是想在增加少量新 Column.

下面的例子里我们基于 id 乘以 10 创建了一个新 Column. 这里我们先 import 了一个叫做 lit 的函数. lit 是 literature 也就是 “字面意义” 的意思. 你不能够直接将 Column 与 Python 值进行计算, lit 相当于是一个容器包含了 Python 里面的值, 并且在和 Column 计算之前就转化为了 Scala 对象, 以获得更高效的性能. 而最后我们用 alias 方法给新的 Column 命名.

[48]:
from pyspark.sql.functions import lit

pdf.select(
    (pdf.id * lit(10)).alias("new_id")
).show()
+------+
|new_id|
+------+
|    10|
|    20|
|    30|
+------+

下面的例子我们在已有的 DataFrame 的基础上创建新的 Column. 如果只创建一个 Column 则用 withColumn, 如果要创建多个 Column 则用 withColumns.

[49]:
# withColumn
pdf.withColumn(
    "new_id", pdf.id * lit(10),
).show()
+---+----+--------+------+
| id|name|metadata|new_id|
+---+----+--------+------+
|  1|   a|  {1, a}|    10|
|  2|   b|  {2, b}|    20|
|  3|   c|  {3, c}|    30|
+---+----+--------+------+

在 PySpark 中我们不能像 Python 一样直接进行字符串拼接, 而是要调用 concat 函数进行计算.

[50]:
from pyspark.sql.functions import concat

pdf.withColumn(
    "new_name", concat(lit("Mr / Mrs"), pdf.name),
).show()
+---+----+--------+---------+
| id|name|metadata| new_name|
+---+----+--------+---------+
|  1|   a|  {1, a}|Mr / Mrsa|
|  2|   b|  {2, b}|Mr / Mrsb|
|  3|   c|  {3, c}|Mr / Mrsc|
+---+----+--------+---------+

[51]:
pdf.withColumns(
    {
        "new_id": pdf.id * lit(10),
        "new_name": lit("Mr / Mrs") + pdf.name,
    }
).show()
+---+----+--------+------+--------+
| id|name|metadata|new_id|new_name|
+---+----+--------+------+--------+
|  1|   a|  {1, a}|    10|    null|
|  2|   b|  {2, b}|    20|    null|
|  3|   c|  {3, c}|    30|    null|
+---+----+--------+------+--------+

注意看, 虽然我们基于初始化的 DataFrame 做了这么多操作, 可是原来的 DataFrame 仍然没有被改变. 该现象涉及 Spark 中一个极其重要的机制 Lazy Load, 我们会在下一节详细解释这一现象.

[52]:
pdf.show()
+---+----+--------+
| id|name|metadata|
+---+----+--------+
|  1|   a|  {1, a}|
|  2|   b|  {2, b}|
|  3|   c|  {3, c}|
+---+----+--------+

Lazy Load

在传统的 Python 编程, 以及 pandas 这一类的数据分析工具中, 你运行一行代码的同时计算就发生了. 在下面的例子中, 你运行到这一行的时候, df["id"] 的值就已经被改变了.

df["id"] = df["id"] * 10

而在 Spark 中, 运算符 (Operator) 和实际的计算 (Evaluation) 是分开的两个步骤. Operator 定义了计算的步骤是怎样的, 而你可以选择在特定的时候实际进行计算 (Evaluation). 例如在下面的代码中, 新创建的 DataFrame 已经被定义了, 但是并没有被真正赋值.

new_pdf = pdf.select(
    pdf.id,
    pdf.name,
)

而当你调用一些具有 evaluation 功能的函数时, 新的 DataFrame 才会被真正赋值. 例如:

new_pdf.show()

这些具有 evaluation 功能的函数通常具有 “必须要真正计算而返回一个值给用户” 的特点. 例如 show, collect, count 等函数.

这么做的原因是: 在大数据处理时, 有很多步骤是非常费时的. 编写 Operator 逻辑本身是可能出错的, 我们不希望运行了半天程序结果最后报错. 我们希望能在运行这些 Operator 之前, 利用 Metadata 对其进行检查, 从而提早发现错误. 并且我们希望能继续编写代码, 而仅仅在我们真正需要数据的时候进行 Evaluation.

[ ]: