Skip to content

Commit

Permalink
mpi
Browse files Browse the repository at this point in the history
  • Loading branch information
luweizheng committed Feb 12, 2024
1 parent 6acc6aa commit a211326
Show file tree
Hide file tree
Showing 27 changed files with 815 additions and 600 deletions.
1 change: 0 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ data/
*.DS_Store
*.csv
*egg-info*
dist*
_build/*
test*.md
.idea
Expand Down
1 change: 0 additions & 1 deletion _toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ subtrees:
- file: ch-dask/task-graph-partitioning
- file: ch-dask-dataframe/index
entries:
- file: ch-dask-dataframe/dask-pandas
- file: ch-dask-dataframe/read-write
- file: ch-ray-core/index
entries:
Expand Down
3 changes: 0 additions & 3 deletions ch-dask-dataframe/dask-pandas.md

This file was deleted.

2 changes: 2 additions & 0 deletions ch-dask-dataframe/index.md
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
# Dask DataFrame

pandas 已经成为 DataFrame 的标准,但无法利用多核和集群,Dask DataFrame 试图解决 pandas 并行计算的问题。Dask DataFrame 尽量提供与 pandas 一致的 API,但使用起来,Dask DataFrame 仍有很多不同。本章假设用户已经了解并熟悉 pandas,并重点讨论 Dask DataFrame 与 pandas 的区别。

```{tableofcontents}
```
459 changes: 223 additions & 236 deletions ch-dask-dataframe/read-write.ipynb

Large diffs are not rendered by default.

2 changes: 1 addition & 1 deletion ch-dask/dask-dataframe-intro.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,7 @@
"source": [
"## 创建 Dask DataFrame\n",
"\n",
"使用 Dask 内置的方法创建一个名为 `ddf` 的 `DataFrame`,这份数据是随机生成的,每秒钟生成一个数据样本,共计 4 天(从 2024-01-01 0:00 至 2024-01-05 0:00)。"
"使用 Dask 内置的方法创建一个名为 `ddf` 的 `DataFrame`,这份数据是一个时间序列数据,是随机生成的,每个数据样本代表一秒钟,共计 4 天(从 2024-01-01 0:00 至 2024-01-05 0:00)。"
]
},
{
Expand Down
18 changes: 16 additions & 2 deletions ch-dask/dask-distributed.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -409,7 +409,21 @@
"如果你在个人电脑上操作 `SSHCluster`,即个人电脑是 Client,各个计算节点组成集群,你应该将个人电脑与各个计算节点之间设置免密码登录。确切地说,各个计算节点的 `authorized_keys` 文件中应该存储了个人电脑的公钥。\n",
":::\n",
"\n",
"使用 Python 代码启动这个 Dask 集群。`hosts` 是要拉起 Dask 的主机列表,列表上第一个节点将启动 Dask Scheduler,其余节点启动 Dask Worker;`connect_options` 是 SSH 登录的一些参数,比如用户名 `username`、密码 `password`、端口号 `port`,这些参数以 `{\"key\": \"value\"}` 的形式传递给 Dask。\n",
"可以使用 Python 代码启动这个 Dask 集群:\n",
"\n",
"```python\n",
"from dask.distributed import Client, SSHCluster\n",
"cluster = SSHCluster(\n",
" hosts=[\"localhost\", \"node1\", \"node2\"],\n",
" connect_options={\n",
" \"username\": \"xxx\",\n",
" \"password\": \"yyy\",\n",
" }\n",
")\n",
"client = Client(cluster)\n",
"```\n",
"\n",
"`hosts` 是要拉起 Dask 的主机列表,列表上第一个节点将启动 Dask Scheduler,其余节点启动 Dask Worker;`connect_options` 是 SSH 登录的一些参数,比如用户名 `username`、密码 `password`、端口号 `port`,这些参数以 `{\"key\": \"value\"}` 的形式传递给 Dask。\n",
"\n",
"```python\n",
"from dask.distributed import Client, SSHCluster\n",
Expand Down Expand Up @@ -448,7 +462,7 @@
"\n",
"## 自动缩放\n",
"\n",
"前面介绍的 Kubernetes 和高性能计算集群均支持自动缩放(Auto-Scaling 或 Adaptive Scaling),因为他们本身就是集群调度管理软件,他们管理着大量计算资源,不同的应用向他们请求资源。Dask 集群构建在 Kubernetes 上,相当于在 Kubernetes 所管理的计算资源上申请一个子集。在 Kubernetes 或高性能计算集群上,我们可以使用 Dask 的自动缩放技术,自动增加或减少 Dask 所需计算资源。自动缩放主要考虑以下场景:\n",
"前面介绍的 Kubernetes 和高性能计算集群(例如 Slurm)均支持自动缩放(Auto-Scaling 或 Adaptive Scaling),因为他们本身就是集群调度管理软件,他们管理着大量计算资源,不同的应用向他们请求资源。Dask 构建在 Kubernetes 或 Slurm 上,相当于在 Kubernetes 或 Slurm 所管理的计算资源上申请一个子集。在 Kubernetes 或 Slurm 上,我们可以使用 Dask 的自动缩放技术,自动增加或减少 Dask 所需计算资源。自动缩放主要考虑以下场景:\n",
"\n",
"* 用户当前作业对计算资源要求很高,需要更多的资源来满足计算需求。\n",
"* 用户当前作业所申请的计算资源闲置,这些资源可被其他用户使用。尤其是当用户进行交互式数据可视化,而非大规模计算时。\n",
Expand Down
2 changes: 1 addition & 1 deletion ch-data-science/index.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# 数据科学

Python 分布式编程主要服务于数据科学和人工智能等对高性能计算领域,本章主要介绍数据科学相关背景知识,如果相关基础较好,也可以直接跳过本章。
Python 分布式编程主要服务于数据科学和人工智能等领域,本章主要介绍数据科学相关背景知识,如果相关基础较好,也可以直接跳过本章。

```{tableofcontents}
```
9 changes: 8 additions & 1 deletion ch-data-science/machine-learning.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -1037,7 +1037,7 @@
"\\end{aligned}\n",
"$$\n",
"\n",
"公式中,$\\alpha$ 是学习率,即参数更新的速度,它控制着算法每轮迭代的更新补偿,如果学习率太大则容易振荡,太小则收敛速度又会过慢。\n",
"公式中,$\\alpha$ 是学习率,即参数更新的速度,如果学习率太大则容易振荡,不容易收敛,太小则收敛速度又会过慢。\n",
"\n",
"各层的导数又被称为梯度,参数沿着梯度方向下降,又被成为梯度下降法。计算各层的导数时,往往是从最后的损失函数开始,向前一层一层地求梯度,即先求最后第 $n$ 层的梯度,得到第 $n$ 层的梯度,结合链式法则,求第 $n-1$ 层的梯度。{numref}`back-propagation` 展示了神经网络的反向传播过程。\n",
"\n",
Expand All @@ -1049,6 +1049,13 @@
"神经网络的反向传播\n",
"```\n",
"\n",
"## 超参数\n",
"\n",
"神经网络训练过程中,有很多训练模型之前需要人为设定的一些参数,这些参数不能通过模型的反向传播算法来自动学习,而需要手动选择和调整。这些参数又被成为超参数,超参数的选择通常基于经验或反复试验。以下是一些超参数:\n",
"\n",
"* 学习率,即刚才提到的 $\\alpha$,控制着每次更新参数的步长。\n",
"* 网络结构:模型的层数、每层的神经元数量、激活函数的选择等。不同的网络结构对于不同的任务可能有不同的性能表现。\n",
"\n",
"## 推理\n",
"\n",
"模型训练就是前向和反向传播,模型推理只需要前向传播,只不过输入层换成了需要预测的 $\\boldsymbol{x}$。"
Expand Down
2 changes: 1 addition & 1 deletion ch-intro/computer-architecture.md
Original file line number Diff line number Diff line change
Expand Up @@ -27,4 +27,4 @@ name: computer-arch

## 异构计算

在异构计算的框架下,CPU 和主存通常被称为主机(Host),各类专用的加速器被称为设备(Device)。尽管异构计算是一个很宽泛的概念,但当前基于 GPU 的异构计算是主流,尤其是以英伟达为代表的 GPU 占据了大量市场份额,所以这里主要以 GPU 为例介绍异构计算。GPU 有区别于 CPU 的芯片微架构和编译软件栈。软件层面,英伟达的 GPU 提供了 CUDA(Compute Unified Device Architecture)编程接口,硬件层面,GPU 有很多个专用计算核心(CUDA Core)和 GPU 上的存储(GPU Memory)。通常,数据从主存到 GPU 存储之间搬运有一定时间成本。其他加速器与英伟达 GPU 整体结构也很相似。
在异构计算的框架下,CPU 和主存通常被称为主机(Host),各类专用的加速器被称为设备(Device)。尽管异构计算是一个很宽泛的概念,但当前基于 GPU 的异构计算是主流,尤其是以英伟达为代表的 GPU 占据了大量市场份额,所以这里主要以 GPU 为例介绍异构计算。GPU 有区别于 CPU 的芯片微架构和编译软件栈。软件层面,英伟达的 GPU 提供了 CUDA(Compute Unified Device Architecture)编程接口,硬件层面,GPU 有很多个专用计算核心(CUDA Core 或 Tensor Core)和 GPU 上的存储(GPU Memory)。通常,数据从主存到 GPU 存储之间搬运有一定时间成本。其他加速器与英伟达 GPU 整体结构也很相似。
4 changes: 2 additions & 2 deletions ch-intro/parallel-program-design.md
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ PCAM 方法

* 将矩阵切分成子矩阵,每个子矩阵 $m \times m$ 大小,在每台计算节点上执行 `max()` 函数求得子矩阵的最大值。
* 将每个子矩阵的最大值汇集到一个计算节点,再该节点再次执行一下 `max()` 求得整个矩阵的最大值。
* 子矩阵切分方式 $m \times m$ 可以在单个计算节点上运行,聚集值再次求最大值也可以在单给计算节点上运行
* $m \times m$ 大小的子矩阵 `max()` 可以在单个计算节点上运行。
* 将以上计算分发到多个节点。

## 切分方式
Expand All @@ -49,6 +49,6 @@ MapReduce 中主要涉及四个阶段:
* 切分(Split):将大数据切分成很多份小数据,每份小数据可以在单个 Worker 上计算。
* 映射(Map):对每个小数据执行 Map 操作,Map 是一个函数映射,程序员需要自定义 Map 函数,Map 函数输出一个键值对(Key-Value)。在词频统计的例子中,每出现一个词,计 1 次,Key 是词,Value 是 1,表示出现 1 次。
* 交换(Shuffle):将相同的 Key 归结到相同的 Worker 上。这一步涉及数据交换。词频统计的例子中,将相同的词发送到同一个 Worker 上。
* 聚合(Reduce):所有相同的 Key 进行聚合操作,程序员需要自定义 Reduce 函数。词频统计的例子中,之前 Shuffle 阶段将已经将数据归结到了一起,现在只需要将所有词频求和。
* 聚合(Reduce):所有相同的 Key 进行聚合操作,程序员需要自定义 Reduce 函数。词频统计的例子中,之前 Shuffle 阶段将已经将相同的 Key 归结到了一起,现在只需要将所有词频求和。

MapReduce 的编程范式深刻影响了 Apache Hadoop、Apache Spark、Dask 等开源项目。
2 changes: 1 addition & 1 deletion ch-intro/thread-process.md
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ CPU、GPU、网卡等都是被操作系统(Operating System,OS)管理的
width: 600px
name: mac-process
---
进程和线程
macOS 的活动监视器
```

技术层面上,操作系统管理所有进程的执行,为它们合理地分配资源:操作系统以进程为单位分配主存空间,每个进程都有自己的地址空间、数据栈等等。
Expand Down
6 changes: 3 additions & 3 deletions ch-mpi/collective.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@
"* 数据移动,比如 [`Comm.Bcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Bcast), [`Comm.Scatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Scatter),[`Comm.Gather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Gather)等。\n",
"* 集合计算,比如 [`Comm.Reduce`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Reduce), [`Intracomm.Scan`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Intracomm.html#mpi4py.MPI.Intracomm.Scan) 等。\n",
"\n",
"首字母大写的函数是基于缓存的,比如 `Comm.Bcast`, `Comm.Scatter`,`Comm.Gather`,[`Comm.Allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Allgather), [`Comm.Alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Alltoall)。首字母小写的函数可以收发 Python 对象,比如 [`Comm.bcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.bcast),[`Comm.scatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.scatter),[`Comm.gather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.gather),[`Comm.allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.allgather), [`Comm.alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.alltoall)。\n",
"首字母大写的函数是基于缓存的,比如 `Comm.Bcast`, `Comm.Scatter`,`Comm.Gather`,[`Comm.Allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Allgather) [`Comm.Alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Alltoall)。首字母小写的函数可以收发 Python 对象,比如 [`Comm.bcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.bcast),[`Comm.scatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.scatter),[`Comm.gather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.gather),[`Comm.allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.allgather) [`Comm.alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.alltoall)。\n",
"\n",
"## 同步\n",
"\n",
Expand Down Expand Up @@ -188,15 +188,15 @@
"\n",
"```{figure} ../img/ch-mpi/reduce.svg\n",
"---\n",
"width: 600px\n",
"width: 500px\n",
"name: mpi-reduce\n",
"---\n",
"Reduce\n",
"```\n",
"\n",
"```{figure} ../img/ch-mpi/scan.svg\n",
"---\n",
"width: 600px\n",
"width: 500px\n",
"name: mpi-scan\n",
"---\n",
"Scan\n",
Expand Down
6 changes: 2 additions & 4 deletions ch-mpi/mpi-intro.md
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
(mpi-intro)=
# MPI 简介

Message Passing Interface(MPI)是个经典的并行计算工具,由于它的“年龄”比较老,新一代程序员很少听说过这个“老古董”,也经常忽视其重要性。但随着人工智能大模型浪潮的到来,MPI 或者基于 MPI 思想的各类通讯库再次回到人们的视线内,因为大模型必须使用并行计算框架进行跨机通信。比如,大模型训练框架 deepspeed 就使用了 mpi4py 进行多机通信。
Message Passing Interface(MPI)是个经典的并行计算工具,由于它的“年龄”比较老,新一代程序员很少听说过这个“老古董”,也经常忽视其重要性。但随着人工智能大模型浪潮的到来,MPI 或者基于 MPI 思想的各类通讯库再次回到人们的视线内,因为大模型必须使用并行计算框架进行跨机通信。比如,大模型训练框架 [DeepSpeed](https://github.com/microsoft/DeepSpeed) 就使用了 mpi4py 进行多机通信。

## 历史

Expand Down Expand Up @@ -52,6 +52,4 @@ pip install mpi4py
```bash
conda install -c conda-forge mpich
conda install -c conda-forge mpi4py
```

大部分 MPI 程序均需要在命令行中先编译再拉起。为解决这个问题,我们还安装了 ipyparallel,可以在 Jupyter Notebook 中完成并行程序的拉起。
```
14 changes: 10 additions & 4 deletions ch-mpi/point-to-point.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -304,14 +304,14 @@
"\n",
"我们先分析一下阻塞式通信。`Send` 和 `Recv` 这两个基于缓存的方法:\n",
"\n",
"* `Send` 直到缓存是空的时候,也就是说缓存中的数据都被发送出去后,才返回(`return`),允许运行用户代码中剩下的业务逻辑。缓存区域可以被接下来其他的 `Send` 循环再利用。\n",
"* `Recv` 直到缓存区域数据到达,才返回(`return`),,允许运行用户代码中剩下的业务逻辑\n",
"* `Send` 直到缓存是空的时候,也就是说缓存中的数据都被发送出去后,才返回(`return`)。缓存区域可以被接下来其他的 `Send` 循环再利用。\n",
"* `Recv` 直到缓存区域填满,才返回(`return`)。\n",
"\n",
"如 {ref}`mpi-communications` 所示,阻塞通信是数据完成传输,才会返回(`return`),否则一直在等待。\n",
"\n",
"```{figure} ../img/ch-mpi/blocking.svg\n",
"---\n",
"width: 800px\n",
"width: 600px\n",
"name: blocking-communications\n",
"---\n",
"阻塞式通信示意图\n",
Expand All @@ -330,7 +330,13 @@
"\n",
"### 非阻塞\n",
"\n",
"非阻塞式通信调用后直接返回 [`Request`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Request.html#mpi4py.MPI.Request) 句柄(Handle),程序员接下来再对 `Request` 做处理,比如等待 `Request` 涉及的数据传输完毕。非阻塞式通信有大写的 i(I) 作为前缀, [`Irecv`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.irecv) 的函数参数与之前相差不大,只不过返回值是一个 `Request`。 `Request` 类提供了 `wait` 方法,显示地调用 `wait()` 可以等待数据传输完毕。用 `Isend` 写的阻塞式的代码,可以改为 `Isend` + [`Request.wait()`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Request.html#mpi4py.MPI.Request.wait) 以非阻塞方式实现。\n",
"相比之下,非阻塞通信不会等待数据传输的完成。非阻塞通信可以重叠通信和计算来增强性能,即网络侧完成通信任务,CPU 侧完成计算任务。[`isend`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.isend) 和 [`irecv`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.irecv) 可用于非阻塞通信:\n",
"\n",
"* `isend`:启动一个非阻塞发送操作并立即将控制返回给程序员,允许执行后续代码。\n",
"\n",
"* `irecv`:启动一个非阻塞接收操作并立即将控制返回给程序员,允许执行后续代码。\n",
"\n",
"非阻塞式通信调用后直接返回 [`Request`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Request.html#mpi4py.MPI.Request) 句柄(Handle),程序员接下来再对 `Request` 做处理,比如等待 `Request` 涉及的数据传输完毕。非阻塞式通信有大写的 i(I)或小写的 i 作为前缀,I 前缀的基于缓存,i 的没有。[`isend`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.isend) 的函数参数与 [`send`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.send) 相差不大,只不过 `isend` 返回值是一个 `Request`。 `Request` 类提供了 `wait` 方法,显示地调用 `wait()` 可以等待数据传输完毕。用 `send` 写的阻塞式的代码,可以改为 `Isend` + [`Request.wait()`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Request.html#mpi4py.MPI.Request.wait) 以非阻塞方式实现。\n",
"\n",
"{numref}`mpi-non-blocking` 展示了一个非阻塞式通信的例子。\n",
"\n",
Expand Down
16 changes: 15 additions & 1 deletion conf.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,21 @@
'repository_url': 'https://github.com/godaai/distributed-python',
'repository_branch': 'main',
'extra_footer': '',
'home_page_in_toc': True,
'home_page_in_toc': True,
'icon_links': [
{
"name": "English",
"url": "https://dp.godaai.org/", # required
"icon": "fa fa-language",
"type": "fontawesome",
},
{
"name": "GitHub",
"url": "https://github.com/godaai/distributed-python",
"icon": "https://img.shields.io/github/stars/godaai/distributed-python?style=for-the-badge",
"type": "url",
},
],
'announcement': "如果你觉得内容对你有帮助,请在 <a href=\"https://github.com/godaai/distributed-python\">GitHub</a> 上点个 star 吧!",
'analytics': {'google_analytics_id': ''},
'use_repository_button': True,
Expand Down
Loading

0 comments on commit a211326

Please sign in to comment.