diff --git a/.github/workflows/deploy.yml b/.github/workflows/deploy.yml
index 35b6ad6..ced8a1b 100644
--- a/.github/workflows/deploy.yml
+++ b/.github/workflows/deploy.yml
@@ -34,4 +34,4 @@ jobs:
run: ssh-keyscan -p ${{ secrets.REMOTE_PORT }} -H ${{ secrets.REMOTE_HOST }} >> ~/.ssh/known_hosts
- name: Deploy with rsync
- run: rsync -avz _build/html/* ${{ secrets.REMOTE_USER }}@${{ secrets.REMOTE_HOST }}:/var/www/distributed-python/
+ run: rsync -avz _build/html/* ${{ secrets.REMOTE_USER }}@${{ secrets.REMOTE_HOST }}:/var/www/scale-python/
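
The two deploy steps shown here reduce to a host-key scan followed by an rsync upload of the built site. A minimal local sketch of the same sequence in Python, assuming `ssh-keyscan` and `rsync` are on `PATH`; the host, port, and user below are placeholders standing in for the repository secrets:

```python
import os
import subprocess

# Placeholder connection details; the workflow injects these from GitHub secrets.
host, port, user = "example.com", "22", "deploy"

# Step 1: append the remote host key so the later connection is non-interactive.
keys = subprocess.run(
    ["ssh-keyscan", "-p", port, "-H", host],
    capture_output=True, text=True, check=True,
).stdout
with open(os.path.expanduser("~/.ssh/known_hosts"), "a") as f:
    f.write(keys)

# Step 2: mirror the built HTML into the renamed web root.
# The port is passed to ssh explicitly here; the workflow's rsync line
# relies on the default SSH configuration instead.
subprocess.run(
    ["rsync", "-avz", "-e", f"ssh -p {port}",
     "_build/html/", f"{user}@{host}:/var/www/scale-python/"],
    check=True,
)
```

Pinning the host key first is what keeps the transfer non-interactive, which is why the workflow runs `ssh-keyscan` before `rsync`.
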
diff --git a/README.md b/README.md
index 33b2120..0f77b8d 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
-# Distributed Programming in Python
+# Accelerating Data Science with Python
-An open-source book on distributed programming in Python for next-generation artificial intelligence applications.
+An open-source book on parallel and accelerated Python programming for next-generation data science and artificial intelligence applications.
## Contents
diff --git a/ch-dask-dataframe/indexing.ipynb b/ch-dask-dataframe/indexing.ipynb
index 51eed82..8206544 100644
--- a/ch-dask-dataframe/indexing.ipynb
+++ b/ch-dask-dataframe/indexing.ipynb
@@ -16,15 +16,27 @@
"hide-cell"
]
},
- "outputs": [],
+ "outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.\n",
+ "Perhaps you already have a cluster running?\n",
+ "Hosting the HTTP server on port 51899 instead\n",
+ " warnings.warn(\n"
+ ]
+ }
+ ],
"source": [
"%config InlineBackend.figure_format = 'svg'\n",
"import os\n",
- "import urllib\n",
- "import shutil\n",
- "from zipfile import ZipFile\n",
+ "import sys\n",
+ "sys.path.append(\"..\")\n",
+ "from datasets import nyc_flights\n",
"\n",
"import dask\n",
+ "dask.config.set({'dataframe.query-planning': False})\n",
"import dask.dataframe as dd\n",
"import pandas as pd\n",
"from dask.distributed import LocalCluster, Client\n",
@@ -52,7 +64,7 @@
},
{
"cell_type": "code",
- "execution_count": 15,
+ "execution_count": 2,
"metadata": {},
"outputs": [
{
@@ -123,7 +135,7 @@
"3 qux three 4 40"
]
},
- "execution_count": 15,
+ "execution_count": 2,
"metadata": {},
"output_type": "execute_result"
}
@@ -147,7 +159,7 @@
},
{
"cell_type": "code",
- "execution_count": 19,
+ "execution_count": 3,
"metadata": {},
"outputs": [
{
@@ -220,7 +232,7 @@
"qux three 4 40"
]
},
- "execution_count": 19,
+ "execution_count": 3,
"metadata": {},
"output_type": "execute_result"
}
@@ -239,7 +251,7 @@
},
{
"cell_type": "code",
- "execution_count": 20,
+ "execution_count": 4,
"metadata": {},
"outputs": [
{
@@ -310,7 +322,7 @@
"3 qux three 4 40"
]
},
- "execution_count": 20,
+ "execution_count": 4,
"metadata": {},
"output_type": "execute_result"
}
@@ -343,7 +355,7 @@
},
{
"cell_type": "code",
- "execution_count": 2,
+ "execution_count": 5,
"metadata": {},
"outputs": [
{
@@ -370,7 +382,7 @@
},
{
"cell_type": "code",
- "execution_count": 3,
+ "execution_count": 6,
"metadata": {},
"outputs": [
{
@@ -397,7 +409,7 @@
},
{
"cell_type": "code",
- "execution_count": 4,
+ "execution_count": 7,
"metadata": {},
"outputs": [
{
@@ -433,28 +445,28 @@
"
\n",
" \n",
" 2018-01-01 00:00:00 | \n",
- " 984 | \n",
- " 0.660595 | \n",
+ " 992 | \n",
+ " -0.711756 | \n",
"
\n",
" \n",
" 2018-01-01 00:00:01 | \n",
- " 960 | \n",
- " -0.747564 | \n",
+ " 1018 | \n",
+ " -0.838596 | \n",
"
\n",
" \n",
" 2018-01-01 00:00:02 | \n",
- " 1039 | \n",
- " 0.777117 | \n",
+ " 1000 | \n",
+ " -0.735968 | \n",
"
\n",
" \n",
" 2018-01-01 00:00:03 | \n",
- " 1038 | \n",
- " -0.501949 | \n",
+ " 1004 | \n",
+ " 0.904384 | \n",
"
\n",
" \n",
" 2018-01-01 00:00:04 | \n",
- " 992 | \n",
- " 0.767979 | \n",
+ " 1021 | \n",
+ " 0.025423 | \n",
"
\n",
" \n",
" ... | \n",
@@ -463,28 +475,28 @@
"
\n",
" \n",
" 2022-12-31 23:59:55 | \n",
- " 1005 | \n",
- " -0.102774 | \n",
+ " 1020 | \n",
+ " 0.961542 | \n",
"
\n",
" \n",
" 2022-12-31 23:59:56 | \n",
- " 1040 | \n",
- " -0.648857 | \n",
+ " 963 | \n",
+ " -0.663948 | \n",
"
\n",
" \n",
" 2022-12-31 23:59:57 | \n",
- " 1019 | \n",
- " -0.310174 | \n",
+ " 1010 | \n",
+ " 0.510401 | \n",
"
\n",
" \n",
" 2022-12-31 23:59:58 | \n",
- " 987 | \n",
- " 0.889037 | \n",
+ " 964 | \n",
+ " -0.882126 | \n",
"
\n",
" \n",
" 2022-12-31 23:59:59 | \n",
- " 977 | \n",
- " -0.078216 | \n",
+ " 1020 | \n",
+ " -0.532950 | \n",
"
\n",
" \n",
"\n",
@@ -494,22 +506,22 @@
"text/plain": [
" id x\n",
"timestamp \n",
- "2018-01-01 00:00:00 984 0.660595\n",
- "2018-01-01 00:00:01 960 -0.747564\n",
- "2018-01-01 00:00:02 1039 0.777117\n",
- "2018-01-01 00:00:03 1038 -0.501949\n",
- "2018-01-01 00:00:04 992 0.767979\n",
+ "2018-01-01 00:00:00 992 -0.711756\n",
+ "2018-01-01 00:00:01 1018 -0.838596\n",
+ "2018-01-01 00:00:02 1000 -0.735968\n",
+ "2018-01-01 00:00:03 1004 0.904384\n",
+ "2018-01-01 00:00:04 1021 0.025423\n",
"... ... ...\n",
- "2022-12-31 23:59:55 1005 -0.102774\n",
- "2022-12-31 23:59:56 1040 -0.648857\n",
- "2022-12-31 23:59:57 1019 -0.310174\n",
- "2022-12-31 23:59:58 987 0.889037\n",
- "2022-12-31 23:59:59 977 -0.078216\n",
+ "2022-12-31 23:59:55 1020 0.961542\n",
+ "2022-12-31 23:59:56 963 -0.663948\n",
+ "2022-12-31 23:59:57 1010 0.510401\n",
+ "2022-12-31 23:59:58 964 -0.882126\n",
+ "2022-12-31 23:59:59 1020 -0.532950\n",
"\n",
"[157766400 rows x 2 columns]"
]
},
- "execution_count": 4,
+ "execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
@@ -527,31 +539,31 @@
},
{
"cell_type": "code",
- "execution_count": 5,
+ "execution_count": 8,
"metadata": {},
"outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:640: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n",
+ " head = reader(BytesIO(b_sample), nrows=sample_rows, **head_kwargs)\n"
+ ]
+ },
{
"data": {
"text/plain": [
"(None, None, None, None, None, None, None)"
]
},
- "execution_count": 5,
+ "execution_count": 8,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
- "folder_path = os.path.join(os.getcwd(), \"../data/\")\n",
- "download_url = \"https://dp.godaai.org/nyc-flights.zip\"\n",
- "zip_file_path = os.path.join(folder_path, \"nyc-flights.zip\")\n",
- "if not os.path.exists(os.path.join(folder_path, \"nyc-flights\")):\n",
- " with urllib.request.urlopen(download_url) as response, open(zip_file_path, 'wb') as out_file:\n",
- " shutil.copyfileobj(response, out_file)\n",
- " zf = ZipFile(zip_file_path, 'r')\n",
- " zf.extractall(folder_path)\n",
- " zf.close()\n",
- "file_path = os.path.join(folder_path, \"nyc-flights\", \"*.csv\")\n",
+ "folder_path = nyc_flights()\n",
+ "file_path = os.path.join(folder_path, \"*.csv\")\n",
"flights_ddf = dd.read_csv(file_path,\n",
" parse_dates={'Date': [0, 1, 2]},\n",
" dtype={'TailNum': object,\n",
@@ -569,7 +581,7 @@
},
{
"cell_type": "code",
- "execution_count": 6,
+ "execution_count": 9,
"metadata": {},
"outputs": [
{
@@ -602,7 +614,7 @@
},
{
"cell_type": "code",
- "execution_count": 7,
+ "execution_count": 10,
"metadata": {},
"outputs": [
{
@@ -633,16 +645,38 @@
},
{
"cell_type": "code",
- "execution_count": 8,
+ "execution_count": 11,
"metadata": {},
"outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "2024-04-23 16:05:06,483 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 008ee90768895dabe7a3e94389222068 initialized by task ('shuffle-transfer-008ee90768895dabe7a3e94389222068', 0) executed on worker tcp://127.0.0.1:51911\n",
+ "2024-04-23 16:05:06,505 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 008ee90768895dabe7a3e94389222068 deactivated due to stimulus 'task-finished-1713859506.50483'\n"
+ ]
+ },
{
"name": "stdout",
"output_type": "stream",
"text": [
" col2\n",
"col1 \n",
- "01 a\n",
+ "01 a\n"
+ ]
+ },
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "2024-04-23 16:05:06,545 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 01fddf4f11082a43a6075f7888029dd3 initialized by task ('shuffle-transfer-01fddf4f11082a43a6075f7888029dd3', 1) executed on worker tcp://127.0.0.1:51912\n",
+ "2024-04-23 16:05:06,604 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle 01fddf4f11082a43a6075f7888029dd3 deactivated due to stimulus 'task-finished-1713859506.6028118'\n"
+ ]
+ },
+ {
+ "name": "stdout",
+ "output_type": "stream",
+ "text": [
" col2\n",
"col1 \n",
"02 c\n",
@@ -675,17 +709,25 @@
},
{
"cell_type": "code",
- "execution_count": 9,
+ "execution_count": 12,
"metadata": {},
"outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "2024-04-23 16:05:16,522 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 initialized by task ('shuffle-transfer-d162433f4ca23d129354be4d414ea589', 999) executed on worker tcp://127.0.0.1:51914\n",
+ "2024-04-23 16:05:27,101 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 deactivated due to stimulus 'task-finished-1713859527.100699'\n"
+ ]
+ },
{
"name": "stdout",
"output_type": "stream",
"text": [
"before set_index npartitions: 1826\n",
- "after set_index npartitions: 163\n",
- "CPU times: user 6.1 s, sys: 3.47 s, total: 9.57 s\n",
- "Wall time: 19.6 s\n"
+ "after set_index npartitions: 165\n",
+ "CPU times: user 6.63 s, sys: 3.65 s, total: 10.3 s\n",
+ "Wall time: 20.6 s\n"
]
}
],
@@ -706,7 +748,7 @@
},
{
"cell_type": "code",
- "execution_count": 10,
+ "execution_count": 13,
"metadata": {},
"outputs": [],
"source": [
@@ -716,15 +758,23 @@
},
{
"cell_type": "code",
- "execution_count": 11,
+ "execution_count": 14,
"metadata": {},
"outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "2024-04-23 16:05:38,056 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 initialized by task ('shuffle-transfer-d162433f4ca23d129354be4d414ea589', 999) executed on worker tcp://127.0.0.1:51914\n",
+ "2024-04-23 16:05:49,629 - distributed.shuffle._scheduler_plugin - WARNING - Shuffle d162433f4ca23d129354be4d414ea589 deactivated due to stimulus 'task-finished-1713859549.629161'\n"
+ ]
+ },
{
"name": "stdout",
"output_type": "stream",
"text": [
- "CPU times: user 3.25 s, sys: 1.09 s, total: 4.34 s\n",
- "Wall time: 11.7 s\n"
+ "CPU times: user 3.24 s, sys: 1.7 s, total: 4.94 s\n",
+ "Wall time: 11.9 s\n"
]
}
],
@@ -743,15 +793,15 @@
},
{
"cell_type": "code",
- "execution_count": 12,
+ "execution_count": 15,
"metadata": {},
"outputs": [
{
"name": "stdout",
"output_type": "stream",
"text": [
- "CPU times: user 1.94 s, sys: 743 ms, total: 2.68 s\n",
- "Wall time: 8.18 s\n"
+ "CPU times: user 1.88 s, sys: 1.09 s, total: 2.97 s\n",
+ "Wall time: 8.38 s\n"
]
}
],
@@ -782,9 +832,17 @@
},
{
"cell_type": "code",
- "execution_count": 13,
+ "execution_count": 17,
"metadata": {},
"outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "/var/folders/4n/v40br47s46ggrjm9bdm64lwh0000gn/T/ipykernel_76150/639704942.py:3: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n",
+ " pdf = pd.read_csv(file_path,\n"
+ ]
+ },
{
"data": {
"text/html": [
@@ -837,14 +895,14 @@
"1 JFK 9.311532"
]
},
- "execution_count": 13,
+ "execution_count": 17,
"metadata": {},
"output_type": "execute_result"
}
],
"source": [
"# pandas\n",
- "file_path = os.path.join(folder_path, \"nyc-flights\", \"1991.csv\")\n",
+ "file_path = os.path.join(folder_path, \"1991.csv\")\n",
"pdf = pd.read_csv(file_path,\n",
" parse_dates={'Date': [0, 1, 2]},\n",
" dtype={'TailNum': object,\n",
@@ -865,7 +923,7 @@
},
{
"cell_type": "code",
- "execution_count": 22,
+ "execution_count": 18,
"metadata": {},
"outputs": [
{
@@ -920,7 +978,7 @@
"1 JFK 9.311532"
]
},
- "execution_count": 22,
+ "execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
@@ -940,9 +998,23 @@
},
{
"cell_type": "code",
- "execution_count": 14,
+ "execution_count": 19,
"metadata": {},
"outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n",
+ " df = reader(bio, **kwargs)\n",
+ "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n",
+ " df = reader(bio, **kwargs)\n",
+ "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n",
+ " df = reader(bio, **kwargs)\n",
+ "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n",
+ " df = reader(bio, **kwargs)\n"
+ ]
+ },
{
"data": {
"text/html": [
@@ -995,7 +1067,7 @@
"1 JFK 10.766914"
]
},
- "execution_count": 14,
+ "execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
@@ -1017,6 +1089,13 @@
"source": [
"client.shutdown()"
]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": null,
+ "metadata": {},
+ "outputs": [],
+ "source": []
}
],
"metadata": {
diff --git a/ch-dask-dataframe/map-partitions.ipynb b/ch-dask-dataframe/map-partitions.ipynb
index e077d3f..e3357cb 100644
--- a/ch-dask-dataframe/map-partitions.ipynb
+++ b/ch-dask-dataframe/map-partitions.ipynb
@@ -4,57 +4,37 @@
"cell_type": "markdown",
"metadata": {},
"source": [
+ "(sec-dask-map_partitions)=\n",
"# `map_partitions`\n",
"\n",
- "除了 {numref}`sec-dask-dataframe-shuffle` 中提到的一些需要通信的计算外,有一种最简单的并行方式,英文术语为 Embarrassingly Parallel,中文可翻译为易并行。它指的是该类计算不需要太多跨 Worker 的协调和通信。比如,对某个字段加一,每个 Worker 内执行加法操作即可,Worker 之间没有通信的开销。Dask DataFrame 中可以使用 `map_partitions()` 来做这类 Embarrassingly Parallel 的操作。`map_partitions(func)` 的参数是一个 `func`,这个 `func` 将在每个 Partition 上执行。\n",
+ "除了 {numref}`sec-dask-dataframe-shuffle` 中提到的一些需要通信的计算外,有一种最简单的并行方式,英文术语为 Embarrassingly Parallel,中文可翻译为**易并行**。它指的是该类计算不需要太多跨 Worker 的协调和通信。比如,对某个字段加一,每个 Worker 内执行加法操作即可,Worker 之间没有通信的开销。Dask DataFrame 中可以使用 `map_partitions(func)` 来做这类 Embarrassingly Parallel 的操作。`map_partitions(func)` 的参数是一个 `func`,这个 `func` 将在每个 pandas DataFrame 上执行,`func` 内可以使用 pandas DataFrame 的各类操作。如 {numref}`fig-dask-map-partitions` 所示,`map_partitions(func)` 对原来的 pandas DataFrame 进行了转换操作。\n",
"\n",
- "下面的案例对缺失值进行填充,它没有跨 Worker 的通信开销,因此是一种 Embarrassingly Parallel 的典型应用场景。"
- ]
- },
- {
- "cell_type": "code",
- "execution_count": 6,
- "metadata": {},
- "outputs": [],
- "source": [
- "import os\n",
- "import urllib\n",
- "import shutil\n",
- "from zipfile import ZipFile\n",
- "import warnings\n",
+ "```{figure} ../img/ch-dask-dataframe/map-partitions.svg\n",
+ "---\n",
+ "width: 600px\n",
+ "name: fig-dask-map-partitions\n",
+ "---\n",
+ "map_partitions()\n",
+ "```\n",
"\n",
- "warnings.simplefilter(action='ignore', category=FutureWarning)\n",
+ "## 案例:纽约出租车数据\n",
"\n",
- "folder_path = os.path.join(os.getcwd(), \"../data/\")\n",
- "download_url_prefix = \"https://gender-pay-gap.service.gov.uk/viewing/download-data/\"\n",
- "file_path_prefix = os.path.join(folder_path, \"gender-pay\")\n",
- "if not os.path.exists(file_path_prefix):\n",
- " os.makedirs(file_path_prefix)\n",
- "for year in [2017, 2018, 2019, 2020, 2021, 2022]:\n",
- " download_url = download_url_prefix + str(year)\n",
- " file_path = os.path.join(file_path_prefix, f\"{str(year)}.csv\")\n",
- " if not os.path.exists(file_path):\n",
- " with urllib.request.urlopen(download_url) as response, open(file_path, 'wb') as out_file:\n",
- " shutil.copyfileobj(response, out_file)"
+ "我们使用纽约出租车数据集进行简单的数据预处理:计算每个订单的时长。原数据集中,`tpep_pickup_datetime` 和 `tpep_dropoff_datetime` 分别为乘客上车和下车时间,现在只需要将下车时间 `tpep_dropoff_datetime` 减去上车时间 `tpep_pickup_datetime`。这个计算没有跨 Worker 的通信开销,因此是一种 Embarrassingly Parallel 的典型应用场景。"
]
},
{
"cell_type": "code",
- "execution_count": 7,
+ "execution_count": 4,
"metadata": {},
- "outputs": [
- {
- "name": "stderr",
- "output_type": "stream",
- "text": [
- "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.\n",
- "Perhaps you already have a cluster running?\n",
- "Hosting the HTTP server on port 57481 instead\n",
- " warnings.warn(\n"
- ]
- }
- ],
+ "outputs": [],
"source": [
+ "import sys\n",
+ "sys.path.append(\"..\")\n",
+ "from datasets import nyc_taxi\n",
+ "\n",
+ "import pandas as pd\n",
+ "import dask\n",
+ "dask.config.set({'dataframe.query-planning': False})\n",
"import dask.dataframe as dd\n",
"import pandas as pd\n",
"from dask.distributed import LocalCluster, Client\n",
@@ -65,31 +45,249 @@
},
{
"cell_type": "code",
- "execution_count": 8,
+ "execution_count": 5,
"metadata": {},
"outputs": [],
"source": [
- "ddf = dd.read_csv(os.path.join(file_path_prefix, \"*.csv\"),\n",
- " dtype={'CompanyNumber': 'str', 'DiffMeanHourlyPercent': 'float64'})\n",
+ "dataset_path = nyc_taxi()\n",
+ "ddf = dd.read_parquet(dataset_path)"
+ ]
+ },
+ {
+ "cell_type": "code",
+ "execution_count": 6,
+ "metadata": {},
+ "outputs": [
+ {
+ "data": {
+ "text/html": [
+ "\n",
+ "\n",
+ "
\n",
+ " \n",
+ " \n",
+ " | \n",
+ " VendorID | \n",
+ " trip_duration | \n",
+ " tpep_pickup_datetime | \n",
+ " tpep_dropoff_datetime | \n",
+ " passenger_count | \n",
+ " trip_distance | \n",
+ " RatecodeID | \n",
+ " store_and_fwd_flag | \n",
+ " PULocationID | \n",
+ " DOLocationID | \n",
+ " payment_type | \n",
+ " fare_amount | \n",
+ " extra | \n",
+ " mta_tax | \n",
+ " tip_amount | \n",
+ " tolls_amount | \n",
+ " improvement_surcharge | \n",
+ " total_amount | \n",
+ " congestion_surcharge | \n",
+ " Airport_fee | \n",
+ "
\n",
+ " \n",
+ " \n",
+ " \n",
+ " 0 | \n",
+ " 1 | \n",
+ " 1253 | \n",
+ " 2023-06-01 00:08:48 | \n",
+ " 2023-06-01 00:29:41 | \n",
+ " 1.0 | \n",
+ " 3.40 | \n",
+ " 1.0 | \n",
+ " N | \n",
+ " 140 | \n",
+ " 238 | \n",
+ " 1 | \n",
+ " 21.9 | \n",
+ " 3.50 | \n",
+ " 0.5 | \n",
+ " 6.70 | \n",
+ " 0.0 | \n",
+ " 1.0 | \n",
+ " 33.60 | \n",
+ " 2.5 | \n",
+ " 0.00 | \n",
+ "
\n",
+ " \n",
+ " 1 | \n",
+ " 1 | \n",
+ " 614 | \n",
+ " 2023-06-01 00:15:04 | \n",
+ " 2023-06-01 00:25:18 | \n",
+ " 0.0 | \n",
+ " 3.40 | \n",
+ " 1.0 | \n",
+ " N | \n",
+ " 50 | \n",
+ " 151 | \n",
+ " 1 | \n",
+ " 15.6 | \n",
+ " 3.50 | \n",
+ " 0.5 | \n",
+ " 3.00 | \n",
+ " 0.0 | \n",
+ " 1.0 | \n",
+ " 23.60 | \n",
+ " 2.5 | \n",
+ " 0.00 | \n",
+ "
\n",
+ " \n",
+ " 2 | \n",
+ " 1 | \n",
+ " 1123 | \n",
+ " 2023-06-01 00:48:24 | \n",
+ " 2023-06-01 01:07:07 | \n",
+ " 1.0 | \n",
+ " 10.20 | \n",
+ " 1.0 | \n",
+ " N | \n",
+ " 138 | \n",
+ " 97 | \n",
+ " 1 | \n",
+ " 40.8 | \n",
+ " 7.75 | \n",
+ " 0.5 | \n",
+ " 10.00 | \n",
+ " 0.0 | \n",
+ " 1.0 | \n",
+ " 60.05 | \n",
+ " 0.0 | \n",
+ " 1.75 | \n",
+ "
\n",
+ " \n",
+ " 3 | \n",
+ " 2 | \n",
+ " 1406 | \n",
+ " 2023-06-01 00:54:03 | \n",
+ " 2023-06-01 01:17:29 | \n",
+ " 3.0 | \n",
+ " 9.83 | \n",
+ " 1.0 | \n",
+ " N | \n",
+ " 100 | \n",
+ " 244 | \n",
+ " 1 | \n",
+ " 39.4 | \n",
+ " 1.00 | \n",
+ " 0.5 | \n",
+ " 8.88 | \n",
+ " 0.0 | \n",
+ " 1.0 | \n",
+ " 53.28 | \n",
+ " 2.5 | \n",
+ " 0.00 | \n",
+ "
\n",
+ " \n",
+ " 4 | \n",
+ " 2 | \n",
+ " 514 | \n",
+ " 2023-06-01 00:18:44 | \n",
+ " 2023-06-01 00:27:18 | \n",
+ " 1.0 | \n",
+ " 1.17 | \n",
+ " 1.0 | \n",
+ " N | \n",
+ " 137 | \n",
+ " 234 | \n",
+ " 1 | \n",
+ " 9.3 | \n",
+ " 1.00 | \n",
+ " 0.5 | \n",
+ " 0.72 | \n",
+ " 0.0 | \n",
+ " 1.0 | \n",
+ " 15.02 | \n",
+ " 2.5 | \n",
+ " 0.00 | \n",
+ "
\n",
+ " \n",
+ "
\n",
+ "
"
+ ],
+ "text/plain": [
+ " VendorID trip_duration tpep_pickup_datetime tpep_dropoff_datetime \\\n",
+ "0 1 1253 2023-06-01 00:08:48 2023-06-01 00:29:41 \n",
+ "1 1 614 2023-06-01 00:15:04 2023-06-01 00:25:18 \n",
+ "2 1 1123 2023-06-01 00:48:24 2023-06-01 01:07:07 \n",
+ "3 2 1406 2023-06-01 00:54:03 2023-06-01 01:17:29 \n",
+ "4 2 514 2023-06-01 00:18:44 2023-06-01 00:27:18 \n",
+ "\n",
+ " passenger_count trip_distance RatecodeID store_and_fwd_flag \\\n",
+ "0 1.0 3.40 1.0 N \n",
+ "1 0.0 3.40 1.0 N \n",
+ "2 1.0 10.20 1.0 N \n",
+ "3 3.0 9.83 1.0 N \n",
+ "4 1.0 1.17 1.0 N \n",
+ "\n",
+ " PULocationID DOLocationID payment_type fare_amount extra mta_tax \\\n",
+ "0 140 238 1 21.9 3.50 0.5 \n",
+ "1 50 151 1 15.6 3.50 0.5 \n",
+ "2 138 97 1 40.8 7.75 0.5 \n",
+ "3 100 244 1 39.4 1.00 0.5 \n",
+ "4 137 234 1 9.3 1.00 0.5 \n",
+ "\n",
+ " tip_amount tolls_amount improvement_surcharge total_amount \\\n",
+ "0 6.70 0.0 1.0 33.60 \n",
+ "1 3.00 0.0 1.0 23.60 \n",
+ "2 10.00 0.0 1.0 60.05 \n",
+ "3 8.88 0.0 1.0 53.28 \n",
+ "4 0.72 0.0 1.0 15.02 \n",
+ "\n",
+ " congestion_surcharge Airport_fee \n",
+ "0 2.5 0.00 \n",
+ "1 2.5 0.00 \n",
+ "2 0.0 1.75 \n",
+ "3 2.5 0.00 \n",
+ "4 2.5 0.00 "
+ ]
+ },
+ "execution_count": 6,
+ "metadata": {},
+ "output_type": "execute_result"
+ }
+ ],
+ "source": [
+ "def transform(df):\n",
+ " df[\"trip_duration\"] = (df[\"tpep_dropoff_datetime\"] - df[\"tpep_pickup_datetime\"]).dt.seconds\n",
+ " # 将 `trip_duration` 挪到前面\n",
+ " dur_column = df.pop('trip_duration')\n",
+ " df.insert(1, dur_column.name, dur_column)\n",
+ " return df\n",
"\n",
- "def fillna(df):\n",
- " return df.fillna(value={\"PostCode\": \"UNKNOWN\"})\n",
- " \n",
- "ddf = ddf.map_partitions(fillna)"
+ "ddf = ddf.map_partitions(transform)\n",
+ "ddf.compute()\n",
+ "ddf.head(5)"
]
},
{
"cell_type": "markdown",
"metadata": {},
"source": [
- "Dask DataFrame 模拟了 pandas DataFrame,如果这个 API 的计算模式是 Embarrassingly Parallel,它的底层很可能就是使用 `map_partitions()` 实现的。\n",
+ "Dask DataFrame 的某些 API 的计算模式是 Embarrassingly Parallel,它的底层很可能就是使用 `map_partitions()` 实现的。\n",
"\n",
- "{numref}`sec-dask-dataframe-indexing` 提到过,Dask DataFrame 会在某个列上进行切分。我们可以在 `map_partitions()` 的 `func` 中实现任何我们想做的事情,但如果对这些切分的列做了改动,需要 `clear_divisions()` 或者重新 `set_index()`。"
+ "{numref}`sec-dask-dataframe-indexing` 提到过,Dask DataFrame 会在某个列上进行切分,但如果对这些切分的列做了改动,需要 `clear_divisions()` 或者重新 `set_index()`。"
]
},
{
"cell_type": "code",
- "execution_count": 9,
+ "execution_count": 7,
"metadata": {},
"outputs": [
{
@@ -114,43 +312,29 @@
" \n",
" \n",
" | \n",
- " EmployerName | \n",
- " EmployerId | \n",
- " Address | \n",
- " PostCode | \n",
- " CompanyNumber | \n",
- " SicCodes | \n",
- " DiffMeanHourlyPercent | \n",
- " DiffMedianHourlyPercent | \n",
- " DiffMeanBonusPercent | \n",
- " DiffMedianBonusPercent | \n",
- " MaleBonusPercent | \n",
- " FemaleBonusPercent | \n",
- " MaleLowerQuartile | \n",
- " FemaleLowerQuartile | \n",
- " MaleLowerMiddleQuartile | \n",
- " FemaleLowerMiddleQuartile | \n",
- " MaleUpperMiddleQuartile | \n",
- " FemaleUpperMiddleQuartile | \n",
- " MaleTopQuartile | \n",
- " FemaleTopQuartile | \n",
- " CompanyLinkToGPGInfo | \n",
- " ResponsiblePerson | \n",
- " EmployerSize | \n",
- " CurrentName | \n",
- " SubmittedAfterTheDeadline | \n",
- " DueDate | \n",
- " DateSubmitted | \n",
+ " VendorID | \n",
+ " trip_duration | \n",
+ " tpep_pickup_datetime | \n",
+ " tpep_dropoff_datetime | \n",
+ " passenger_count | \n",
+ " trip_distance | \n",
+ " RatecodeID | \n",
+ " store_and_fwd_flag | \n",
+ " PULocationID | \n",
+ " DOLocationID | \n",
+ " payment_type | \n",
+ " fare_amount | \n",
+ " extra | \n",
+ " mta_tax | \n",
+ " tip_amount | \n",
+ " tolls_amount | \n",
+ " improvement_surcharge | \n",
+ " total_amount | \n",
+ " congestion_surcharge | \n",
+ " Airport_fee | \n",
"
\n",
" \n",
- " npartitions=6 | \n",
- " | \n",
- " | \n",
- " | \n",
- " | \n",
- " | \n",
- " | \n",
- " | \n",
+ " npartitions=1 | \n",
" | \n",
" | \n",
" | \n",
@@ -176,17 +360,17 @@
"
\n",
" \n",
" | \n",
- " string | \n",
+ " int32 | \n",
+ " int32 | \n",
+ " datetime64[us] | \n",
+ " datetime64[us] | \n",
" int64 | \n",
- " string | \n",
- " string | \n",
- " string | \n",
- " string | \n",
- " float64 | \n",
- " float64 | \n",
- " float64 | \n",
- " float64 | \n",
" float64 | \n",
+ " int64 | \n",
+ " string | \n",
+ " int32 | \n",
+ " int32 | \n",
+ " int64 | \n",
" float64 | \n",
" float64 | \n",
" float64 | \n",
@@ -196,73 +380,6 @@
" float64 | \n",
" float64 | \n",
" float64 | \n",
- " string | \n",
- " string | \n",
- " string | \n",
- " string | \n",
- " bool | \n",
- " string | \n",
- " string | \n",
- "
\n",
- " \n",
- " | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- "
\n",
- " \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
"
\n",
" \n",
" | \n",
@@ -286,62 +403,22 @@
" ... | \n",
" ... | \n",
" ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- "
\n",
- " \n",
- " | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
- " ... | \n",
"
\n",
" \n",
"\n",
"\n",
- "Dask Name: fillna, 3 graph layers
"
+ "Dask Name: transform, 2 graph layers
"
],
"text/plain": [
"Dask DataFrame Structure:\n",
- " EmployerName EmployerId Address PostCode CompanyNumber SicCodes DiffMeanHourlyPercent DiffMedianHourlyPercent DiffMeanBonusPercent DiffMedianBonusPercent MaleBonusPercent FemaleBonusPercent MaleLowerQuartile FemaleLowerQuartile MaleLowerMiddleQuartile FemaleLowerMiddleQuartile MaleUpperMiddleQuartile FemaleUpperMiddleQuartile MaleTopQuartile FemaleTopQuartile CompanyLinkToGPGInfo ResponsiblePerson EmployerSize CurrentName SubmittedAfterTheDeadline DueDate DateSubmitted\n",
- "npartitions=6 \n",
- " string int64 string string string string float64 float64 float64 float64 float64 float64 float64 float64 float64 float64 float64 float64 float64 float64 string string string string bool string string\n",
- " ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
- "... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
- " ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
- " ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
- "Dask Name: fillna, 3 graph layers"
+ " VendorID trip_duration tpep_pickup_datetime tpep_dropoff_datetime passenger_count trip_distance RatecodeID store_and_fwd_flag PULocationID DOLocationID payment_type fare_amount extra mta_tax tip_amount tolls_amount improvement_surcharge total_amount congestion_surcharge Airport_fee\n",
+ "npartitions=1 \n",
+ " int32 int32 datetime64[us] datetime64[us] int64 float64 int64 string int32 int32 int64 float64 float64 float64 float64 float64 float64 float64 float64 float64\n",
+ " ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ... ...\n",
+ "Dask Name: transform, 2 graph layers"
]
},
- "execution_count": 9,
+ "execution_count": 7,
"metadata": {},
"output_type": "execute_result"
}
@@ -352,7 +429,7 @@
},
{
"cell_type": "code",
- "execution_count": 10,
+ "execution_count": 8,
"metadata": {},
"outputs": [],
"source": [
diff --git a/ch-dask-dataframe/read-write.ipynb b/ch-dask-dataframe/read-write.ipynb
index 3b1f5fe..a9ba5cd 100644
--- a/ch-dask-dataframe/read-write.ipynb
+++ b/ch-dask-dataframe/read-write.ipynb
@@ -40,63 +40,40 @@
"\n",
"## 数据切分与并行读取\n",
"\n",
- "我们以读取逗号分隔的数值(Comma-Separated Values,CSV)文件为例,来展示 Dask DataFrame 与 pandas 的区别。"
+ "### 案例:飞机起降数据\n",
+ "\n",
+ "飞机起降数据集由多个逗号分隔的数值(Comma-Separated Values,CSV)文件组成,每个文件对应一个年份,我们以读取多个 CSV 为例,来展示 Dask DataFrame 与 pandas 的区别。"
]
},
{
"cell_type": "code",
- "execution_count": 13,
+ "execution_count": 14,
"metadata": {},
- "outputs": [
- {
- "name": "stdout",
- "output_type": "stream",
- "text": [
- "/Users/luweizheng/Projects/godaai/distributed-python/ch-dask-dataframe/../data/nyc-flights/*.csv\n"
- ]
- }
- ],
+ "outputs": [],
"source": [
"import os\n",
"import glob\n",
- "import urllib\n",
- "import shutil\n",
- "from zipfile import ZipFile\n",
- "import warnings\n",
"\n",
+ "import sys\n",
+ "sys.path.append(\"..\")\n",
+ "from datasets import nyc_flights\n",
+ "\n",
+ "import warnings\n",
"warnings.simplefilter(action='ignore', category=FutureWarning)\n",
"\n",
- "folder_path = os.path.join(os.getcwd(), \"../data/\")\n",
- "download_url = \"https://dp.godaai.org/nyc-flights.zip\"\n",
- "zip_file_path = os.path.join(folder_path, \"nyc-flights.zip\")\n",
- "if not os.path.exists(os.path.join(folder_path, \"nyc-flights\")):\n",
- " with urllib.request.urlopen(download_url) as response, open(zip_file_path, 'wb') as out_file:\n",
- " shutil.copyfileobj(response, out_file)\n",
- " zf = ZipFile(zip_file_path, 'r')\n",
- " zf.extractall(folder_path)\n",
- " zf.close()\n",
+ "folder_path = nyc_flights()\n",
"\n",
- "file_path = os.path.join(folder_path, \"nyc-flights\", \"*.csv\")\n",
- "print(file_path)"
+ "file_path = os.path.join(folder_path, \"*.csv\")"
]
},
{
"cell_type": "code",
- "execution_count": 16,
+ "execution_count": 15,
"metadata": {},
- "outputs": [
- {
- "name": "stderr",
- "output_type": "stream",
- "text": [
- "/Users/luweizheng/miniconda3/envs/dispy/lib/python3.11/site-packages/distributed/node.py:182: UserWarning: Port 8787 is already in use.\n",
- "Perhaps you already have a cluster running?\n",
- "Hosting the HTTP server on port 55077 instead\n",
- " warnings.warn(\n"
- ]
- }
- ],
+ "outputs": [],
"source": [
+ "import dask\n",
+ "dask.config.set({'dataframe.query-planning': False})\n",
"import dask.dataframe as dd\n",
"import pandas as pd\n",
"from dask.distributed import LocalCluster, Client\n",
@@ -114,7 +91,7 @@
},
{
"cell_type": "code",
- "execution_count": 2,
+ "execution_count": 16,
"metadata": {},
"outputs": [],
"source": [
@@ -130,7 +107,7 @@
},
{
"cell_type": "code",
- "execution_count": 3,
+ "execution_count": 17,
"metadata": {},
"outputs": [],
"source": [
@@ -153,9 +130,17 @@
},
{
"cell_type": "code",
- "execution_count": 4,
+ "execution_count": 18,
"metadata": {},
"outputs": [
+ {
+ "name": "stderr",
+ "output_type": "stream",
+ "text": [
+ "/Users/luweizheng/miniconda3/envs/dask/lib/python3.11/site-packages/dask/dataframe/io/csv.py:195: FutureWarning: Support for nested sequences for 'parse_dates' in pd.read_csv is deprecated. Combine the desired columns with pd.to_datetime after parsing instead.\n",
+ " df = reader(bio, **kwargs)\n"
+ ]
+ },
{
"data": {
"text/html": [
@@ -302,7 +287,7 @@
"[3 rows x 21 columns]"
]
},
- "execution_count": 4,
+ "execution_count": 18,
"metadata": {},
"output_type": "execute_result"
}
@@ -313,7 +298,7 @@
},
{
"cell_type": "code",
- "execution_count": 5,
+ "execution_count": 19,
"metadata": {},
"outputs": [
{
@@ -457,7 +442,7 @@
"[3 rows x 21 columns]"
]
},
- "execution_count": 5,
+ "execution_count": 19,
"metadata": {},
"output_type": "execute_result"
}
@@ -477,7 +462,7 @@
},
{
"cell_type": "code",
- "execution_count": 6,
+ "execution_count": 20,
"metadata": {},
"outputs": [
{
@@ -486,255 +471,255 @@
"