{ "cells": [ { "cell_type": "code", "execution_count": 28, "id": "64db3ff6-e702-49c0-8cdf-0f2fcd50a761", "metadata": {}, "outputs": [ { "name": "stdout", "output_type": "stream", "text": [ "Current directory: /home/jovyan/docs/source/04-Typing\n", "Current Python version: 3.10.5\n", "Current Python interpreter: /opt/conda/bin/python\n", "Current site-packages: ['/opt/conda/lib/python3.10/site-packages']\n" ] } ], "source": [ "import os\n", "import sys\n", "import site\n", "\n", "cwd = os.getcwd()\n", "print(f\"Current directory: {cwd}\")\n", "print(f\"Current Python version: {sys.version_info.major}.{sys.version_info.minor}.{sys.version_info.micro}\")\n", "print(f\"Current Python interpreter: {sys.executable}\")\n", "print(f\"Current site-packages: {site.getsitepackages()}\")\n", "\n", "sys.path.append(os.path.join(cwd, \"site-packages\"))" ] }, { "cell_type": "code", "execution_count": 29, "id": "5fe08872-5422-4b19-8244-c0123190828b", "metadata": {}, "outputs": [ { "data": { "text/html": [ "\n", "
\n", "

SparkSession - in-memory

\n", " \n", "
\n", "

SparkContext

\n", "\n", "

Spark UI

\n", "\n", "
\n", "
Version
\n", "
v3.3.0
\n", "
Master
\n", "
local[*]
\n", "
AppName
\n", "
pyspark-shell
\n", "
\n", "
\n", " \n", "
\n", " " ], "text/plain": [ "" ] }, "execution_count": 29, "metadata": {}, "output_type": "execute_result" } ], "source": [ "# 首先创建一个 Spark Session\n", "from pyspark.sql import SparkSession\n", "\n", "spark = SparkSession.builder.getOrCreate()\n", "spark" ] }, { "cell_type": "markdown", "id": "61f5db48-916e-4a00-aeda-65e2d43d8b12", "metadata": {}, "source": [ "# Typing in PySpark" ] }, { "cell_type": "code", "execution_count": 30, "id": "77a457e4-0186-4285-9243-7b004ab15f6a", "metadata": {}, "outputs": [], "source": [ "import json\n", "from pathlib import Path\n", "import pyspark.sql.functions as F\n", "import pyspark.sql.types as T" ] }, { "cell_type": "markdown", "id": "06d8cd7c-9af1-461e-91dc-3c0bc855d8ea", "metadata": {}, "source": [ "## Mixed Type in One Column\n", "\n", "PySpark 的 DataFrame 是 Schema Enforced 的. 也就是说一个 Column 中的值的 Type 必须一致. **如果你是直接创建 DataFrame 而你又 Pass in 了不同的 Type, 那么会直接报错不让你创建**.\n" ] }, { "cell_type": "code", "execution_count": 31, "id": "906b8599-8213-4a90-b270-906da27985ce", "metadata": {}, "outputs": [ { "ename": "TypeError", "evalue": "field col: Can not merge type and ", "output_type": "error", "traceback": [ "\u001b[0;31m---------------------------------------------------------------------------\u001b[0m", "\u001b[0;31mTypeError\u001b[0m Traceback (most recent call last)", "Input \u001b[0;32mIn [31]\u001b[0m, in \u001b[0;36m\u001b[0;34m()\u001b[0m\n\u001b[0;32m----> 1\u001b[0m pdf \u001b[38;5;241m=\u001b[39m \u001b[43mspark\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mcreateDataFrame\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 2\u001b[0m \u001b[43m \u001b[49m\u001b[43m[\u001b[49m\n\u001b[1;32m 3\u001b[0m \u001b[43m \u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m1\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 4\u001b[0m \u001b[43m \u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m2\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 5\u001b[0m \u001b[43m \u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m3\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 6\u001b[0m \u001b[43m \u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m4\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 7\u001b[0m \u001b[43m \u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;241;43m5\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 8\u001b[0m \u001b[43m \u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43ma\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 9\u001b[0m \u001b[43m \u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mb\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 10\u001b[0m \u001b[43m \u001b[49m\u001b[43m]\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 11\u001b[0m \u001b[43m \u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[38;5;124;43mcol\u001b[39;49m\u001b[38;5;124;43m\"\u001b[39;49m\u001b[43m,\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 12\u001b[0m \u001b[43m)\u001b[49m\n\u001b[1;32m 13\u001b[0m pdf\u001b[38;5;241m.\u001b[39mshow()\n", "File \u001b[0;32m/usr/local/spark/python/pyspark/sql/session.py:894\u001b[0m, in \u001b[0;36mSparkSession.createDataFrame\u001b[0;34m(self, data, schema, samplingRatio, verifySchema)\u001b[0m\n\u001b[1;32m 889\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m has_pandas \u001b[38;5;129;01mand\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(data, pandas\u001b[38;5;241m.\u001b[39mDataFrame):\n\u001b[1;32m 890\u001b[0m \u001b[38;5;66;03m# Create a DataFrame from pandas DataFrame.\u001b[39;00m\n\u001b[1;32m 891\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28msuper\u001b[39m(SparkSession, \u001b[38;5;28mself\u001b[39m)\u001b[38;5;241m.\u001b[39mcreateDataFrame( \u001b[38;5;66;03m# type: ignore[call-overload]\u001b[39;00m\n\u001b[1;32m 892\u001b[0m data, schema, samplingRatio, verifySchema\n\u001b[1;32m 893\u001b[0m )\n\u001b[0;32m--> 894\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_create_dataframe\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 895\u001b[0m \u001b[43m \u001b[49m\u001b[43mdata\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mschema\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43msamplingRatio\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mverifySchema\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;66;43;03m# type: ignore[arg-type]\u001b[39;49;00m\n\u001b[1;32m 896\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n", "File \u001b[0;32m/usr/local/spark/python/pyspark/sql/session.py:936\u001b[0m, in \u001b[0;36mSparkSession._create_dataframe\u001b[0;34m(self, data, schema, samplingRatio, verifySchema)\u001b[0m\n\u001b[1;32m 934\u001b[0m rdd, struct \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_createFromRDD(data\u001b[38;5;241m.\u001b[39mmap(prepare), schema, samplingRatio)\n\u001b[1;32m 935\u001b[0m \u001b[38;5;28;01melse\u001b[39;00m:\n\u001b[0;32m--> 936\u001b[0m rdd, struct \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_createFromLocal\u001b[49m\u001b[43m(\u001b[49m\u001b[38;5;28;43mmap\u001b[39;49m\u001b[43m(\u001b[49m\u001b[43mprepare\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mdata\u001b[49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mschema\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 937\u001b[0m \u001b[38;5;28;01massert\u001b[39;00m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_jvm \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m\n\u001b[1;32m 938\u001b[0m jrdd \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_jvm\u001b[38;5;241m.\u001b[39mSerDeUtil\u001b[38;5;241m.\u001b[39mtoJavaArray(rdd\u001b[38;5;241m.\u001b[39m_to_java_object_rdd())\n", "File \u001b[0;32m/usr/local/spark/python/pyspark/sql/session.py:631\u001b[0m, in \u001b[0;36mSparkSession._createFromLocal\u001b[0;34m(self, data, schema)\u001b[0m\n\u001b[1;32m 628\u001b[0m data \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mlist\u001b[39m(data)\n\u001b[1;32m 630\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m schema \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;28;01mNone\u001b[39;00m \u001b[38;5;129;01mor\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(schema, (\u001b[38;5;28mlist\u001b[39m, \u001b[38;5;28mtuple\u001b[39m)):\n\u001b[0;32m--> 631\u001b[0m struct \u001b[38;5;241m=\u001b[39m \u001b[38;5;28;43mself\u001b[39;49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43m_inferSchemaFromList\u001b[49m\u001b[43m(\u001b[49m\u001b[43mdata\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mnames\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mschema\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 632\u001b[0m converter \u001b[38;5;241m=\u001b[39m _create_converter(struct)\n\u001b[1;32m 633\u001b[0m tupled_data: Iterable[Tuple] \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mmap\u001b[39m(converter, data)\n", "File \u001b[0;32m/usr/local/spark/python/pyspark/sql/session.py:517\u001b[0m, in \u001b[0;36mSparkSession._inferSchemaFromList\u001b[0;34m(self, data, names)\u001b[0m\n\u001b[1;32m 515\u001b[0m infer_dict_as_struct \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mself\u001b[39m\u001b[38;5;241m.\u001b[39m_jconf\u001b[38;5;241m.\u001b[39minferDictAsStruct()\n\u001b[1;32m 516\u001b[0m prefer_timestamp_ntz \u001b[38;5;241m=\u001b[39m is_timestamp_ntz_preferred()\n\u001b[0;32m--> 517\u001b[0m schema \u001b[38;5;241m=\u001b[39m \u001b[43mreduce\u001b[49m\u001b[43m(\u001b[49m\n\u001b[1;32m 518\u001b[0m \u001b[43m \u001b[49m\u001b[43m_merge_type\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 519\u001b[0m \u001b[43m \u001b[49m\u001b[43m(\u001b[49m\u001b[43m_infer_schema\u001b[49m\u001b[43m(\u001b[49m\u001b[43mrow\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mnames\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43minfer_dict_as_struct\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mprefer_timestamp_ntz\u001b[49m\u001b[43m)\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;28;43;01mfor\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mrow\u001b[49m\u001b[43m \u001b[49m\u001b[38;5;129;43;01min\u001b[39;49;00m\u001b[43m \u001b[49m\u001b[43mdata\u001b[49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\n\u001b[1;32m 520\u001b[0m \u001b[43m\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 521\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m _has_nulltype(schema):\n\u001b[1;32m 522\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mValueError\u001b[39;00m(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mSome of types cannot be determined after inferring\u001b[39m\u001b[38;5;124m\"\u001b[39m)\n", "File \u001b[0;32m/usr/local/spark/python/pyspark/sql/types.py:1383\u001b[0m, in \u001b[0;36m_merge_type\u001b[0;34m(a, b, name)\u001b[0m\n\u001b[1;32m 1381\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(a, StructType):\n\u001b[1;32m 1382\u001b[0m nfs \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mdict\u001b[39m((f\u001b[38;5;241m.\u001b[39mname, f\u001b[38;5;241m.\u001b[39mdataType) \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m cast(StructType, b)\u001b[38;5;241m.\u001b[39mfields)\n\u001b[0;32m-> 1383\u001b[0m fields \u001b[38;5;241m=\u001b[39m [\n\u001b[1;32m 1384\u001b[0m StructField(\n\u001b[1;32m 1385\u001b[0m f\u001b[38;5;241m.\u001b[39mname, _merge_type(f\u001b[38;5;241m.\u001b[39mdataType, nfs\u001b[38;5;241m.\u001b[39mget(f\u001b[38;5;241m.\u001b[39mname, NullType()), name\u001b[38;5;241m=\u001b[39mnew_name(f\u001b[38;5;241m.\u001b[39mname))\n\u001b[1;32m 1386\u001b[0m )\n\u001b[1;32m 1387\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m a\u001b[38;5;241m.\u001b[39mfields\n\u001b[1;32m 1388\u001b[0m ]\n\u001b[1;32m 1389\u001b[0m names \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mset\u001b[39m([f\u001b[38;5;241m.\u001b[39mname \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m fields])\n\u001b[1;32m 1390\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m n \u001b[38;5;129;01min\u001b[39;00m nfs:\n", "File \u001b[0;32m/usr/local/spark/python/pyspark/sql/types.py:1385\u001b[0m, in \u001b[0;36m\u001b[0;34m(.0)\u001b[0m\n\u001b[1;32m 1381\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(a, StructType):\n\u001b[1;32m 1382\u001b[0m nfs \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mdict\u001b[39m((f\u001b[38;5;241m.\u001b[39mname, f\u001b[38;5;241m.\u001b[39mdataType) \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m cast(StructType, b)\u001b[38;5;241m.\u001b[39mfields)\n\u001b[1;32m 1383\u001b[0m fields \u001b[38;5;241m=\u001b[39m [\n\u001b[1;32m 1384\u001b[0m StructField(\n\u001b[0;32m-> 1385\u001b[0m f\u001b[38;5;241m.\u001b[39mname, \u001b[43m_merge_type\u001b[49m\u001b[43m(\u001b[49m\u001b[43mf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mdataType\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mnfs\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mget\u001b[49m\u001b[43m(\u001b[49m\u001b[43mf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mname\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mNullType\u001b[49m\u001b[43m(\u001b[49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\u001b[43m,\u001b[49m\u001b[43m \u001b[49m\u001b[43mname\u001b[49m\u001b[38;5;241;43m=\u001b[39;49m\u001b[43mnew_name\u001b[49m\u001b[43m(\u001b[49m\u001b[43mf\u001b[49m\u001b[38;5;241;43m.\u001b[39;49m\u001b[43mname\u001b[49m\u001b[43m)\u001b[49m\u001b[43m)\u001b[49m\n\u001b[1;32m 1386\u001b[0m )\n\u001b[1;32m 1387\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m a\u001b[38;5;241m.\u001b[39mfields\n\u001b[1;32m 1388\u001b[0m ]\n\u001b[1;32m 1389\u001b[0m names \u001b[38;5;241m=\u001b[39m \u001b[38;5;28mset\u001b[39m([f\u001b[38;5;241m.\u001b[39mname \u001b[38;5;28;01mfor\u001b[39;00m f \u001b[38;5;129;01min\u001b[39;00m fields])\n\u001b[1;32m 1390\u001b[0m \u001b[38;5;28;01mfor\u001b[39;00m n \u001b[38;5;129;01min\u001b[39;00m nfs:\n", "File \u001b[0;32m/usr/local/spark/python/pyspark/sql/types.py:1378\u001b[0m, in \u001b[0;36m_merge_type\u001b[0;34m(a, b, name)\u001b[0m\n\u001b[1;32m 1375\u001b[0m \u001b[38;5;28;01mreturn\u001b[39;00m b\n\u001b[1;32m 1376\u001b[0m \u001b[38;5;28;01melif\u001b[39;00m \u001b[38;5;28mtype\u001b[39m(a) \u001b[38;5;129;01mis\u001b[39;00m \u001b[38;5;129;01mnot\u001b[39;00m \u001b[38;5;28mtype\u001b[39m(b):\n\u001b[1;32m 1377\u001b[0m \u001b[38;5;66;03m# TODO: type cast (such as int -> long)\u001b[39;00m\n\u001b[0;32m-> 1378\u001b[0m \u001b[38;5;28;01mraise\u001b[39;00m \u001b[38;5;167;01mTypeError\u001b[39;00m(new_msg(\u001b[38;5;124m\"\u001b[39m\u001b[38;5;124mCan not merge type \u001b[39m\u001b[38;5;132;01m%s\u001b[39;00m\u001b[38;5;124m and \u001b[39m\u001b[38;5;132;01m%s\u001b[39;00m\u001b[38;5;124m\"\u001b[39m \u001b[38;5;241m%\u001b[39m (\u001b[38;5;28mtype\u001b[39m(a), \u001b[38;5;28mtype\u001b[39m(b))))\n\u001b[1;32m 1380\u001b[0m \u001b[38;5;66;03m# same type\u001b[39;00m\n\u001b[1;32m 1381\u001b[0m \u001b[38;5;28;01mif\u001b[39;00m \u001b[38;5;28misinstance\u001b[39m(a, StructType):\n", "\u001b[0;31mTypeError\u001b[0m: field col: Can not merge type and " ] } ], "source": [ "pdf = spark.createDataFrame(\n", " [\n", " (1, ),\n", " (2, ),\n", " (3, ),\n", " (4, ),\n", " (5, ),\n", " (\"a\", ),\n", " (\"b\", ),\n", " ],\n", " (\"col\",)\n", ")\n", "pdf.show()" ] }, { "cell_type": "markdown", "id": "57ee72d0-17d4-41c9-9bce-1aa0bd14fa29", "metadata": {}, "source": [ "**如果你是通过 IO 从文件中读取数据, 而存在 Type 不匹配的情况, 那么 Spark 会尝试将所有数据 Convert 成同一个 Type, 通常是字符串. 如果实在不行则报错**. 这个机制也有副作用, 当你期望的数值是 \"非字符串\" 时, 你往往想要把错误的 \"字符串\" 所在的行丢弃, 而 Spark 默认的行为则是全部转化成 \"字符串\"." ] }, { "cell_type": "code", "execution_count": null, "id": "3e3cd7ad-1980-450f-aee3-402c4a8081a0", "metadata": {}, "outputs": [], "source": [ "p_json = Path(cwd) / \"tmp.json\"\n", "\n", "records = [\n", " {\"col\": 1},\n", " {\"col\": 2},\n", " {\"col\": 3},\n", " {\"col\": 4},\n", " {\"col\": 5},\n", " {\"col\": \"a\"},\n", " {\"col\": \"b\"},\n", "]\n", "\n", "content = \"\\n\".join([\n", " json.dumps(record)\n", " for record in records\n", "])\n", "\n", "p_json.write_text(content)\n", "\n", "pdf = spark.read.json(\n", " f\"{p_json}\",\n", ")\n", "\n", "pdf.show()" ] }, { "cell_type": "code", "execution_count": null, "id": "7a4d5f2d-0054-44dc-95ae-8868f369d009", "metadata": {}, "outputs": [], "source": [ "pdf.printSchema()" ] }, { "cell_type": "code", "execution_count": null, "id": "b202dfbe-c546-4191-94f8-36696313aeb0", "metadata": {}, "outputs": [], "source": [ "pdf.collect()" ] }, { "cell_type": "markdown", "id": "b07e1779-aea7-4ae4-aaf9-5e02c2b6c658", "metadata": {}, "source": [ "为了解决 \"错误的数据类型会连累正确的数据类型被强制转化成字符串\" 的问题, 你可以在 IO 的时候手动指定 schema, 并允许 ``nullable = True``, 这样凡是数据类型不对的行就会自动被设为 Null. 手动指定 schema 的 API 可以 [参考这里](https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.types.StructType.html#pyspark.sql.types.StructType)" ] }, { "cell_type": "code", "execution_count": null, "id": "28f9d51e-9775-42ac-979a-ff4a50b272dc", "metadata": {}, "outputs": [], "source": [ "# Define Structure\n", "schema = T.StructType([\n", " T.StructField(\"col\", T.IntegerType(), True),\n", "])\n", "\n", "pdf = spark.read.json(f\"{p_json}\", schema=schema)\n", "\n", "pdf.show()" ] }, { "cell_type": "code", "execution_count": null, "id": "3c4c0e8f-b2ff-4f90-9d95-12238413bd22", "metadata": {}, "outputs": [], "source": [] } ], "metadata": { "kernelspec": { "display_name": "Python 3 (ipykernel)", "language": "python", "name": "python3" }, "language_info": { "codemirror_mode": { "name": "ipython", "version": 3 }, "file_extension": ".py", "mimetype": "text/x-python", "name": "python", "nbconvert_exporter": "python", "pygments_lexer": "ipython3", "version": "3.10.5" } }, "nbformat": 4, "nbformat_minor": 5 }