{ "cells": [ { "cell_type": "markdown", "id": "864ea14f-932d-4e4d-b44e-f96b55796d79", "metadata": {}, "source": [ "# Quick Start" ] }, { "cell_type": "code", "execution_count": 18, "id": "e18f42a6-0f5e-456a-adf1-8e0be6125980", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Current directory: /home/jovyan/docs/source/01-Quick-Start\n", "Current Python version: 3.10.5\n", "Current Python interpreter: /opt/conda/bin/python\n", "Current site-packages: ['/opt/conda/lib/python3.10/site-packages']\n" ] } ], "source": [ "import os\n", "import sys\n", "import site\n", "\n", "cwd = os.getcwd()\n", "print(f\"Current directory: {cwd}\")\n", "print(f\"Current Python version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}\")\n", "print(f\"Current Python interpreter: {sys.executable}\")\n", "print(f\"Current site-packages: {site.getsitepackages()}\")\n", "\n", "sys.path.append(os.path.join(cwd, \"site-packages\"))" ] }, { "cell_type": "markdown", "id": "8d8532af-13bc-4861-91bd-13b3a6d21e31", "metadata": {}, "source": [ "## Spark Session\n", "\n", "Spark 集群通常是跑在服务器上的, 而 Jupyter Notebook 则是提供了一个可以和 Spark 集群交互的界面. [Livy](https://livy.apache.org/) 是一个能提供 Spark API 的软件. 你的 Jupyter Notebook 每次运行 Spark API, 实际上则是向 Livy 发送了一个命令, 并把返回的响应展现在 Notebook 中. 无论你是使用 Jupyter Notebook 还是 spark submit, 你都要创建一个 Session 跟 Spark 集群建立连接, 这个连接就叫做 Spark Session. 当然连接长期没有动作就会失效.\n", "\n", "下面是一个创建 Spark Session 的极简例子. 其实还有很多参数可以用, 详情请参考 [pyspark.sql.SparkSession](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.SparkSession.html).\n" ] }, { "cell_type": "code", "execution_count": 19, "id": "6b03e3be-3ae4-4521-b4a7-c27dbb3363d0", "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "

SparkSession - in-memory

\n", " \n", "
\n", "

SparkContext

\n", "\n", "

Spark UI

\n", "\n", "
\n", "
Version
\n", "
v3.3.0
\n", "
Master
\n", "
local[*]
\n", "
AppName
\n", "
pyspark-shell
\n", "
\n", "
\n", " \n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 19, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder.getOrCreate()\n", "spark" ] }, { "cell_type": "markdown", "id": "9774411f-2e6f-45b8-9d8a-873f95d58474", "metadata": {}, "source": [ "## DataFrame\n", "\n", "DataFrame 是 Spark 中的核心 API. 代表的是分布在集群上的数据的一个二维表虚拟结构. " ] }, { "cell_type": "markdown", "id": "24e60456-4ced-4108-8bfb-a9535028335a", "metadata": {}, "source": [ "### Create DataFrame\n", "\n", "创建 DataFrame 的方式有很多种, 大体上可以分为两类:\n", "\n", "1. 通过 IO 从 数据库, 文件系统 中读取.\n", "2. 从内存中的数据创建而来, 例如从 Python 的 list, pandas 的 DataFrame 中转换而来." ] }, { "cell_type": "code", "execution_count": 20, "id": "c9908ddc-b6f4-491f-87d5-d858a4861529", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+\n", "| id|name|\n", "+---+----+\n", "| 1| a|\n", "| 2| b|\n", "+---+----+\n", "\n" ] } ], "source": [ "# Create from Python list of tuple\n", "pdf_from_list_of_tuple = spark.createDataFrame(\n", " [\n", " (1, \"a\"),\n", " (2, \"b\"),\n", " ],\n", " (\"id\", \"name\")\n", ")\n", "pdf_from_list_of_tuple.show()" ] }, { "cell_type": "code", "execution_count": 21, "id": "baf6b4d2-1124-4fe5-b709-733fabce3826", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+\n", "| id|name|\n", "+---+----+\n", "| 1| a|\n", "| 2| b|\n", "+---+----+\n", "\n" ] } ], "source": [ "# Create from Python list of dict\n", "pdf_from_list_of_dict = spark.createDataFrame(\n", " [\n", " {\"id\": 1, \"name\": \"a\"},\n", " {\"id\": 2, \"name\": \"b\"},\n", " ],\n", ")\n", "pdf_from_list_of_dict.show()" ] }, { "cell_type": "code", "execution_count": 22, "id": "aa1bdcd8-df18-4e2d-9bb6-4b2a5f08650d", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+\n", "| id|name|\n", "+---+----+\n", "| 1| a|\n", "| 2| b|\n", "+---+----+\n", "\n" ] } ], "source": [ "# Create from pandas.DataFrame\n", "import pandas as pd\n", "\n", "df = pd.DataFrame({\"id\": [1, 2], \"name\": [\"a\", \"b\"]})\n", "pdf_from_pandas = spark.createDataFrame(df)\n", "pdf_from_pandas.show()" ] }, { "cell_type": "code", "execution_count": 23, "id": "26f757f7-8c0f-4727-b74e-e17e13e9cf91", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+\n", "| id|name|\n", "+---+----+\n", "| 1| a|\n", "| 2| b|\n", "+---+----+\n", "\n" ] } ], "source": [ "# Create from CSV file\n", "from pathlib import Path\n", "\n", "dir_here = Path(os.getcwd())\n", "path_csv = dir_here / \"tmp.csv\"\n", "\n", "# bsm\n", "# cwd = os.getcwd()\n", "# p_csv = Path(cwd) / \"tmp.csv\"\n", "csv = \"\"\"\n", "id,name\n", "1,a\n", "2,b\n", "\"\"\".strip()\n", "path_csv.write_text(csv)\n", "\n", "pdf_from_csv = spark.read.csv(f\"{path_csv}\", header=True)\n", "pdf_from_csv.show()" ] }, { "cell_type": "markdown", "id": "3ce64afb-9822-466b-82a9-dc7ec2ed7ec1", "metadata": {}, "source": [ "### DataFrame Attributes and Methods" ] }, { "cell_type": "code", "execution_count": 24, "id": "55b58f50-4757-4971-8742-149c4fdfe37c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+\n", "| id|name|\n", "+---+----+\n", "| 1| a|\n", "| 2| b|\n", "| 3| c|\n", "| 4| d|\n", "| 5| e|\n", "| 6| f|\n", "| 7| g|\n", "+---+----+\n", "\n" ] } ], "source": [ "# Create from Python list of tuple\n", "pdf = spark.createDataFrame(\n", " [\n", " (1, \"a\"),\n", " (2, \"b\"),\n", " (3, \"c\"),\n", " (4, \"d\"),\n", " (5, \"e\"),\n", " (6, \"f\"),\n", " (7, \"g\"),\n", " ],\n", " (\"id\", \"name\")\n", ")\n", "pdf.show()" ] }, { "cell_type": "code", "execution_count": 25, "id": "ff9146ff-beeb-4d57-90e9-255dfbb37769", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "root\n", " |-- id: long (nullable = true)\n", " |-- name: string (nullable = true)\n", "\n" ] } ], "source": [ "# 打印 Schema\n", "pdf.printSchema()" ] }, { "cell_type": "code", "execution_count": 26, "id": "d0f8f97e-eda3-4728-9188-fc4ec048ac44", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[('id', 'bigint'), ('name', 'string')]" ] }, "execution_count": 26, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 返回 data type 的列表\n", "pdf.dtypes" ] }, { "cell_type": "code", "execution_count": 27, "id": "39246397-ddb5-4ffe-bee4-b99259fbb3ea", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "['id', 'name']" ] }, "execution_count": 27, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 返回 columns header\n", "pdf.columns" ] }, { "cell_type": "code", "execution_count": 28, "id": "c2082ece-466b-46be-a111-0d5cf8bcbe91", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "7" ] }, "execution_count": 28, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 返回一共有多少行\n", "pdf.count()" ] }, { "cell_type": "code", "execution_count": 29, "id": "6d055f32-9b37-456c-96e3-e075c3962653", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "2" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 返回一共有多少列\n", "len(pdf.columns)" ] }, { "cell_type": "code", "execution_count": 30, "id": "f55bca35-29a7-4159-a38c-4d13041ff3f9", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(id=1, name='a'), Row(id=2, name='b'), Row(id=3, name='c')]" ] }, "execution_count": 30, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 返回前面几行, 以 Row 列表的形式\n", "pdf.head(3)" ] }, { "cell_type": "code", "execution_count": 31, "id": "b6ca9582-345a-4284-8a1a-3a32f5313ebc", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(id=5, name='e'), Row(id=6, name='f'), Row(id=7, name='g')]" ] }, "execution_count": 31, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 返回后面几行, 以 Row 列表的形式\n", "pdf.tail(3)" ] }, { "cell_type": "code", "execution_count": 32, "id": "a50ac68a-e4c9-4dce-894f-e0747fef391c", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+\n", "| id|name|\n", "+---+----+\n", "| 1| a|\n", "| 2| b|\n", "| 3| c|\n", "+---+----+\n", "only showing top 3 rows\n", "\n" ] } ], "source": [ "# 预览前几行的数据\n", "pdf.show(3)" ] }, { "cell_type": "code", "execution_count": 33, "id": "2c156be2-a42d-4526-b095-264bfd3d3e75", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "-RECORD 0---\n", " id | 1 \n", " name | a \n", "-RECORD 1---\n", " id | 2 \n", " name | b \n", "-RECORD 2---\n", " id | 3 \n", " name | c \n", "only showing top 3 rows\n", "\n" ] } ], "source": [ "# 同样是预览\n", "# 有的时候有的值很长, pyspark 默认会 truncate 掉部分信息, 你可以关掉这个选项从而看到全部数据\n", "# 有的时候有很多列, 水平放置对人类阅读很不友好, 你可以设置为垂直放置, 从而增加可读性\n", "pdf.show(3, truncate=False, vertical=True)" ] }, { "cell_type": "markdown", "id": "6069eff3-c9e4-4dc4-ae3a-9ad394a32316", "metadata": {}, "source": [ "## Row\n", "\n", "Row 代表着 DataFrame 中的一行. Spark 本质上仍然是跟 Hadoop 一行, 是以行为基本单位的并行计算引擎, 并不是一个列式计算的引擎. 每一行会根据 Partition Key 分配到不同的节点上计算后汇总." ] }, { "cell_type": "markdown", "id": "dbc76ff5-c511-48c5-8ff4-f6903df62a41", "metadata": {}, "source": [ "### Create Row\n", "\n", "Row 不支持直接的 IO, 只支持从 Python 字典创建而来" ] }, { "cell_type": "code", "execution_count": 34, "id": "693b1bcf-fca6-42e7-a1ac-06c0d654da9f", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Row(id=1, name='a')" ] }, "execution_count": 34, "metadata": {}, "output_type": "execute_result" } ], "source": [ "from pyspark.sql import Row\n", "\n", "data = {\"id\": 1, \"name\": \"a\"}\n", "row = Row(**data)\n", "row" ] }, { "cell_type": "markdown", "id": "db6dffea-ac8c-408e-9aac-46896432bdaf", "metadata": {}, "source": [ "### Iterate Row\n", "\n", "一个 DataFrame 是由许多 Row 组成的. 那么自然的我们希望从 DataFrame 提取出这些 Row 进行后续处理." ] }, { "cell_type": "code", "execution_count": 35, "id": "4e6f7629-e780-4191-a03e-5a674f1de9e1", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+--------+\n", "| id|name|metadata|\n", "+---+----+--------+\n", "| 1| a| {1}|\n", "| 2| b| {2}|\n", "| 3| c| {3}|\n", "+---+----+--------+\n", "\n" ] } ], "source": [ "# Create from Python list of Row\n", "rows = [\n", " Row(id=1, name=\"a\", metadata=Row(value=1)),\n", " Row(id=2, name=\"b\", metadata=Row(value=2)),\n", " Row(id=3, name=\"c\", metadata=Row(value=3)),\n", "]\n", "pdf = spark.createDataFrame(rows)\n", "pdf.show()" ] }, { "cell_type": "code", "execution_count": 36, "id": "d5fc0fb6-89d3-4c4d-9aeb-e6d1975ab4cf", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "[Row(id=1, name='a', metadata=Row(value=1)),\n", " Row(id=2, name='b', metadata=Row(value=2)),\n", " Row(id=3, name='c', metadata=Row(value=3))]" ] }, "execution_count": 36, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 以 Row 列表的形式返回 DataFrame 中的所有行, \n", "pdf.collect()" ] }, { "cell_type": "code", "execution_count": 37, "id": "bb5c3edd-79df-4209-8fbf-6356a1105d03", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Row(id=1, name='a', metadata=Row(value=1))\n", "Row(id=2, name='b', metadata=Row(value=2))\n", "Row(id=3, name='c', metadata=Row(value=3))\n" ] } ], "source": [ "for row in pdf.toLocalIterator():\n", " print(row)" ] }, { "cell_type": "markdown", "id": "c662eb71-f17d-4aaa-bd3e-987e4f55b81b", "metadata": {}, "source": [ "### Row to Python Dict" ] }, { "cell_type": "code", "execution_count": 38, "id": "446ce9ff-a189-4e0a-839a-f4b696761036", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'id': 1, 'name': 'a', 'metadata': Row(value=1)}" ] }, "execution_count": 38, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 将 Row 对象转化为 Python 字典\n", "# Row 只支持转化为字典, 不支持对 Key 和 Value 的循环\n", "rows[0].asDict()" ] }, { "cell_type": "code", "execution_count": 39, "id": "69a7be1d-2105-4c70-aa16-6811297b5f75", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "{'id': 1, 'name': 'a', 'metadata': {'value': 1}}" ] }, "execution_count": 39, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 有的时候 Row 里会有 nested Row, 这个时候你就需要用 recursive=True 选项\n", "rows[0].asDict(recursive=True)" ] }, { "cell_type": "markdown", "id": "d68caf0e-a633-429d-9551-fbe4f93a7589", "metadata": {}, "source": [ "## Column\n", "\n", "Column 本身是 DataFrame 中的一个抽象, 和 Row 不同, 它并不是一个 data container 的实体. 所以你只能先创建 DataFrame 然后访问 Column. 和 Pandas 不同的是, 你无法对 Column 进行循环, 访问里面的值." ] }, { "cell_type": "code", "execution_count": 40, "id": "ccb54abe-fb87-4c2c-9e73-347c48f33a3a", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+--------+\n", "| id|name|metadata|\n", "+---+----+--------+\n", "| 1| a| {1}|\n", "| 2| b| {2}|\n", "| 3| c| {3}|\n", "+---+----+--------+\n", "\n" ] } ], "source": [ "# Create from Python list of Row\n", "from pyspark.sql import Row, Column\n", "\n", "rows = [\n", " Row(id=1, name=\"a\", metadata=Row(value=1)),\n", " Row(id=2, name=\"b\", metadata=Row(value=2)),\n", " Row(id=3, name=\"c\", metadata=Row(value=3)),\n", "]\n", "pdf = spark.createDataFrame(rows)\n", "pdf.show()" ] }, { "cell_type": "code", "execution_count": 41, "id": "582c46b8-e7be-4a7e-a46f-a9cf38b7c367", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Column<'id'>" ] }, "execution_count": 41, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf.id" ] }, { "cell_type": "code", "execution_count": 42, "id": "377a5296-5174-4bb7-8d7f-39e6cec201cd", "metadata": {}, "outputs": [ { "data": { "text/plain": [ "Column<'id'>" ] }, "execution_count": 42, "metadata": {}, "output_type": "execute_result" } ], "source": [ "pdf[\"id\"]" ] }, { "cell_type": "code", "execution_count": 43, "id": "85606716-fcdb-4684-9394-a465c3ee3f4e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "id\n", "name\n", "metadata\n" ] } ], "source": [ "# 遍历所有的 column 名称\n", "for column_name in pdf.columns:\n", " print(column_name)" ] }, { "cell_type": "code", "execution_count": 44, "id": "6a6bc741-573f-4c31-b79c-86005fa55ad7", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Column<'id'>\n", "Column<'name'>\n", "Column<'metadata'>\n" ] } ], "source": [ "# 遍历所有的 column 实体 (不是名称)\n", "for column_name in pdf.columns:\n", " column = pdf[column_name]\n", " print(column)" ] }, { "cell_type": "markdown", "id": "bd108ab6-ae22-4375-a62b-1b845bf50a9b", "metadata": {}, "source": [ "## DataFram and Columns\n", "\n", "这一部分我们重点介绍在 DataFrame 中对列进行操作" ] }, { "cell_type": "code", "execution_count": 45, "id": "eb4855c3-a2ee-4dd9-b13f-132f4881734b", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+--------+\n", "| id|name|metadata|\n", "+---+----+--------+\n", "| 1| a| {1, a}|\n", "| 2| b| {2, b}|\n", "| 3| c| {3, c}|\n", "+---+----+--------+\n", "\n" ] } ], "source": [ "# 首先初始化一个 DataFrame\n", "rows = [\n", " Row(id=1, name=\"a\", metadata=Row(id=1, name=\"a\")),\n", " Row(id=2, name=\"b\", metadata=Row(id=2, name=\"b\")),\n", " Row(id=3, name=\"c\", metadata=Row(id=3, name=\"c\")),\n", "]\n", "pdf = spark.createDataFrame(rows)\n", "pdf.show()" ] }, { "cell_type": "markdown", "id": "b58a5092-d0bb-444b-bf29-cd4c4e9d009b", "metadata": {}, "source": [ "### Select Columns in DataFrame\n", "\n", "PySpark SQL 中最核心的语法就是 [DataFrame.select]([DataFrame.select](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.select.html#pyspark.sql.DataFrame.select)), 功能是基于已有的 DataFrame 构建新的 DataFrame. 新的 DataFrame 可以包含全新的 Column, 之前 DataFrame 中已有的 Column, 或是基于已有的列计算得来的 Column. select 的语法非常灵活, 是 ETL 的重中之重." ] }, { "cell_type": "code", "execution_count": 46, "id": "a9b09e03-122b-44be-ba09-3002a197a036", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+--------+\n", "|metadata|\n", "+--------+\n", "| {1, a}|\n", "| {2, b}|\n", "| {3, c}|\n", "+--------+\n", "\n" ] } ], "source": [ "# 选择其中的一列, 返回的不是 \"一个 Column\", 而是 \"只有一个 Column 的 DataFrame\"\n", "pdf.select(\n", " pdf.metadata,\n", ").show()" ] }, { "cell_type": "code", "execution_count": 47, "id": "c94461bd-a8e4-4d0f-bc97-a53dcf6b506f", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+\n", "| id|name|\n", "+---+----+\n", "| 1| a|\n", "| 2| b|\n", "| 3| c|\n", "+---+----+\n", "\n" ] } ], "source": [ "# 选择其中的多列, 返回一个 DataFrame\n", "pdf.select(\n", " pdf.id,\n", " pdf.name,\n", ").show()" ] }, { "cell_type": "markdown", "id": "217188e1-e15a-47d5-a2df-693005b83e9a", "metadata": {}, "source": [ "### Create New Columns\n", "\n", "创建新的列的方法主要有两类:\n", "\n", "1. 直接用 [DataFrame.select](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.select.html#pyspark.sql.DataFrame.select) 方法选择你需要的列然后进行一些计算. 该方法适用于 **你不太需要已经存在的 Column, 而只想关注于创建新的 Column**.\n", "2. 用 [withColumn](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumn.html#pyspark.sql.DataFrame.withColumn) 或是 [withColumns](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.DataFrame.withColumns.html#pyspark.sql.DataFrame.withColumns) 方法在已有的 DataFrame 的基础上增加新的列. 该方法适用于 **你需要全部已有的 Column, 只是想在增加少量新 Column**." ] }, { "cell_type": "markdown", "id": "f3e13218-0c2c-4d03-9d46-aa78d2274a46", "metadata": {}, "source": [ "下面的例子里我们基于 id 乘以 10 创建了一个新 Column. 这里我们先 import 了一个叫做 [lit](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.lit.html#pyspark.sql.functions.lit) 的函数. lit 是 literature 也就是 \"字面意义\" 的意思. 你不能够直接将 Column 与 Python 值进行计算, lit 相当于是一个容器包含了 Python 里面的值, 并且在和 Column 计算之前就转化为了 Scala 对象, 以获得更高效的性能. 而最后我们用 ``alias`` 方法给新的 Column 命名." ] }, { "cell_type": "code", "execution_count": 48, "id": "f266396e-a50b-41ea-900d-b966b50981c6", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+------+\n", "|new_id|\n", "+------+\n", "| 10|\n", "| 20|\n", "| 30|\n", "+------+\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import lit\n", "\n", "pdf.select(\n", " (pdf.id * lit(10)).alias(\"new_id\")\n", ").show()" ] }, { "cell_type": "markdown", "id": "935b667a-b642-4894-b6e8-b43c956ce280", "metadata": {}, "source": [ "下面的例子我们在已有的 DataFrame 的基础上创建新的 Column. 如果只创建一个 Column 则用 ``withColumn``, 如果要创建多个 Column 则用 ``withColumns``." ] }, { "cell_type": "code", "execution_count": 49, "id": "6b980a05-56de-49fe-94f9-e69a574c698e", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+--------+------+\n", "| id|name|metadata|new_id|\n", "+---+----+--------+------+\n", "| 1| a| {1, a}| 10|\n", "| 2| b| {2, b}| 20|\n", "| 3| c| {3, c}| 30|\n", "+---+----+--------+------+\n", "\n" ] } ], "source": [ "# withColumn\n", "pdf.withColumn(\n", " \"new_id\", pdf.id * lit(10),\n", ").show()" ] }, { "cell_type": "markdown", "id": "6651e5b3-c2f7-4ce7-8eec-7bc6db5e53d2", "metadata": {}, "source": [ "在 PySpark 中我们不能像 Python 一样直接进行字符串拼接, 而是要调用 [concat](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.functions.concat.html#pyspark.sql.functions.concat) 函数进行计算." ] }, { "cell_type": "code", "execution_count": 50, "id": "7b2c9cd9-6556-40a0-99de-ddaa3f48b8f4", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+--------+---------+\n", "| id|name|metadata| new_name|\n", "+---+----+--------+---------+\n", "| 1| a| {1, a}|Mr / Mrsa|\n", "| 2| b| {2, b}|Mr / Mrsb|\n", "| 3| c| {3, c}|Mr / Mrsc|\n", "+---+----+--------+---------+\n", "\n" ] } ], "source": [ "from pyspark.sql.functions import concat\n", "\n", "pdf.withColumn(\n", " \"new_name\", concat(lit(\"Mr / Mrs\"), pdf.name),\n", ").show()" ] }, { "cell_type": "code", "execution_count": 51, "id": "393c331c-cc4a-4fe0-8b0d-28ba2a86a279", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+--------+------+--------+\n", "| id|name|metadata|new_id|new_name|\n", "+---+----+--------+------+--------+\n", "| 1| a| {1, a}| 10| null|\n", "| 2| b| {2, b}| 20| null|\n", "| 3| c| {3, c}| 30| null|\n", "+---+----+--------+------+--------+\n", "\n" ] } ], "source": [ "pdf.withColumns(\n", " {\n", " \"new_id\": pdf.id * lit(10),\n", " \"new_name\": lit(\"Mr / Mrs\") + pdf.name,\n", " }\n", ").show()" ] }, { "cell_type": "markdown", "id": "cf5fa5df-f88d-434c-b25c-95a8e7d859ed", "metadata": {}, "source": [ "注意看, 虽然我们基于初始化的 DataFrame 做了这么多操作, 可是原来的 DataFrame 仍然没有被改变. 该现象涉及 Spark 中一个极其重要的机制 Lazy Load, 我们会在下一节详细解释这一现象." ] }, { "cell_type": "code", "execution_count": 52, "id": "1d0c9ca7-75af-4d6e-8dce-3a8b3125cd97", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "+---+----+--------+\n", "| id|name|metadata|\n", "+---+----+--------+\n", "| 1| a| {1, a}|\n", "| 2| b| {2, b}|\n", "| 3| c| {3, c}|\n", "+---+----+--------+\n", "\n" ] } ], "source": [ "pdf.show()" ] }, { "cell_type": "markdown", "id": "f321ae2d-1953-45c5-9cfb-cae5cb1bd9d9", "metadata": {}, "source": [ "## Lazy Load" ] }, { "cell_type": "markdown", "id": "fbebe44c-28c5-48bc-b4e8-99f63a7cf442", "metadata": {}, "source": [ "在传统的 Python 编程, 以及 pandas 这一类的数据分析工具中, 你运行一行代码的同时计算就发生了. 在下面的例子中, 你运行到这一行的时候, ``df[\"id\"]`` 的值就已经被改变了.\n", "\n", "```python\n", "df[\"id\"] = df[\"id\"] * 10\n", "```\n", "\n", "而在 Spark 中, 运算符 (Operator) 和实际的计算 (Evaluation) 是分开的两个步骤. Operator 定义了计算的步骤是怎样的, 而你可以选择在特定的时候实际进行计算 (Evaluation). 例如在下面的代码中, 新创建的 DataFrame 已经被定义了, 但是并没有被真正赋值.\n", "\n", "```\n", "new_pdf = pdf.select(\n", " pdf.id,\n", " pdf.name,\n", ")\n", "```\n", "\n", "而当你调用一些具有 evaluation 功能的函数时, 新的 DataFrame 才会被真正赋值. 例如:\n", "\n", "```\n", "new_pdf.show()\n", "```\n", "\n", "这些具有 evaluation 功能的函数通常具有 \"必须要真正计算而返回一个值给用户\" 的特点. 例如 ``show``, ``collect``, ``count`` 等函数.\n", "\n", "这么做的原因是: 在大数据处理时, 有很多步骤是非常费时的. 编写 Operator 逻辑本身是可能出错的, 我们不希望运行了半天程序结果最后报错. 我们希望能在运行这些 Operator 之前, 利用 Metadata 对其进行检查, 从而提早发现错误. 并且我们希望能继续编写代码, 而仅仅在我们真正需要数据的时候进行 Evaluation." ] }, { "cell_type": "code", "execution_count": null, "id": "5a5a1f7c-47c6-4b83-b404-87458ca164b2", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.5" }, "toc-autonumbering": true }, "nbformat": 4, "nbformat_minor": 5 }