Quick Start¶
[18]:
import os
import sys
import site
cwd = os.getcwd()
print(f"Current directory: {cwd}")
print(f"Current Python version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}")
print(f"Current Python interpreter: {sys.executable}")
print(f"Current site-packages: {site.getsitepackages()}")
sys.path.append(os.path.join(cwd, "site-packages"))
Current directory: /home/jovyan/docs/source/01-Quick-Start
Current Python version: 3.10.5
Current Python interpreter: /opt/conda/bin/python
Current site-packages: ['/opt/conda/lib/python3.10/site-packages']
Spark Session¶
Spark 集群通常是跑在服务器上的, 而 Jupyter Notebook 则是提供了一个可以和 Spark 集群交互的界面. Livy 是一个能提供 Spark API 的软件. 你的 Jupyter Notebook 每次运行 Spark API, 实际上则是向 Livy 发送了一个命令, 并把返回的响应展现在 Notebook 中. 无论你是使用 Jupyter Notebook 还是 spark submit, 你都要创建一个 Session 跟 Spark 集群建立连接, 这个连接就叫做 Spark Session. 当然连接长期没有动作就会失效.
下面是一个创建 Spark Session 的极简例子. 其实还有很多参数可以用, 详情请参考 pyspark.sql.SparkSession.
[19]:
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()
spark
[19]:
SparkSession - in-memory
DataFrame¶
DataFrame 是 Spark 中的核心 API. 代表的是分布在集群上的数据的一个二维表虚拟结构.
Create DataFrame¶
创建 DataFrame 的方式有很多种, 大体上可以分为两类:
通过 IO 从 数据库, 文件系统 中读取.
从内存中的数据创建而来, 例如从 Python 的 list, pandas 的 DataFrame 中转换而来.
[20]:
# Create from Python list of tuple
pdf_from_list_of_tuple = spark.createDataFrame(
[
(1, "a"),
(2, "b"),
],
("id", "name")
)
pdf_from_list_of_tuple.show()
+---+----+
| id|name|
+---+----+
| 1| a|
| 2| b|
+---+----+
[21]:
# Create from Python list of dict
pdf_from_list_of_dict = spark.createDataFrame(
[
{"id": 1, "name": "a"},
{"id": 2, "name": "b"},
],
)
pdf_from_list_of_dict.show()
+---+----+
| id|name|
+---+----+
| 1| a|
| 2| b|
+---+----+
[22]:
# Create from pandas.DataFrame
import pandas as pd
df = pd.DataFrame({"id": [1, 2], "name": ["a", "b"]})
pdf_from_pandas = spark.createDataFrame(df)
pdf_from_pandas.show()
+---+----+
| id|name|
+---+----+
| 1| a|
| 2| b|
+---+----+
[23]:
# Create from CSV file
from pathlib import Path
dir_here = Path(os.getcwd())
path_csv = dir_here / "tmp.csv"
# bsm
# cwd = os.getcwd()
# p_csv = Path(cwd) / "tmp.csv"
csv = """
id,name
1,a
2,b
""".strip()
path_csv.write_text(csv)
pdf_from_csv = spark.read.csv(f"{path_csv}", header=True)
pdf_from_csv.show()
+---+----+
| id|name|
+---+----+
| 1| a|
| 2| b|
+---+----+
DataFrame Attributes and Methods¶
[24]:
# Create from Python list of tuple
pdf = spark.createDataFrame(
[
(1, "a"),
(2, "b"),
(3, "c"),
(4, "d"),
(5, "e"),
(6, "f"),
(7, "g"),
],
("id", "name")
)
pdf.show()
+---+----+
| id|name|
+---+----+
| 1| a|
| 2| b|
| 3| c|
| 4| d|
| 5| e|
| 6| f|
| 7| g|
+---+----+
[25]:
# 打印 Schema
pdf.printSchema()
root
|-- id: long (nullable = true)
|-- name: string (nullable = true)
[26]:
# 返回 data type 的列表
pdf.dtypes
[26]:
[('id', 'bigint'), ('name', 'string')]
[27]:
# 返回 columns header
pdf.columns
[27]:
['id', 'name']
[28]:
# 返回一共有多少行
pdf.count()
[28]:
7
[29]:
# 返回一共有多少列
len(pdf.columns)
[29]:
2
[30]:
# 返回前面几行, 以 Row 列表的形式
pdf.head(3)
[30]:
[Row(id=1, name='a'), Row(id=2, name='b'), Row(id=3, name='c')]
[31]:
# 返回后面几行, 以 Row 列表的形式
pdf.tail(3)
[31]:
[Row(id=5, name='e'), Row(id=6, name='f'), Row(id=7, name='g')]
[32]:
# 预览前几行的数据
pdf.show(3)
+---+----+
| id|name|
+---+----+
| 1| a|
| 2| b|
| 3| c|
+---+----+
only showing top 3 rows
[33]:
# 同样是预览
# 有的时候有的值很长, pyspark 默认会 truncate 掉部分信息, 你可以关掉这个选项从而看到全部数据
# 有的时候有很多列, 水平放置对人类阅读很不友好, 你可以设置为垂直放置, 从而增加可读性
pdf.show(3, truncate=False, vertical=True)
-RECORD 0---
id | 1
name | a
-RECORD 1---
id | 2
name | b
-RECORD 2---
id | 3
name | c
only showing top 3 rows
Row¶
Row 代表着 DataFrame 中的一行. Spark 本质上仍然是跟 Hadoop 一行, 是以行为基本单位的并行计算引擎, 并不是一个列式计算的引擎. 每一行会根据 Partition Key 分配到不同的节点上计算后汇总.
Create Row¶
Row 不支持直接的 IO, 只支持从 Python 字典创建而来
[34]:
from pyspark.sql import Row
data = {"id": 1, "name": "a"}
row = Row(**data)
row
[34]:
Row(id=1, name='a')
Iterate Row¶
一个 DataFrame 是由许多 Row 组成的. 那么自然的我们希望从 DataFrame 提取出这些 Row 进行后续处理.
[35]:
# Create from Python list of Row
rows = [
Row(id=1, name="a", metadata=Row(value=1)),
Row(id=2, name="b", metadata=Row(value=2)),
Row(id=3, name="c", metadata=Row(value=3)),
]
pdf = spark.createDataFrame(rows)
pdf.show()
+---+----+--------+
| id|name|metadata|
+---+----+--------+
| 1| a| {1}|
| 2| b| {2}|
| 3| c| {3}|
+---+----+--------+
[36]:
# 以 Row 列表的形式返回 DataFrame 中的所有行,
pdf.collect()
[36]:
[Row(id=1, name='a', metadata=Row(value=1)),
Row(id=2, name='b', metadata=Row(value=2)),
Row(id=3, name='c', metadata=Row(value=3))]
[37]:
for row in pdf.toLocalIterator():
print(row)
Row(id=1, name='a', metadata=Row(value=1))
Row(id=2, name='b', metadata=Row(value=2))
Row(id=3, name='c', metadata=Row(value=3))
Row to Python Dict¶
[38]:
# 将 Row 对象转化为 Python 字典
# Row 只支持转化为字典, 不支持对 Key 和 Value 的循环
rows[0].asDict()
[38]:
{'id': 1, 'name': 'a', 'metadata': Row(value=1)}
[39]:
# 有的时候 Row 里会有 nested Row, 这个时候你就需要用 recursive=True 选项
rows[0].asDict(recursive=True)
[39]:
{'id': 1, 'name': 'a', 'metadata': {'value': 1}}
Column¶
Column 本身是 DataFrame 中的一个抽象, 和 Row 不同, 它并不是一个 data container 的实体. 所以你只能先创建 DataFrame 然后访问 Column. 和 Pandas 不同的是, 你无法对 Column 进行循环, 访问里面的值.
[40]:
# Create from Python list of Row
from pyspark.sql import Row, Column
rows = [
Row(id=1, name="a", metadata=Row(value=1)),
Row(id=2, name="b", metadata=Row(value=2)),
Row(id=3, name="c", metadata=Row(value=3)),
]
pdf = spark.createDataFrame(rows)
pdf.show()
+---+----+--------+
| id|name|metadata|
+---+----+--------+
| 1| a| {1}|
| 2| b| {2}|
| 3| c| {3}|
+---+----+--------+
[41]:
pdf.id
[41]:
Column<'id'>
[42]:
pdf["id"]
[42]:
Column<'id'>
[43]:
# 遍历所有的 column 名称
for column_name in pdf.columns:
print(column_name)
id
name
metadata
[44]:
# 遍历所有的 column 实体 (不是名称)
for column_name in pdf.columns:
column = pdf[column_name]
print(column)
Column<'id'>
Column<'name'>
Column<'metadata'>
DataFram and Columns¶
这一部分我们重点介绍在 DataFrame 中对列进行操作
[45]:
# 首先初始化一个 DataFrame
rows = [
Row(id=1, name="a", metadata=Row(id=1, name="a")),
Row(id=2, name="b", metadata=Row(id=2, name="b")),
Row(id=3, name="c", metadata=Row(id=3, name="c")),
]
pdf = spark.createDataFrame(rows)
pdf.show()
+---+----+--------+
| id|name|metadata|
+---+----+--------+
| 1| a| {1, a}|
| 2| b| {2, b}|
| 3| c| {3, c}|
+---+----+--------+
Select Columns in DataFrame¶
PySpark SQL 中最核心的语法就是 DataFrame.select, 功能是基于已有的 DataFrame 构建新的 DataFrame. 新的 DataFrame 可以包含全新的 Column, 之前 DataFrame 中已有的 Column, 或是基于已有的列计算得来的 Column. select 的语法非常灵活, 是 ETL 的重中之重.
[46]:
# 选择其中的一列, 返回的不是 "一个 Column", 而是 "只有一个 Column 的 DataFrame"
pdf.select(
pdf.metadata,
).show()
+--------+
|metadata|
+--------+
| {1, a}|
| {2, b}|
| {3, c}|
+--------+
[47]:
# 选择其中的多列, 返回一个 DataFrame
pdf.select(
pdf.id,
pdf.name,
).show()
+---+----+
| id|name|
+---+----+
| 1| a|
| 2| b|
| 3| c|
+---+----+
Create New Columns¶
创建新的列的方法主要有两类:
直接用 DataFrame.select 方法选择你需要的列然后进行一些计算. 该方法适用于 你不太需要已经存在的 Column, 而只想关注于创建新的 Column.
用 withColumn 或是 withColumns 方法在已有的 DataFrame 的基础上增加新的列. 该方法适用于 你需要全部已有的 Column, 只是想在增加少量新 Column.
下面的例子里我们基于 id 乘以 10 创建了一个新 Column. 这里我们先 import 了一个叫做 lit 的函数. lit 是 literature 也就是 “字面意义” 的意思. 你不能够直接将 Column 与 Python 值进行计算, lit 相当于是一个容器包含了 Python 里面的值, 并且在和 Column 计算之前就转化为了 Scala 对象, 以获得更高效的性能. 而最后我们用 alias
方法给新的 Column 命名.
[48]:
from pyspark.sql.functions import lit
pdf.select(
(pdf.id * lit(10)).alias("new_id")
).show()
+------+
|new_id|
+------+
| 10|
| 20|
| 30|
+------+
下面的例子我们在已有的 DataFrame 的基础上创建新的 Column. 如果只创建一个 Column 则用 withColumn
, 如果要创建多个 Column 则用 withColumns
.
[49]:
# withColumn
pdf.withColumn(
"new_id", pdf.id * lit(10),
).show()
+---+----+--------+------+
| id|name|metadata|new_id|
+---+----+--------+------+
| 1| a| {1, a}| 10|
| 2| b| {2, b}| 20|
| 3| c| {3, c}| 30|
+---+----+--------+------+
在 PySpark 中我们不能像 Python 一样直接进行字符串拼接, 而是要调用 concat 函数进行计算.
[50]:
from pyspark.sql.functions import concat
pdf.withColumn(
"new_name", concat(lit("Mr / Mrs"), pdf.name),
).show()
+---+----+--------+---------+
| id|name|metadata| new_name|
+---+----+--------+---------+
| 1| a| {1, a}|Mr / Mrsa|
| 2| b| {2, b}|Mr / Mrsb|
| 3| c| {3, c}|Mr / Mrsc|
+---+----+--------+---------+
[51]:
pdf.withColumns(
{
"new_id": pdf.id * lit(10),
"new_name": lit("Mr / Mrs") + pdf.name,
}
).show()
+---+----+--------+------+--------+
| id|name|metadata|new_id|new_name|
+---+----+--------+------+--------+
| 1| a| {1, a}| 10| null|
| 2| b| {2, b}| 20| null|
| 3| c| {3, c}| 30| null|
+---+----+--------+------+--------+
注意看, 虽然我们基于初始化的 DataFrame 做了这么多操作, 可是原来的 DataFrame 仍然没有被改变. 该现象涉及 Spark 中一个极其重要的机制 Lazy Load, 我们会在下一节详细解释这一现象.
[52]:
pdf.show()
+---+----+--------+
| id|name|metadata|
+---+----+--------+
| 1| a| {1, a}|
| 2| b| {2, b}|
| 3| c| {3, c}|
+---+----+--------+
Lazy Load¶
在传统的 Python 编程, 以及 pandas 这一类的数据分析工具中, 你运行一行代码的同时计算就发生了. 在下面的例子中, 你运行到这一行的时候, df["id"]
的值就已经被改变了.
df["id"] = df["id"] * 10
而在 Spark 中, 运算符 (Operator) 和实际的计算 (Evaluation) 是分开的两个步骤. Operator 定义了计算的步骤是怎样的, 而你可以选择在特定的时候实际进行计算 (Evaluation). 例如在下面的代码中, 新创建的 DataFrame 已经被定义了, 但是并没有被真正赋值.
new_pdf = pdf.select(
pdf.id,
pdf.name,
)
而当你调用一些具有 evaluation 功能的函数时, 新的 DataFrame 才会被真正赋值. 例如:
new_pdf.show()
这些具有 evaluation 功能的函数通常具有 “必须要真正计算而返回一个值给用户” 的特点. 例如 show
, collect
, count
等函数.
这么做的原因是: 在大数据处理时, 有很多步骤是非常费时的. 编写 Operator 逻辑本身是可能出错的, 我们不希望运行了半天程序结果最后报错. 我们希望能在运行这些 Operator 之前, 利用 Metadata 对其进行检查, 从而提早发现错误. 并且我们希望能继续编写代码, 而仅仅在我们真正需要数据的时候进行 Evaluation.
[ ]: