diff --git a/ch-dask-dataframe/dask-case-study.ipynb b/ch-dask-dataframe/dask-case-study.ipynb index 80a3ab0..2031476 100644 --- a/ch-dask-dataframe/dask-case-study.ipynb +++ b/ch-dask-dataframe/dask-case-study.ipynb @@ -7,7 +7,7 @@ "(sec-dask-case-study)=\n", "# 基于 Dask 的数据分析案例\n", "\n", - "本节我们将介绍 2个基于 Dask 的数据分析案例。" + "本节我们将介绍 2个基于 Dask 的数据分析案例。首先导入必要的包,并创建 Dask 集群。" ] }, { diff --git a/ch-dask-dataframe/shuffle.ipynb b/ch-dask-dataframe/shuffle.ipynb index a087466..9e3fe22 100644 --- a/ch-dask-dataframe/shuffle.ipynb +++ b/ch-dask-dataframe/shuffle.ipynb @@ -20,7 +20,7 @@ "width: 800px\n", "name: fig-shuffle-tasks-p2p\n", "---\n", - "Dask 仪表盘\n", + "Dask Shuffle: tasks v.s. p2p\n", "```\n", "\n", "目前,Dask 提供了两类 Shuffle 实现策略:单机和分布式。\n", @@ -114,7 +114,7 @@ "|---\t|---\t|---\t|---\t|\n", "| [`DataFrame.set_index()`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.set_index.html) | 修改索引列,加速后续基于索引列的计算\t| 是 | 是\t|\n", "| [`DataFrame.repartition()`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.repartition.html) | 修改 Partition 数量,多用于数据倾斜场景 | 否\t| 是 |\n", - "| [`DataFrame.shuffle()`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.shuffle.html) | 将相同的值归结到同一个 Partition | 否 | 是 |\n", + "| [`DataFrame.shuffle()`](https://docs.dask.org/en/latest/generated/dask_expr._collection.DataFrame.shuffle.html) | 将 DataFrame 重新打散并排列 | 否 | 是 |\n", "```\n", "\n", "在 {numref}`sec-dask-dataframe-indexing` 我们提过,`set_index()` 将某字段设置为索引列,后续一系列计算非常依赖这个字段,`set_index()` 能显著加速后续计算。`repartition()` 主要解决数据倾斜的问题,即某些 Partiton 上的数据过大,过大的 Partition 有可能导致内存不足。" diff --git a/ch-dask-ml/index.md b/ch-dask-ml/index.md index 9e5ffa8..57cbd2e 100644 --- a/ch-dask-ml/index.md +++ b/ch-dask-ml/index.md @@ -8,7 +8,7 @@ * 训练数据和模型可放到单机内存中,超参数调优需要多机并行; * 训练数据无法放到单机内存中,需要进行分布式训练。 -一方面,Dask 社区将主要精力投入在 Dask DataFrame 上,对 Dask-ML 和分布式训练的优化并不多;另一方面,深度学习已经冲击传统机器学习算法,Dask 设计之初并不是面向深度学习的。读者阅读本章,了解 Dask 机器学习能力后,可以根据自身需求选择适合自己的框架。 +一方面,Dask 社区主要集中在 Dask DataFrame 的开发上,对于 Dask-ML 以及分布式训练的优化投入相对较少;另一方面,深度学习已经对传统机器学习算法产生了显著影响,而 Dask 在设计之初并未专门针对深度学习场景。读者在阅读本章并了解 Dask 在机器学习方面的能力之后,可以根据个人的实际需求选择最适合自己的框架。 ```{tableofcontents} ``` \ No newline at end of file diff --git a/ch-dask-ml/preprocessing.md b/ch-dask-ml/preprocessing.md index 7fe9a53..c5aeaa1 100644 --- a/ch-dask-ml/preprocessing.md +++ b/ch-dask-ml/preprocessing.md @@ -1,8 +1,8 @@ (sec-dask-ml-preprocessing)= # 数据预处理 -{numref}`sec-data-science-lifecycle` 我们提到过,数据科学工作的重点是理解数据和处理数据,Dask 可以将很多单机的任务横向扩展到集群上,并且可以和 Python 社区数据可视化等库结合,完成探索性数据分析。 +在 {numref}`sec-data-science-lifecycle` 中提到,数据科学工作的核心在于理解数据和处理数据。Dask 能够将许多单机任务扩展到集群上执行,并能与 Python 社区中的数据可视化等库结合,以完成探索性数据分析。 -分布式数据预处理部分更多依赖 Dask DataFrame 和 Dask Array 的能力,这里不再赘述。 +在分布式数据预处理方面,更多地依赖于 Dask DataFrame 和 Dask Array 的功能,这一点在此不再赘述。 -特征工程部分,Dask-ML 实现了很多 `sklearn.preprocessing` 的 API,比如 [`MinMaxScaler`](https://ml.dask.org/modules/generated/dask_ml.preprocessing.MinMaxScaler.html)。对 Dask 而言,稍有不同的是其独热编码,本书写作时,Dask 使用 [`DummyEncoder`](https://ml.dask.org/modules/generated/dask_ml.preprocessing.DummyEncoder.html) 对类别特征进行独热编码,`DummyEncoder` 是 scikit-learn `OneHotEncoder` 的 Dask 替代。我们将在 {numref}`sec-dask-ml-hyperparameter` 将展示一个类型特征的案例。 \ No newline at end of file +在特征工程部分,Dask-ML 实现了很多 `sklearn.preprocessing` 的 API,比如 [`MinMaxScaler`](https://ml.dask.org/modules/generated/dask_ml.preprocessing.MinMaxScaler.html)。对 Dask 来说,一个稍有不同的地方是其独热编码的实现。截至本书写作时,Dask 使用 [`DummyEncoder`](https://ml.dask.org/modules/generated/dask_ml.preprocessing.DummyEncoder.html) 对类别特征进行独热编码,`DummyEncoder` 是 scikit-learn `OneHotEncoder` 的 Dask 替代。我们将在 {numref}`sec-dask-ml-hyperparameter` 展示一个关于类型特征的案例。 \ No newline at end of file diff --git a/ch-data-science/data-science-lifecycle.ipynb b/ch-data-science/data-science-lifecycle.ipynb index ebcf6f6..d67888f 100644 --- a/ch-data-science/data-science-lifecycle.ipynb +++ b/ch-data-science/data-science-lifecycle.ipynb @@ -62912,7 +62912,7 @@ "网络上的开源数据往往已经被处理过,实际业务场景的数据准备难度比开源数据大很多。\n", "因此,数据准备是最耗时的步骤,它可能会占整个项目时间的 70%-90%,然而它是整个数据科学生命周期中最重要的步骤。\n", "\n", - "在房价预测这个场景下,当前我们使用的这个开源数据集的特征只有 8 列。为了更好地做这个任务,应该挖掘更多的信息,包括房屋面积等更详细的信息。如果能从其他数据源获取数据,并将这些数据融合,能显著提升最终要过,但数据融合需要进行数据清洗和对齐工作。\n", + "在房价预测这个场景下,当前我们使用的这个开源数据集的特征只有 8 列。为了更好地做这个任务,应该挖掘更多的信息,包括房屋面积等更详细的信息。如果能从其他数据源获取数据,并将这些数据融合,能显著提升准确度等模型性能,但数据融合需要进行数据清洗和对齐工作。\n", "\n", "数据的预处理非常依赖数据分析师的经验,这里不仔细展开讲解。现在我们只对数据进行切分,分为训练集和测试集。" ] @@ -67549,8 +67549,7 @@ "source": [ "## 部署\n", "\n", - "模型需要经过严格验证之后才能部署上线。在正式给所有用户使用之前,除了在离线测试之外,还需要进行线上灰度测试。离线测试,指的是只线下验证,基于历史数据,不将模型推送给线上用户;灰度测试,是在该项目正式推送给全量用户前,选择特定人群试用,根据效果逐步扩大试用者数量,由“灰”到“黑”。如果灰度测试效果不佳,还需要继续迭代,重复上面的各个步骤。\n", - "如果数据收集不正确,或数据清洗不合理,或离线测试评估指标有问题,模型都无法取得较好的线上效果。\n", + "模型在部署上线前必须经过严格的验证。在正式向所有用户开放使用之前,除了进行离线测试外,还应实施线上灰度测试。所谓离线测试,是指仅在非在线环境中进行验证,基于历史数据,不将模型推送给线上用户。灰度测试则是在项目正式全面推广前,选择特定用户群体进行试用,并根据反馈效果逐步增加试用者的数量,实现从“灰”到“全量”的过渡。如果灰度测试的结果不尽人意,需要继续迭代并重复上述所有步骤。此外,如果数据收集不准确,数据清洗处理不当,或离线测试的评估指标存在问题,模型在线上的实际效果也可能不会理想。\n", "\n", "## 小结\n", "\n", diff --git a/ch-data-science/hyperparameter.md b/ch-data-science/hyperparameter.md index 0407fc9..fc4bd68 100644 --- a/ch-data-science/hyperparameter.md +++ b/ch-data-science/hyperparameter.md @@ -21,13 +21,20 @@ name: fig-tune-algorithms * 网格搜索(Grid Search):网格搜索是一种穷举搜索方法,它通过遍历所有可能的超参数组合来寻找最优解,这些组合会逐一被用来训练和评估模型。网格搜索简单直观,但当超参数空间很大时,所需的计算成本会急剧增加。 * 随机搜索(Random Search):随机搜索不是遍历所有可能的组合,而是在解空间中随机选择超参数组合进行评估。这种方法的效率通常高于网格搜索,因为它不需要评估所有可能的组合,而是通过随机抽样来探索参数空间。随机搜索尤其适用于超参数空间非常大或维度很高的情况,它可以在较少的尝试中发现性能良好的超参数配置。然而,由于随机性的存在,随机搜索可能会错过一些局部最优解,因此可能需要更多的尝试次数来确保找到一个好的解。 -* 贝叶斯优化(Bayesian Optimization):贝叶斯优化是一种**迭代式**超参数搜索技术。它基于贝叶斯定理,概率模型来指导搜索最优超参数。这种方法的核心思想是构建一个贝叶斯模型,通常是高斯过程(Gaussian Process),来近似评估目标函数的未知部分。贝叶斯优化能够在有限的评估次数内,智能地选择最有希望的超参数组合进行尝试,特别适用于计算成本高昂的场景。 +* 自适应迭代式(Adaptive):自适应迭代式超参数搜索技术中最知名的是贝叶斯优化(Bayesian Optimization)。贝叶斯优化基于贝叶斯定理,利用概率模型来指导搜索最优超参数。这种方法的核心思想是构建一个贝叶斯模型,通常是高斯过程(Gaussian Process),来近似评估目标函数的未知部分。贝叶斯优化能够在有限的评估次数内,智能地选择最有希望的超参数组合进行尝试,特别适用于计算成本高昂的场景。 + +超参数调优属于一种黑盒优化过程。所谓黑盒优化,是指目标函数被视为一个黑盒,我们只能通过观察其输入和输出来推断其行为。虽然黑盒的概念可能有些难以理解,但可以与梯度下降算法进行比较。梯度下降算法不是一种黑盒优化算法,因为它能够获得目标函数的梯度(或其近似值),并利用这些梯度信息来指导搜索方向,从而找到目标函数的(局部)最优解。与此不同,黑盒优化算法通常无法获得目标函数的数学表达式或其梯度,因此无法应用基于梯度的优化技术。 + +贝叶斯优化、遗传算法、模拟退火等都是常见的黑盒优化方法。这些算法在超参数的搜索空间中选择一些候选解,运行目标函数以评估这些超参数组合的实际性能。基于这些实际性能的反馈,算法会不断迭代调整超参数,重复上述过程,直至满足终止条件。 -超参数调优是一种黑盒优化,所谓黑盒优化,指的是目标函数是一个黑盒,我们只能通过观察其输入和输出来推断其行为。黑盒的概念比较难以理解,但是我们可以相比梯度下降算法,梯度下降算法**不是**一种黑盒优化算法,我们可以得到目标函数的梯度(或近似值),并用梯度来指导搜索方向,最终找到目标函数的(局部)最优解。黑盒优化算法一般无法找到目标函数的数学表达式和梯度,也无法使用基于梯度的优化技术。贝叶斯优化、遗传算法、模拟退火等都是黑盒优化,这些算法通常在超参数搜索空间中选择一些候选解,运行目标函数,得到超参数组合的实际性能,基于实际性能,不断迭代调整,即重复上述过程,直到满足条件。 ### 贝叶斯优化 -贝叶斯优化基于贝叶斯定理,这里不深入探讨详细的数学公式。简单来说,它需要先掌握搜索空间中几个观测样本点(Observation)的实际性能,构建概率模型,描述每个超参数在每个取值点上模型性能指标的**均值**和**方差**。其中,均值代表这个点最终的期望效果,均值越大表示模型最终性能指标越大,方差表示这个点的不确定性,方差越大表示这个点不确定,值得去探索。{numref}`fig-bayesian-optimization-explained` 在一个 1 维超参数搜索空间中迭代 3 步的过程,虚线是目标函数的真实值,实线是预测值(或者叫后验概率分布均值),实线上下的蓝色区域为置信区间。贝叶斯优化利用了高斯回归过程,即目标函数是由一系列观测样本点所构成的随机过程,通过高斯概率模型来描述这个随机过程的概率分布。贝叶斯优化通过不断地收集观测样本点来更新目标函数的后验分布,直到后验分布基本贴合真实分布。对应 {numref}`fig-bayesian-optimization-explained` 中,进行第 3 次迭代之前只有两个观测样本点,经过第 3 次迭代和第 4 次迭代之后中增加了新的观测样本点,这几个样本点附近的预测值逐渐接近真实值。 +贝叶斯优化是一种基于贝叶斯定理的优化方法,这里不深入讨论其详细的数学公式。简而言之,贝叶斯优化首先需要在搜索空间中获取若干观测样本点(Observation)的实际性能数据,以构建一个概率模型。该模型用于描述每个超参数在不同取值点上模型性能指标的**均值**和**方差**。其中,均值代表该点的期望效果,均值越大,表明模型在该点的性能指标越高;方差则表示该点的不确定性,方差越大,说明该点的不确定性越高,因此更值得进一步探索。 + +{numref}`fig-bayesian-optimization-explained` 是在一维超参数搜索空间中迭代3步的过程,虚线表示目标函数的真实值,而实线代表预测值(或称为后验概率分布的均值)。实线上下的蓝色区域表示置信区间。贝叶斯优化利用高斯过程回归,即认为目标函数是由一系列观测样本点构成的随机过程,并通过高斯概率模型来描述这一随机过程的概率分布。 + +贝叶斯优化通过持续收集新的观测样本点来更新目标函数的后验分布,直至该后验分布与真实分布基本吻合。如 {numref}`fig-bayesian-optimization-explained` 所示,在进行第三次迭代之前,仅有两个观测样本点。随着第三次和第四次迭代的进行,新增的观测样本点使得这些点附近的预测值逐渐逼近真实值。 贝叶斯优化有两个核心概念: @@ -118,4 +125,6 @@ width: 600px name: fig-population-based-training --- PBT 训练中的利用和探索。利用指模型表现不理想时,将当前模型换成其他表现较好的参数权重;探索指变异生成新的超参数。 -``` \ No newline at end of file +``` + +当某个模型的性能表现未达到预期时,可以考虑替换当前模型的参数权重,采用那些表现更佳的参数权重。同时,为了进一步探索和优化模型,可以通过调整超参数来进行变异,以生成新的超参数组合。 \ No newline at end of file diff --git a/ch-mpi-large-model/data-parallel.md b/ch-mpi-large-model/data-parallel.md index 39fe072..e9d6b3c 100644 --- a/ch-mpi-large-model/data-parallel.md +++ b/ch-mpi-large-model/data-parallel.md @@ -1,7 +1,7 @@ (sec-data-parallel)= # 数据并行 -数据并行是一种最常见的大模型并行方法,相对其他并行,数据并行最简单。如 {numref}`fig-data-parallel-img` 所示,模型被拷贝到不同的 GPU 设备上,训练数据被切分为多份,每份分给不同的 GPU 进行训练。这种编程范式又被称为单程序多数据(Single Program Multiple Data,SPMD)。 +数据并行是大模型并行方法中最常见的一种,并且相对于其他并行方法来说,数据并行在实现上更为简单直观。如 {numref}`fig-data-parallel-img` 所示,模型的副本被加载到不同的 GPU 设备上,而训练数据则被分割成多个份,每个份由不同的 GPU 独立进行训练。这种编程模式被称为单程序多数据(Single Program Multiple Data, SPMD)。 ```{figure} ../img/ch-mpi-large-model/data-parallel.svg --- @@ -13,7 +13,7 @@ name: fig-data-parallel-img ## 非并行训练 -{numref}`sec-machine-learning-intro` 介绍了神经网络模型训练的过程。我们先从非并行的场景开始,这里使用 MNIST 手写数字识别案例来演示,如 {numref}`fig-data-parallel-single` 所示,它包含了一次前向传播和一次反向传播。 +{numref}`sec-machine-learning-intro` 介绍了神经网络模型训练的过程。我们首先从非并行的场景开始讨论,在这一场景中,使用 MNIST 手写数字识别案例进行演示。如 {numref}`fig-data-parallel-single` 所示,该案例展示了一次前向传播和一次反向传播的过程。 ```{figure} ../img/ch-mpi-large-model/data-parallel-single.svg --- @@ -25,8 +25,7 @@ name: fig-data-parallel-single ## 数据并行 -数据并行将数据集切分成多份,模型权重在不同 GPU 上拷贝一份。如 {numref}`fig-data-parallel-distributed` 所示,有两块 GPU,在每块 GPU 上,有拷贝的模型权重和被切分的输入数据集;每块 GPU 上**独立**进行前向传播和反向传播,即前向传播计算每层的输出值,反向传播计算模型权重的梯度,两块 GPU 上的计算互不影响。 - +数据并行技术涉及将数据集分割成多份,并在不同的 GPU 上复制模型权重。如 {numref}`fig-data-parallel-distributed` 所示,假设有两块 GPU,每块 GPU 上都有一个模型权重的副本以及相应切分的输入数据子集。在每块 GPU 上,都会**独立**进行前向传播和反向传播的过程:前向传播负责计算每层的输出值,而反向传播则用于计算模型权重的梯度。这些计算在不同的 GPU 之间是相互独立的,互不干扰。 ```{figure} ../img/ch-mpi-large-model/data-parallel-distributed.svg --- @@ -44,9 +43,9 @@ $$ } $$ -同步不同 GPU 上的梯度,可以使用 MPI 提供的 `AllReduce` 原语。MPI 的 `AllReduce` 将每块 GPU 上分别计算得到的梯度收集起来,计算平均后,再将更新后的梯度重新分发给各块 GPU。 +同步不同 GPU 上的梯度,可以使用 MPI 提供的 `AllReduce` 原语。MPI 的 `AllReduce` 将每块 GPU 上独立计算得到的梯度收集起来,进行平均计算,然后将计算得到的平均梯度广播回各块 GPU。 -如 {numref}`fig-data-parallel-all-reduce` 所示,梯度同步阶段,MPI 的 `AllReduce` 原语将各 GPU 上的梯度进行同步。 +如 {numref}`fig-data-parallel-all-reduce` 所示,在梯度同步阶段,MPI 的 `AllReduce` 原语确保了各 GPU 上梯度的一致性。 ```{figure} ../img/ch-mpi-large-model/data-parallel-all-reduce.svg --- diff --git a/ch-mpi-large-model/index.md b/ch-mpi-large-model/index.md index 9e57d5b..f483a90 100644 --- a/ch-mpi-large-model/index.md +++ b/ch-mpi-large-model/index.md @@ -1,12 +1,12 @@ # MPI 与大模型 -本章主要解释大模型的并行方法。大模型指的是神经网络的参数量很大,必须并行地进行训练和推理。大模型并行有如下特点: +本章主要解释大模型的并行方法。所谓大模型,指的是参数量庞大的神经网络,它们必须通过并行方式进行训练和推理。大模型并行具有以下几个特点: -* 计算运行在 GPU 这样的加速卡上; -* 加速卡非常昂贵,应尽量提高加速卡的利用率; -* 模型参数量大,无论是训练还是推理,可能有大量数据需要在加速卡之间传输,对带宽和延迟的要求都很高。 +* 计算运行在 GPU 这样的加速卡上,这些硬件专为提高计算效率而设计。; +* 加速卡的成本非常高昂,因此应努力提高其利用率,确保投资的回报。 +* 由于模型参数量巨大,在训练或推理过程中,可能需要在加速卡之间传输大量数据,这要求有很高的带宽和低延迟以保证效率。 -本章主要从概念和原理上进行解读,具体的实现可参考其他论文和开源库。 +本章将从概念和原理上进行详细解读,而具体的实现细节可以参考其他学术论文和开源库。 ```{tableofcontents} ``` \ No newline at end of file diff --git a/ch-mpi-large-model/nccl.md b/ch-mpi-large-model/nccl.md index a833584..eaf1e4a 100644 --- a/ch-mpi-large-model/nccl.md +++ b/ch-mpi-large-model/nccl.md @@ -1,6 +1,7 @@ # NCCL 简介 -随着 GPU 被广泛应用高性能计算各个领域,尤其是深度学习和大模型对延迟和带宽要求越来越高,如何在多个 GPU 之间高效传输数据愈发重要。GPU 编程中,数据需要从 CPU 和 GPU 之间相互拷贝,而 MPI 最初为 CPU 集群设计,无法适应大规模 GPU 集群的场景。比如,传统的 MPI 是进程间数据传输,而单台 GPU 服务器通常有多张 GPU 卡,单机多卡之间也要进行大量数据传输。又比如 {numref}`fig-mpi-wo-gpu-direct` 中,多个节点之间的 GPU 通信要经过以下步骤:数据从 GPU 拷贝到 CPU,再通过 MPI 相互发送数据。为了适应大规模 GPU 集群和深度学习应用,英伟达设计了英伟达集合通讯库(NVIDIA Collective Communications Library,NCCL),旨在解决 MPI 在大规模 GPU 集群上无法完成的各类问题。 +随着 GPU 在高性能计算领域的广泛应用,尤其是在深度学习和大型模型训练中,对延迟和带宽的要求日益严格,因此,实现多个 GPU 之间高效的数据传输变得越发重要。在 GPU 编程中,数据经常需要在 CPU 和 GPU 之间进行复制。然而,MPI 最初是为 CPU 集群设计的,并不完全适应大规模 GPU 集群的应用场景。 +例如,在传统的 MPI 中,数据传输是在进程间进行的。但在单机多 GPU 服务器上,通常需要在多张 GPU 卡之间进行大量的数据传输。如 {numref}`fig-mpi-wo-gpu-direct` 所示,多个节点上的 GPU 通信需要经过以下步骤:首先将数据从 GPU 复制到 CPU,然后通过 MPI 进行数据的发送和接收。为了更好地适应大规模 GPU 集群和深度学习应用的需求,NVIDIA 设计了 NVIDIA 集合通信库(NVIDIA Collective Communications Library, NCCL),它旨在解决 MPI 在大规模 GPU 集群上遇到的各种问题。 ```{figure} ../img/ch-mpi-large-model/mpi-wo-gpu-direct.svg --- @@ -10,7 +11,7 @@ name: fig-mpi-wo-gpu-direct 先将数据从 GPU 拷贝到 CPU,再进行通信 ``` -MPI 与 NCCL 并不是完全是替代关系,NCCL 的很多通信原语,比如点对点通信和集合通信都借鉴了 MPI,可以说 NCCL 是在 MPI 基础上做的延展,更适合 GPU 集群。{numref}`fig-gpu-communication` 展示了 NCCL 实现的通信原语。 +MPI 与 NCCL 并不是完全的替代关系。NCCL 的许多通信原语,例如点对点通信和集合通信,都受到了 MPI 的影响。可以说,NCCL 是在 MPI 的基础上进行的扩展,它更加适合 GPU 集群环境。{numref}`fig-gpu-communication` 展示了 NCCL 实现的通信原语。 ```{figure} ../img/ch-mpi-large-model/gpu-communication.svg --- @@ -25,7 +26,6 @@ NCCL 实现了常见的通信原语 * AMD 提供了针对 ROCm 的 RCCL(ROCm Communication Collectives Library) * 华为提供了 HCCL(Huawei Collective Communication Library) +这些集合通信库是为特定硬件量身定制的,目的是解决特定集群环境中的通信挑战。 -这些集合通信库都是针对特定硬件的通信库,旨在解决特定集群的通信问题。 - -NCCL 主要提供了 C/C++ 编程接口,Python 社区如果使用的话,可以考虑 PyTorch 的 `torch.distributed`。NCCL 也是 PyTorch 推荐的 GPU 并行计算后端。本书不再细致讲解 `torch.distributed` 的使用,而是继续用 MPI 来演示大模型训练和推理过程中涉及的各类通信问题。 \ No newline at end of file +NCCL 主要提供 C/C++ 编程接口。对于 Python 社区的开发者,如果需要使用 NCCL,可以考虑 PyTorch 的 `torch.distributed` 库。NCCL 也是 PyTorch 推荐的 GPU 并行计算后端。本书不会深入讲解 `torch.distributed` 的使用细节,而是继续使用 MPI 来展示大模型训练和推理过程中涉及的各种通信问题。 \ No newline at end of file diff --git a/ch-mpi-large-model/pipeline-parallel.md b/ch-mpi-large-model/pipeline-parallel.md index 34fca17..9f3394f 100644 --- a/ch-mpi-large-model/pipeline-parallel.md +++ b/ch-mpi-large-model/pipeline-parallel.md @@ -1,7 +1,7 @@ (sec-pipeline-parallel)= # 流水线并行 -流水线并行是另外一种常见的大模型并行方法。数据并行将模型权重在每个 GPU 上拷贝一份,如果模型大小没有超过单块 GPU 显存大小,数据并行是最简单易用的选项。但现在的模型大到已经无法放在单块 GPU 上,比如 175B 的 GPT-3,如果用 FP16 存储,也需要 350GB 存储空间,而单块 NVIDIA A100 和 H100 为 80GB。流水线并行可以解决这个问题,它将大模型的不同层切分到不同的 GPU 上。其核心思想如 {numref}`fig-pipeline-parallel-img` 所示。 +流水线并行是另一种常见的大型模型并行方法。当模型大小没有超过单个 GPU 显存容量时,数据并行通过在每个 GPU 上复制一份模型权重,成为最简单易用的选项。然而,现代的模型已经变得如此庞大,以至于无法放入单块 GPU 中,例如拥有 175B 参数的 GPT-3。即使使用 FP16 格式存储,也需要 350GB 的存储空间,而单块 NVIDIA A100 或 H100 GPU 的显存仅为 80GB。流水线并行提供了解决这一问题的方案,它通过将大模型的不同层分配到不同的 GPU 上来实现。这一核心思想在 {numref}`fig-pipeline-parallel-img` 中有详细展示。 ```{figure} ../img/ch-mpi-large-model/pipeline-parallel.svg --- @@ -41,7 +41,9 @@ name: fig-pipeline-parallel-distributed ## 流水线并行 + 数据并行 -流水线并行与数据并行是相互正交的,两者可以结合起来同时使用。由于两种并行是正交的,互不干扰,为避免数据传输错乱,应使用 MPI 的 Communicator 来做隔离。在 {numref}`sec-mpi-hello-world` 中我们曾经提到,Communicator 可以被理解为 MPI 中的组,同一个 GPU 可以在不同的 Communicator 中。如 {numref}`fig-pipeline-parallel-data-parallel` 所示,我们创建了两类 Communicator:红色为流水线并行的 Communicator,蓝色为数据并行的 Communicator。同一个 GPU 既属于红色,也属于蓝色:既要实现流水线并行中模型层之间的通信,也要实现数据并行的梯度同步。 +流水线并行与数据并行是两种互不干扰的方法,它们可以结合使用以提高计算效率。为了避免在数据传输过程中出现错乱,应使用 MPI 的 Communicator 来进行隔离和协调。正如我们在 {numref}`sec-mpi-hello-world` 中提到的,Communicator 在 MPI 中可以被理解为一个通信组,允许同一个 GPU 参与到多个不同的 Communicator 中。 + +如 {numref}`fig-pipeline-parallel-data-parallel` 所示,我们创建了两种类型的 Communicator:红色的用于流水线并行,而蓝色的用于数据并行。同一个 GPU 可以同时属于这两个 Communicator,这样它既能处理流水线并行中模型层之间的通信,也能参与数据并行中的梯度同步。 ```{figure} ../img/ch-mpi-large-model/pipeline-parallel-data-parallel.svg --- diff --git a/ch-mpi/collective.ipynb b/ch-mpi/collective.ipynb index b813932..962a433 100644 --- a/ch-mpi/collective.ipynb +++ b/ch-mpi/collective.ipynb @@ -7,19 +7,19 @@ "(sec-mpi-collective)=\n", "# 集合通信\n", "\n", - "{ref}`sec-mpi-point2point` 介绍了点对点通信,即发送方和接收方之间相互传输数据。本节主要介绍一种全局的通信方式:集合通信,即在一个组里的多个进程间同时传输数据。集合通信目前只有阻塞的方式。\n", + "{ref}`sec-mpi-point2point` 介绍了点对点通信,本节主要介绍一种全局的通信方式:集合通信,它允许在组内的多个进程之间同时进行数据传输。目前,集合通信仅支持阻塞模式。\n", "\n", - "常用的集合通信主要有以下几类:\n", + "集合通信主要包括以下几类:\n", "\n", - "* 同步,比如 [`Comm.Barrier`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Barrier)。\n", + "* 同步操作,比如 [`Comm.Barrier`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Barrier)。\n", "* 数据移动,比如 [`Comm.Bcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Bcast), [`Comm.Scatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Scatter),[`Comm.Gather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Gather)等。\n", "* 集合计算,比如 [`Comm.Reduce`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Reduce), [`Intracomm.Scan`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Intracomm.html#mpi4py.MPI.Intracomm.Scan) 等。\n", "\n", - "首字母大写的函数是基于缓存的,比如 `Comm.Bcast`, `Comm.Scatter`,`Comm.Gather`,[`Comm.Allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Allgather), [`Comm.Alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Alltoall)。首字母小写的函数可以收发 Python 对象,比如 [`Comm.bcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.bcast),[`Comm.scatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.scatter),[`Comm.gather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.gather),[`Comm.allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.allgather), [`Comm.alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.alltoall)。\n", + "首字母大写的函数名表示这些操作是基于缓冲区的,比如 `Comm.Bcast`, `Comm.Scatter`,`Comm.Gather`,[`Comm.Allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Allgather), [`Comm.Alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Alltoall)。而首字母小写的函数名表示这些操作可以传输 Python 对象,比如 [`Comm.bcast`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.bcast),[`Comm.scatter`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.scatter),[`Comm.gather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.gather),[`Comm.allgather`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.allgather), [`Comm.alltoall`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.alltoall)。\n", "\n", "## 同步\n", "\n", - "MPI 的计算分布在多个进程,每个进程的计算速度有快有慢。`Comm.Barrier` 对 Communicator 里所有进程都执行同步等待,就像 Barrier 的英文名一样,设置一个屏障,等待所有进程都执行完。计算比较快的进程们达到 `Comm.Barrier`,无法执行 `Comm.Barrier` 之后的计算逻辑,必须等待其他所有进程都运行到 `Comm.Barrier` 才行。\n", + "MPI 计算分布在多个进程中,而这些进程的计算速度可能各不相同。`Comm.Barrier` 对 Communicator 里所有进程都执行同步等待,正如 \"Barrier\" 这个英文名所暗示的,它相当于设置了一个障碍,要求所有进程到达这一点后才能继续执行。计算速度快的进程在到达 `Comm.Barrier()`,不能继续执行 `Comm.Barrier()` 之后的计算逻辑,它们必须等待其他所有进程也到达这一点。\n", "\n", "## 数据移动\n", "\n", diff --git a/ch-mpi/mpi-hello-world.ipynb b/ch-mpi/mpi-hello-world.ipynb index 46ca72a..6fc512a 100644 --- a/ch-mpi/mpi-hello-world.ipynb +++ b/ch-mpi/mpi-hello-world.ipynb @@ -24,10 +24,12 @@ "\n", "## World 和 Rank\n", "\n", - "在进行 MPI 编程时,进程之间要互相通信,我们首先要解决两个问题:在 MPI 的众多进程中,“我是谁?”,除了我,“还有谁?”。MPI 标准中, [`MPI_Comm_rank`](https://learn.microsoft.com/en-us/message-passing-interface/mpi-comm-rank-function) 定义了我是谁。[`MPI_COMM_WORLD`](https://learn.microsoft.com/en-us/message-passing-interface/mpi-comm-size-function) 定义了还有谁。 开始编写一个 MPI 程序时,要先定义一个世界(World),这个 World 有 `size` 个进程,每个进程有一个识别自己的号码,这个号码被称为 Rank,Rank 是 0 到 `size - 1` 的整数。更严肃地阐述:\n", + "在进行 MPI 编程时,进程间通信需要首先解决两个基本问题:在MPI进程群中,如何识别“我是谁?”以及“除了我,还有哪些进程?”。MPI 中,[`MPI_Comm_rank`](https://learn.microsoft.com/en-us/message-passing-interface/mpi-comm-rank-function) 定义了每个进程的身份,即“我是谁?”;而 [`MPI_COMM_WORLD`](https://learn.microsoft.com/en-us/message-passing-interface/mpi-comm-size-function) 定义了参与通信的所有进程集合,即“还有谁?”。编写 MPI 程序的第一步是创建一个通信域,称为“World”,它包含 `size` 个进程,每个进程都有一个唯一的识别码,即 Rank,其值取值范围为 0~ `size-1`。\n", "\n", - "* MPI 中的 World 是指所有参与并行计算的进程的总集合。在一个 MPI 程序中,所有的进程都属于一个默认的通信域,这个通信域就被称为 `MPI_COMM_WORLD`。所有在这个通信域中的进程都可以进行通信。\n", - "* World 中的每个进程都有一个唯一的 Rank,Rank 用来标识进程在通信域中的位置。由于每个进程有自己的 Rank 号码,程序员可以控制 Rank 为 0 的进程发送数据给 Rank 为 1 的进程。\n", + "更严肃地表述:\n", + "\n", + "* 在 MPI 中, World 是指所有参与并行计算的进程的集合。在一个 MPI 程序中,所有进程默认属于一个通信域,即 `MPI_COMM_WORLD`。在这个通信域内,所有进程都能够相互通信。\n", + "* World 中的每个进程都有一个唯一的 Rank,Rank 用来标识该进程在通信域中的位置。由于每个进程有自己的 Rank 号码,程序员可以控制 Rank 为 0 的进程发送数据给 Rank 为 1 的进程。\n", "\n", "## 案例:Hello World\n", "\n", @@ -46,9 +48,9 @@ "comm.Barrier()\n", "```\n", "\n", - "在这段程序中,`print()` 是在单个进程内执行的,打印出当前进程的 Rank 和主机名。[`comm.Barrier()`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Barrier) 将每个进程做了阻断,直到所有进程都到达 `comm.Barrier()` 时,才会进行 `comm.Barrier()` 之后操作。本例中,`comm.Barrier()` 后无其他操作,程序将退出。\n", + "在这段程序中,`print()` 在单个进程内执行的,用于打印出该进程的 Rank 和主机名。[`comm.Barrier()`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.Barrier) 确保所有进程同步,即它将每个进程暂时阻塞,直到所有进程都到达该同步点,之后才会继续执行 `comm.Barrier()` 之后的代码。在本例中,`comm.Barrier()` 之后没有其他操作,因此程序将在此之后退出。\n", "\n", - "如果在个人电脑上,启动 8 个进程,在命令行中执行:" + "如果在个人计算机上,启动 8 个进程,可以通过命令行执行以下命令:" ] }, { @@ -79,22 +81,22 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "不同厂商的 `mpiexec` 的参数会有一些区别。相比 C/C++ 或 Fortran,mpi4py 的方便之处在于不需要使用 `mpicc` 编译器进行编译,直接执行即可。\n", + "不同厂商的 `mpiexec` 在参数上可能会有所区别。与 C/C++ 或 Fortran 不同,使用 mpi4py 的便利之处在于它不需要使用 `mpicc` 编译器进行编译,而是可以直接执行。\n", "\n", - "如果有一个集群,且集群挂载了一个共享的文件系统,即集群上每个节点上的特定目录的内容是一样的,源代码 `hello.py` 和安装的 MPI `mpiexec` 是一模一样的。可以这样拉起多个 MPI 进程:\n", + "如果你有一个集群,该集群挂载了一个共享文件系统,即集群上每个节点上的挂载目录的内容是相同的,源代码 `hello.py` 和安装的 MPI `mpiexec` 也是完全一致的。可以通过以下命令启动多个 MPI 进程:\n", "\n", "```bash\n", "mpiexec –hosts h1:4,h2:4,h3:4,h4:4 –n 16 python hello.py\n", "```\n", "\n", - "这个启动命令一共在 16 个进程上执行,16 个进程分布在 4 个计算节点上,每个节点使用了 4 个进程。如果节点比较多,还可以单独编写一个节点信息文件,比如命名为 `hf`,内容为:\n", + "该启动命令将在 16 个进程上执行,16 个进程分布在 4 个计算节点上,每个节点运行了 4 个进程。如果节点数量较多,可以创建一个包含节点信息的文件,例如命名为 `hf`,其内容如下:\n", "\n", "```\n", "h1:8\n", "h2:8\n", "```\n", "\n", - "这样执行:\n", + "然后执行以下命令:\n", "\n", "```\n", "mpiexec –hostfile hf –n 16 python hello.py\n", @@ -102,9 +104,9 @@ "\n", "## Communicator\n", "\n", - "刚才我们提到了 World 的概念,并使用了 `MPI_COMM_WORLD`,更准确地说,`MPI_COMM_WORLD` 是一个 Communicator。MPI 将进程划分到不同的组(Group)中,每个 Group 有不同的 Color,Group 和 Color 共同组成了 Communicator,或者说 Communicator 是 Group + Color,一个默认的 Communicator 就是 `MPI_COMM_WORLD`。\n", + "刚才我们提到了 World 的概念,并使用了 `MPI_COMM_WORLD`,更准确地说,`MPI_COMM_WORLD` 是一个通信器(Communicator)。MPI 将进程划分到不同的组(Group)中,每个 Group 有不同的 Color,Group 和 Color 共同组成了 Communicator,或者说 Communicator 是 Group + Color,一个默认的 Communicator 就是 `MPI_COMM_WORLD`。\n", "\n", - "对于一个进程,它可能在不同的 Communicator 中,因此它在不同 Communicator 中的 Rank 可能也不一样。{numref}`fig-mpi-communicatitor` (a) 、(b) 和(c)为三个 Communicator,圆圈为进程。当我们启动一个 MPI 程序时,就创建了默认的 Communicator(`MPI_COMM_WORLD`),如 {numref}`fig-mpi-communicatitor` (a) 所示。每个进程在 Communicator 内被分配了一个 Rank 号码,图中圆圈上的数字是进程在这个 Communicator 中的 Rank。同样的进程可以被划归到不同的 Communicator 中,且相同的进程在不同 Communicator 的 Rank 可以不一样,如 {numref}`fig-mpi-communicatitor` (b)和(c)所示。每个 Communicator 内进程通信是相互独立的。大部分简单的程序,使用默认的 `MPI_COMM_WORLD` 就足够了。\n", + "对于一个进程,它可能属于多个 Communicator,因此在不同 Communicator 中的 Rank 也可能不同。{numref}`fig-mpi-communicatitor` (a)、(b)和(c)展示了三个不同的 Communicator,其中圆圈代表进程。当我们启动一个 MPI 程序时,就自动创建了默认的 Communicator:`MPI_COMM_WORLD`,如 {numref}`fig-mpi-communicatitor` (a)所示。在每个 Communicator 内,每个进程都被分配了一个唯一的 Rank,图中圆圈上的数字表示进程在该 Communicator 中的 Rank。同一个进程可以属于不同的 Communicator,并且在不同 Communicator 中的 Rank 也可能不同,如 {numref}`fig-mpi-communicatitor` (b)和(c)所示。每个 Communicator 内的进程通信是相互独立的。对于大多数简单的程序而言,使用默认的MPI_COMM_WORLD已经足够。\n", "\n", "```{figure} ../img/ch-mpi/communicator.svg\n", "---\n", diff --git a/ch-mpi/mpi-intro.md b/ch-mpi/mpi-intro.md index 1e9a446..5c14bbe 100644 --- a/ch-mpi/mpi-intro.md +++ b/ch-mpi/mpi-intro.md @@ -1,35 +1,39 @@ (sec-mpi-intro)= # MPI 简介 -Message Passing Interface(MPI)是个经典的并行计算工具,由于它的“年龄”比较老,新一代程序员很少听说过这个“老古董”,也经常忽视其重要性。但随着人工智能大模型浪潮的到来,MPI 或者基于 MPI 思想的各类通讯库再次回到人们的视线内,因为大模型必须使用并行计算框架进行跨机通信。比如,大模型训练框架 [DeepSpeed](https://github.com/microsoft/DeepSpeed) 就使用了 mpi4py 进行多机通信。 +Message Passing Interface(MPI)是一种经典的并行计算工具,它允许多个计算节点进行高效的数据交换。尽管 MPI 诞生已久,新一代程序员可能不太熟悉这项技术,但它的重要性不容忽视。随着人工智能领域大型模型的发展,MPI 及其思想衍生的通信库再次成为研究的焦点,因为这些大型模型的训练和运行往往需要并行计算框架来实现跨机器的通信。例如,[DeepSpeed](https://github.com/microsoft/DeepSpeed) 这样的大型模型训练框架就利用mpi4py 库来实现多机通信。 ## 历史 -MPI 的发展可以追溯到20世纪80年代末和90年代初,彼时已经开始出现了超级计算机,主要用于科学和工程计算,包括气象模拟、核能研究、分子建模、流体动力学等领域。超级计算机主要是一组高性能计算机组成的集群,服务科学和工程计算问题,使得程序得以在多台计算机上并行运行。MPI 出现之前,多个研究小组和机构开始独立开发并推广自己的通信库,但这导致了互操作性和可移植性的问题。因此,社区迫切需要一种标准化的方法来编写并行应用程序。 +MPI 的发展起源于20世纪80年代末和90年代初,正值超级计算机的兴起。这些超级计算机主要用于科学和工程计算,覆盖气象模拟、核能研究、分子建模、流体动力学等领域。超级计算机本质上是由一组高性能计算机组成的集群,它们共同服务于科学和工程计算问题,使得程序能够在多台计算机上并行运行。 -1992年,图灵奖得主 Jack Dongarra 联合几位学者提出了并行计算第一个草案:MPI1。第一个标准版本 MPI 1.0 最终于 1994 年发布。之后,MPI-2、MPI-3 接连发布,来自学术界和工业界的多位专家共同参与修订,不断根据最新的并行计算需求修改 MPI 标准。 +在 MPI 出现之前,多个研究小组和机构已经开始独立开发并推广自己的通信库。然而,这种做法导致了互操作性和可移植性的问题。因此,计算社区迫切需要一种标准化的方法来编写并行应用程序。 + +1992年,图灵奖得主 Jack Dongarra 与其他几位学者联合提出了并行计算的第一个草案,即 MPI1。第一个标准版本 MPI1.0 最终在1994年发布。随后,为了适应并行计算的不断发展和需求,MPI-2 和 MPI-3 标准也相继发布。这些标准的修订得到了来自学术界和工业界多位专家的共同参与,他们不断地根据最新的并行计算需求对MPI标准进行更新和完善。 ## 标准与实现 -MPI 是一个标准,不是一个编译器或者编程语言,也不是一个具体的实现或者产品。像 Dask、Ray 这样的框架是一个具体的实现,而 MPI 不一样,MPI 是一个标准,不同厂商在这个标准下可以有自己的实现。“标准”的意思是说,MPI 定义了一些标准的函数或方法,所有的厂商都需要遵循;“实现”是说,不同软硬件厂商可以根据标准去实现底层通信。比如,如果实现一个发送数据的需求,MPI 标准中定义了 `MPI_Send` 方法,所有厂商应遵循这个标准。 +MPI 是一个并行计算的标准,它不是编译器、编程语言,也不是具体的实现或产品。像 Dask、Ray 这样的框架是一个具体的实现。MPI 的区别在于它是一个标准,不同的厂商可以在这一标准下开发自己的实现。“标准”意味着 MPI 定义了一系列的标准函数和方法,所有厂商都必须遵循这些定义;而“实现”则指不同软硬件厂商基于这一标准实现具体的底层通信机制。 -MPI 标准定义了: +例如,若要实现发送数据的功能,MPI 标准中规定了 `MPI_Send` 方法,所有厂商都应按照这一标准来实现该方法。 + +MPI 标准具体定义了以下内容: -* 每个函数的函数名、参数列表; -* 每个函数的语义,或者说每个函数的预期功能,又或者说每个函数能做什么、不能做什么。 +* 每个函数的函数名和参数列表; +* 每个函数的语义,即预期功能,包括函数的作用和限制。 -在具体实现上,现在常用的有 Open MPI 、MPICH、Intel MPI、Microsoft MPI 和 NVIDIA HPC-X 等。由于 MPI 是标准,因此,同样一份代码,可以被 OpenMPI 编译,也可以被 Intel MPI 编译。每个实现是由特定的厂商或开源社区开发的,因此使用起来也有一些差异。 +在具体实现上, 目前常用的MPI实现包括 Open MPI、MPICH、Intel MPI、Microsoft MPI 和 NVIDIA HPC-X 等。由于 MPI 是一个标准,同一段代码可以被不同的 MPI 实现所编译,比如 OpenMPI 或 Intel MPI。每个实现都是由特定的厂商或开源社区开发的,因此在实际使用中也会存在一些差异。 ## 高速网络 -如果进行多机并行,机器之间需要有高速互联网络。如果你的集群已经部署了这些硬件,并安装了某个 MPI 实现,MPI 可以充分利用这些高速网络的高带宽、低延迟的特性。这些网络大部分拥有超过 100Gbps 的带宽,但其价格也非常昂贵,通常在面向高性能计算的场景上才会配置这些网络设备。数据中心经常部署的普通万兆网络,带宽在 10Gbps 量级。在万兆网络上也可以使用 MPI 也会有一定加速效果。MPI 也可以在单机上运行,即:利用单台节点上的多个计算核心。 +进行多机并行计算时,机器之间需要高速互联网络来实现高效的数据交换。如果你的集群已经配备了这些硬件,并且已经安装了某个 MPI 实现,那么 MPI 可以充分利用这些高速网络的高带宽和低延迟特性。这些网络通常拥有超过 100Gbps 的带宽,但它们的价格也非常昂贵,因此通常只在高性能计算场景中才会配置这些网络设备。数据中心常见的万兆以太网,带宽在 10Gbps 量级,也可以部署 MPI,并且在使用 MPI 时仍然可以获得一定的加速效果。此外,MPI还可以在单机上运行,这意味着它可以利用单个节点上的多个计算核心来执行并行任务。 ## 安装 -刚才提到,MPI 有不同的实现,即不同的 MPI 厂商一般会提供: +MPI 有多种实现,不同的 MPI 厂商一般会提供以下工具: -* 编译器 `mpicc`、`mpicxx` 和 `mpifort`,分别用来编译 C、C++、Fortran 语言源代码,源代码中一部分是多机通讯,一部分是单机计算,这些编译器通常将多机通讯与单机计算的代码一起编译,并生成可执行文件。 -* 执行并行程序的 `mpirun` 或 `mpiexec`。`mpiexec` 可以在多台节点拉起多个 MPI 进程。 +* 编译器 `mpicc`、`mpicxx` 和 `mpifort`,分别用来编译 C、C++、Fortran 语言源代码。源代码中一部分涉及多机通讯,一部分是单机计算,这些编译器通常将多机通讯与单机计算的代码一起编译,并生成可执行文件。 +* 用于执行并行程序的 `mpirun` 或 `mpiexec`。`mpiexec` 可以在多台节点上启动并行程序,创建多个 MPI 进程,实现跨节点的并行计算。 :::{note} 在很多 MPI 实现中,`mpirun` 和 `mpiexec` 的功能几乎相同,它们都可以将并行程序拉起。有些 MPI 实现的 `mpirun` 和 `mpiexec` 背后是同一个程序。但严谨地讲,MPI 标准中只定义了 `mpiexec`,并没有定义 `mpirun`,因此,`mpiexec` 应该更通用。 diff --git a/ch-mpi/point-to-point.ipynb b/ch-mpi/point-to-point.ipynb index ce30943..1f7ce58 100644 --- a/ch-mpi/point-to-point.ipynb +++ b/ch-mpi/point-to-point.ipynb @@ -10,13 +10,14 @@ "最简单的通信模式是点对点(Point-to-Point)通信,点对点通信又分为阻塞式(Blocking)和非阻塞式(Non-Blocking)。实现点对点时主要考虑两个问题:\n", "\n", "* 如何控制和识别不同的进程。比如,想让进程 0 给进程 1 发消息。\n", - "* 如何控制数据的读写和收发。多大的数据,数据类型是什么。\n", + "* 如何控制数据的读写操作以及消息的发送和接收。这包括确定数据的大小和数据类型。\n", "\n", "## 发送与接收\n", "\n", - "[`Comm.send()`](https://mpi4py.readthedocs.io/en/latest/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.send) 和 [`Comm.recv()`](https://mpi4py.readthedocs.io/en/latest/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.recv) 分别用来阻塞式地发送和接收数据。\n", + "[`Comm.send()`](https://mpi4py.readthedocs.io/en/latest/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.send) 和 [`Comm.recv()`](https://mpi4py.readthedocs.io/en/latest/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.recv) 是分别用于阻塞式发送和接收数据的函数。\n", "\n", - "`Comm.send(obj, dest, tag=0)` 的参数主要是 `obj` 和 `dest`。`obj` 就是我们想要发送的数据,数据可以是 Python 内置的数据类型,比如 `list` 和 `dict` 等,也可以是 NumPy 的 `ndarray`,甚至是 GPU 上的 CuPy 数据。{numref}`sec-mpi-hello-world` 我们介绍了 Communicator 和 Rank,可以通过 Rank 号来定位一个进程,`dest` 是 Rank 号码。`tag` 作用是标识,给程序员一个精细控制的选项,使用 `tag` 可以实现消息的有序传递和筛选。接收方可以选择只接收特定标签的消息,或者按照标签的顺序接收消息,以便更加灵活地控制消息的发送和接收过程。\n", + "`Comm.send(obj, dest, tag=0)` 的参数主要是 `obj` 和 `dest`。`obj` 代表要发送的数据,它可以是Python的内置数据类型,如列表 `list` 和字典 `dict` 等,也可以是 NumPy 的多维数组 `ndarray`,甚至是 GPU 上的 CuPy 数据。\n", + "我们之前讨论了通信器(Communicator)和标识号(Rank),可以通过 Rank 来确定目标进程,其中`dest` 表示目标进程的 Rank。`tag` 参数用于标识,为程序员提供了更精细的控制选项。利用 `tag` 可以实现消息的有序传递和筛选。接收方可以基于特定标签选择接收消息,或者按照标签的顺序来接收消息,这提供了更灵活的控制消息发送和接收过程的能力。\n", "\n", "## 案例1:发送 Python 对象\n", "\n", @@ -132,14 +133,14 @@ "metadata": {}, "source": [ "```{note}\n", - "这里的 `Send` 和 `Recv` 函数的首字母都大写了,因为大写的 `Send` 和 `Recv` 等方法是基于缓存(Buffer)的。对于这些基于缓存的函数,应该明确数据的类型,比如传入这样的二元组 `[data, MPI.DOUBLE]` 或三元组 `[data, count, MPI.DOUBLE]`。刚才例子中,`comm.Send(data, dest=1)` 没有明确告知 MPI 其数据类型和数据大小,是因为 MPI 对 NumPy 和 CuPy `ndarray` 做了类型的自动探测。\n", + "这里的 `Send` 和 `Recv` 函数的首字母大写,表示它们是基于缓冲区(Buffer)的操作。使用这些基于缓冲区的函数时,通常需要明确指定数据类型,例如通过传入二元组 `(data, MPI.DOUBLE)` 或三元组 `(data, count, MPI.DOUBLE)` 来指定。然而,在之前的例子中,`comm.Send(data, dest=1)` 并没有显式指定数据类型和大小,这是因为 mpi4py 对 NumPy 和 CuPy 的 `ndarray` 进行了类型和大小的自动推断。\n", "```\n", "\n", "## 案例3:Master-Worker\n", "\n", - "现在我们做一个 Master-Worker 案例,共有 `size` 个进程,前 `size-1` 个进程作为 Worker,随机生成数据,最后一个进程(Rank 为 `size-1`)作为 Master,接收数据,并将数据的大小打印出来。\n", + "我们现在来实现一个 Master-Worker 模式的案例,其中包含 `size` 个进程。`size-1` 个进程作为 Worker 执行随机数据生成任务,而 Rank 为 size-1 的最后一个进程则作为 Master,负责接收这些数据,并将接收到的数据大小打印出来。\n", "\n", - "{numref}`code-mpi-master-worker` 对 Master 与 Worker 进程间数据的发送和接收过程进行了演示。\n", + "以下代码演示 Master 与 Worker 进程间数据发送和接收的过程。\n", "\n", "```{code-block} python\n", ":caption: master-worker.py\n", @@ -170,7 +171,7 @@ "comm.Barrier()\n", "```\n", "\n", - "在这个例子中,`rank` 小于 `size - 1` 的进程是 Worker,Worker 随机生成数据,并发送给最后一个进程(进程 Rank 号为 `size - 1`)。最后一个进程接收数据,并打印出接收数据的大小。" + "在这个例子中,`rank` 小于 `size - 1` 的进程作为 Worker,生成随机数据,并发送给 Rank 为 size - 1 的 Master 进程。Master 进程接收所有 Worker 发送的数据,并打印出接收到的数据的总大小。" ] }, { @@ -304,10 +305,10 @@ "\n", "我们先分析一下阻塞式通信。`Send` 和 `Recv` 这两个基于缓存的方法:\n", "\n", - "* `Send` 直到缓存是空的时候,也就是说缓存中的数据都被发送出去后,才返回(`return`)。缓存区域可以被接下来其他的 `Send` 循环再利用。\n", - "* `Recv` 直到缓存区域填满,才返回(`return`)。\n", + "* `Send` 方法会在缓冲区的数据全部发送完毕之后才返回(`return`)。该缓冲区可以被其他 `Send` 操作再次使用。\n", + "* `Recv` 方法会在缓冲区被数据填满之后才返回。\n", "\n", - "如 {numref}`mpi-communications` 所示,阻塞通信是数据完成传输,才会返回(`return`),否则一直在等待。\n", + "如 {numref}`mpi-communications` 所示,阻塞式通信是数据完全传输完成后才会结束等待状态并返回。\n", "\n", "```{figure} ../img/ch-mpi/blocking.svg\n", "---\n", @@ -330,15 +331,15 @@ "\n", "### 非阻塞\n", "\n", - "相比之下,非阻塞通信不会等待数据传输的完成。非阻塞通信可以重叠通信和计算来增强性能,即网络侧完成通信任务,CPU 侧完成计算任务。[`isend`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.isend) 和 [`irecv`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.irecv) 可用于非阻塞通信:\n", + "与阻塞式通信相比,非阻塞通信不会等待数据传输完成才开始执行后续操作。它允许通信和计算任务并行执行,从而提高性能:网络负责数据传输,而 CPU 则可以同时进行计算任务。[`isend`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.isend) 和 [`irecv`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.irecv) 是用于非阻塞通信的函数:\n", "\n", - "* `isend`:启动一个非阻塞发送操作并立即将控制返回给程序员,允许执行后续代码。\n", + "* `isend`:启动一个非阻塞发送操作,并立即将控制权返回给调用者,允许执行后续代码。\n", "\n", - "* `irecv`:启动一个非阻塞接收操作并立即将控制返回给程序员,允许执行后续代码。\n", + "* `irecv`:启动一个非阻塞接收操作,并立即将控制权返回给调用者,允许执行后续代码。\n", "\n", - "非阻塞式通信调用后直接返回 [`Request`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Request.html#mpi4py.MPI.Request) 句柄(Handle),程序员接下来再对 `Request` 做处理,比如等待 `Request` 涉及的数据传输完毕。非阻塞式通信有大写的 I 或小写的 i 作为前缀,大写 I 前缀的基于缓存,小写 i 前缀的没有。[`isend`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.isend) 的函数参数与 [`send`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.send) 相差不大,只不过 `isend` 返回值是一个 `Request`。 `Request` 类提供了 `wait` 方法,显示地调用 `wait()` 可以等待数据传输完毕。用 `send` 写的阻塞式的代码,可以改为 `Isend` + [`Request.wait()`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Request.html#mpi4py.MPI.Request.wait) 以非阻塞方式实现。\n", + "非阻塞式通信调用会立即返回一个 [`Request`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Request.html#mpi4py.MPI.Request) 句柄(Handle),调用者随后可以对这个 `Request` 句柄进行进一步处理,例如等待数据传输完成。带有大写 I 前缀的函数是基于缓冲区的,而带有小写 i 前缀的则不是。[`isend`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.isend) 的函数参数与 [`send`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Comm.html#mpi4py.MPI.Comm.send) 类似,但 `isend` 返回的是一个 `Request` 对象。`Request` 类提供了 `wait` 方法,调用该方法可以显式地等待数据传输完成。原本使用 `send` 编写的阻塞式代码可以通过使用 `isend` 加上 `Request.wait()` 来改写为非阻塞方式。\n", "\n", - "{numref}`code-mpi-non-blocking` 展示了一个非阻塞式通信的例子。\n", + "以下是展示非阻塞通信的一个示例代码。\n", "\n", "```{code-block} python\n", ":caption: non-blocking.py\n", @@ -387,7 +388,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "{numref}`fig-non-blocking-communications` 展示非阻塞通信 `wait()` 加入后数据流的变化。\n", + "{numref}`fig-non-blocking-communications` 展示了在非阻塞通信中加入 `wait()` 调用后,数据流的变化情况。\n", "\n", "```{figure} ../img/ch-mpi/non-blocking.svg\n", "---\n", diff --git a/ch-mpi/remote-memory-access.ipynb b/ch-mpi/remote-memory-access.ipynb index 3f7299c..0995fc2 100644 --- a/ch-mpi/remote-memory-access.ipynb +++ b/ch-mpi/remote-memory-access.ipynb @@ -27,11 +27,11 @@ "\n", "## 读写操作\n", "\n", - "创建好可远程访问的 Window 后,可以使用三类方法来向内存区域读写数据:[`mpi4py.MPI.Win.Put`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Put),[`mpi4py.MPI.Win.Get`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Get) 和 [`mpi4py.MPI.Win.Accumulate`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Accumulate)。这三类方法都有两个参数:`origin` 和 `target_rank`,分别表示源进程和目标进程。源进程指的是调用读写方法的进程,目标进程是被读写的进程。\n", + "创建可远程访问的窗口 Window 后,可以通过三类方法对内存区域进行读写操作:[`mpi4py.MPI.Win.Put`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Put),[`mpi4py.MPI.Win.Get`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Get) 和 [`mpi4py.MPI.Win.Accumulate`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.Win.html#mpi4py.MPI.Win.Accumulate)。这些方法都接受两个参数:`origin` 和 `target_rank`,分别表示源进程和目标进程。其中,源进程是调用读写方法的进程,而目标进程是将要被读写的进程。\n", "\n", - "* `Win.Put` 将数据从源进程移动至目标进程。\n", - "* `Win.Get` 将数据从目标进程移动至源进程。\n", - "* `Win.Accumulate` 与 `Win.Put` 类似,也是将数据从源进程移动至目标进程,同时对源进程的数据和目标进程的数据进行了聚合操作,聚合操作的操作符包括:[`mpi4py.MPI.SUM`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.SUM.html)、[`mpi4py.MPI.PROD`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.PROD.html) 等。\n", + "* `Win.Put`: 此方法将数据从源进程移动到目标进程。\n", + "* `Win.Get`: 此方法将数据从目标进程移动到源进程。\n", + "* `Win.Accumulate`: 类似于 `Win.Put`,此方法也将数据从源进程移动到目标进程,但不同之处在于它还包括一个聚合操作,即对目标进程中的数据执行某种操作。可用的聚合操作符包括 [`mpi4py.MPI.SUM`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.SUM.html)、[`mpi4py.MPI.PROD`](https://mpi4py.readthedocs.io/en/stable/reference/mpi4py.MPI.PROD.html) 等。\n", "\n", "## 数据同步\n", "\n", @@ -45,7 +45,7 @@ "并行数据读写时会出现数据同步的问题,P0 和 P1 表示两个不同的进程。\n", "```\n", "\n", - "为解决这个问题,需要一定的数据同步机制。MPI 一共有几类,包括主动同步(Active Target Synchronization)和被动同步(Passive Target Synchronization),如 {numref}`fig-rma-synchronization` 所示。\n", + "为解决多机环境中的数据同步问题,需要采用一定的数据同步机制。MPI 提供了几种同步方式,包括主动目标同步(Active Target Synchronization)和被动目标同步(Passive Target Synchronization),如 {numref}`fig-rma-synchronization` 所示。\n", "\n", "```{figure} ../img/ch-mpi/rma-synchronization.svg\n", "---\n", diff --git a/ch-ray-cluster/ray-cluster.md b/ch-ray-cluster/ray-cluster.md index 42eded1..c17cec3 100644 --- a/ch-ray-cluster/ray-cluster.md +++ b/ch-ray-cluster/ray-cluster.md @@ -13,25 +13,25 @@ name: fig-ray-cluster Ray 集群由头节点和多个工作节点组成,头节点上运行着一些管理进程。 ``` -所有节点上都运行着一些进程: +在 Ray 分布式计算环境中,所有节点上都运行着一些关键进程。 * Worker -每个计算节点上运行着一个或多个 Worker 进程,Worker 进程负责计算任务的运行。每个 Worker 进程运行特定的计算任务。Worker 进程或者是无状态的,即可以被反复执行 Remote Function 对应的 Task;又或者是一个 Actor,即只能执行有状态的 Remote Class 的方法。默认情况下,Worker 的数量等于其所在的计算节点的 CPU 核数。 +每个计算节点上运行着一个或多个 Worker 进程,这些进程负责执行计算任务。Worker 进程可以是无状态的,意味着它们可以反复执行 Task 对应的任务;它们也可以是有状态的 Actor,即执行远程类的方法。默认情况下,Worker 的数量等于其所在计算节点的 CPU 核心数。 * Raylet -每个计算节点上运行着一个 Raylet。与一个计算节点上运行多个 Worker 进程不同,每个计算节点上只有一个 Raylet 进程,或者说 Raylet 被多个 Worker 进程所共享。Raylet 主要有两个组件:一个调度器(Scheduler),负责资源管理、任务分配等。各个计算节点上的 Scheduler 共同组成了整个 Ray 集群的分布式调度器;一个基于共享内存的对象存储(Share-memory Object Store),负责本地的数据存储,各个计算节点上的 Object Store 共同组成了 Ray 的分布式对象存储。 +每个计算节点上运行着一个 Raylet。每个计算节点可能运行多个 Worker 进程,但每个计算节点上只有一个 Raylet 进程,或者说 Raylet 被多个 Worker 进程所共享。Raylet 主要包含两个组件:一个是调度器(Scheduler),它负责资源管理和任务分配;另一个是基于共享内存的对象存储(Shared-memory Object Store),它负责本地数据存储,各个计算节点上的 Object Store 共同构成了 Ray 集群的分布式对象存储。 从 {numref}`fig-ray-cluster` 中也可以看到,头节点还多了: * Global Control Service(GCS) -GCS 是 Ray 集群的全局元数据管理服务,这里的元数据信息包括:某个 Actor 被分配到哪个计算节点上。它管理的元数据是所有 Worker 共享的。 +GCS 是 Ray 集群的全局元数据管理服务,负责存储和管理诸如哪个 Actor 被分配到哪个计算节点等元数据信息。这些元数据是被所有 Worker 共享的。 * Driver -Driver 执行的是程序的入口,比如,作为 Python 入口的 `__main__` 函数。一般情况下,`__main__` 函数运行时不执行大规模的计算,只是把 Task 和 Actor 调度到具有足够资源的 Worker 上。 +Driver 用于执行程序的入口点。入口点指的是Python 的 `__main__` 函数。通常,`__main__` 在运行时不应该执行大规模计算,而是负责将 Task 和 Actor 调度到具备足够资源的 Worker 上。 Ray 的头节点还运行着其他一些管理类的服务,比如计算资源自动缩放、作业提交等服务。 diff --git a/ch-ray-cluster/ray-job.md b/ch-ray-cluster/ray-job.md index ab022a0..73d8aa7 100644 --- a/ch-ray-cluster/ray-job.md +++ b/ch-ray-cluster/ray-job.md @@ -16,9 +16,9 @@ ### `ray job` -Ray Jobs 命令行指的是 `ray job` 一系列操作作业的脚本。在 Python 环境中安装好 Ray 之后(`pip install "ray[default]"`),也会安装命令行工具,其中 `ray job` 负责作业的全生命周期管理。 +`ray job` 命令行是指一系列用于操作Ray作业的工具。在 Python 环境中安装好 Ray 之后(通过命令 `pip install "ray[default]"`),会同时安装命令行工具。`ray job` 可以负责管理作业的整个生命周期。 -我们先写好一个基于 Ray 的脚本,放置在当前目录 `./` 下,名为 `scripy.py`: +首先,我们需要编写一个基于Ray的脚本,并将其保存在当前目录下,文件名为 `scripy.py`: ```python import os @@ -46,6 +46,7 @@ sequence_size = 10 results = ray.get([generate_fibonacci.remote(sequence_size) for _ in range(os.cpu_count())]) print(results) ``` + 使用 `ray job submit` 提交这个作业: ```bash @@ -154,18 +155,20 @@ logs = client.get_job_logs(job_id) print(logs) ``` -[`JobSubmissionClient.submit_job()`](https://docs.ray.io/en/latest/cluster/running-applications/job-submission/doc/ray.job_submission.JobSubmissionClient.submit_job.html) 作业提交是异步的,Ray 会马上返回作业的 ID。如果想要看到作业的运行情况,需要 `wait_until_status()` 函数,不断向 Ray 集群请求,查看该作业的状态。跟命令行类似,`submit_job()` 中传入 `runtime_env` 来指定工作目录或依赖的 Python 包;`entrypoint_num_cpus` 和 `entrypoint_num_gpus` 指定入口所需要的计算资源。 +[`JobSubmissionClient.submit_job()`](https://docs.ray.io/en/latest/cluster/running-applications/job-submission/doc/ray.job_submission.JobSubmissionClient.submit_job.html) 的作业提交是异步的。调用此方法后,Ray 会马上返回作业的 ID。如果想要查看作业的运行状态,可以使 `wait_until_status()` 函数。这个函数会不断向 Ray 集群请求,以检查作业的当前状态。 + +与命令行操作类似,在 `submit_job()` 方法中,可以通过传入 `runtime_env` 参数来指定作业的工作目录或所需的 Python 包。此外,`entrypoint_num_cpus` 和 `entrypoint_num_gpus` 用于指定作用入口(`__main()__` 函数)所需要的计算资源。 ## Ray 客户端 -Ray 客户端指的是在 Python 的 `ray.init()` 中直接指定 Ray 集群的地址:`ray.init("ray://:")`。 +Ray 客户端是指在 Python 中使用 `ray.init()` 函数,直接指定Ray集群的地址:`ray.init("ray://:")`。 :::{note} -注意,这里的 `port` 默认为 10001,或者在 `ray start --head` 时对 `--ray-client-server-port` 进行设置。 +注意,Ray集群默认的客户端服务端口为 10001。如果需要使用不同的端口,可以在启动Ray集群头节点时,通过 `--ray-client-server-port` 参数进行设置。 ::: -客户端可以运行在个人电脑上,用户可以交互地调用集群的计算资源。需要注意的是,客户端的一些功能不如命令行和 SDK 完善,复杂的任务应优先使用命令行或者 SDK。 +客户端可以在个人计算机上运行,允许用户以交互方式调用Ray集群的计算资源。然而,需要注意的是,客户端的某些功能可能不如命令行工具和 Python. SDK 那样全面。对于执行复杂任务,建议优先使用命令行或 Python SDK。 -`ray.init()` 也接收 `runtime_env` 参数,用来指定 Python 包版本或工作目录。跟 Ray Jobs 命令行一样,Ray 会将工作目录中的数据传到 Ray 集群上。 +`ray.init()` 也接收 `runtime_env` 参数,该参数用于指定 Python 包版本或工作目录。跟 Ray Jobs 命令行工具一样,Ray 会将指定工作目录中的数据传输到Ray集群上。 -如果客户端与 Ray 集群的连接断开,这个客户端创建的分布式对象或引用都会被销毁。如果客户端和 Ray 集群意外断开,Ray 会在 30 秒后重新连接,重新连接失败后会把各类引用销毁。环境变量 `RAY_CLIENT_RECONNECT_GRACE_PERIOD` 可对这个时间进行自定义。 \ No newline at end of file +如果客户端与Ray集群的连接中断,客户端创建的所有分布式对象或引用将被销毁。在客户端与Ray集群意外断开连接的情况下,Ray会尝试在30秒后重新建立连接。如果重新连接失败,Ray将销毁所有相关的引用。用户可以通过设置环境变量 `RAY_CLIENT_RECONNECT_GRACE_PERIOD` 来自定义这个重连尝试的时间间隔。 \ No newline at end of file diff --git a/ch-ray-cluster/ray-resource.md b/ch-ray-cluster/ray-resource.md index bcd8742..520c8b4 100644 --- a/ch-ray-cluster/ray-resource.md +++ b/ch-ray-cluster/ray-resource.md @@ -17,7 +17,9 @@ ray start --num-cpus=32 --num-gpus=4 ## 资源需求 -默认情况下,Ray Task 会使用 1 个逻辑 CPU,这 1 个 CPU 既用来调度,又用来运行计算任务;Ray Actor 会使用 1 个逻辑 CPU 用来调度,0 个 CPU 用来运行计算任务。Task 或 Actor 在执行时,会被 Ray 调度到满足需求的节点上。在默认情况下,Ray Task 的资源需求比较明确,而 Ray Actor 只需要 0 个 CPU 用来运行计算任务,可能导致无限个 Ray Actor 运行到一台工作节点上。对于某个具体的计算任务,可以在定义 Task 或者 Actor 时,指定使用多少计算资源;指定计算资源的数量有利于计算任务的调度和运行,避免出现一些不确定的风险。具体而言,使用 `ray.remote()` 修饰函数或类时,传入 `num_cpus` 和 `num_gpus` 参数,可以指定 Task 和 Actor 所需资源。 +默认情况下,Ray Task会使用1个逻辑CPU,这个CPU既用于任务调度,也用于执行计算任务;而Ray Actor则使用1个逻辑CPU进行任务调度,0 个 CPU 用来运行计算任务。 + +Task 或 Actor 在执行时,Ray 会将其调度到能够满足其资源需求的节点上。在默认情况下,Ray Task 的资源需求相对明确,而 Ray Actor 默认 CPU 资源需求是 0 个,如果不做额外设置,造成一种 Ray Actor 不需要计算资源的假象,导致大量 Actor 被调度到同一个计算节点上。为了更好地控制资源使用并避免潜在的风险,建议在定义 Task 或 Actor 时指定所需的计算资源数量。具体来说,使用 `ray.remote()` 修饰函数或类时,可以通过传递 `num_cpus` 和 `num_gpus` 参数来指定 Task 和 Actor 所需的计算资源。 ``` @ray.remote(num_cpus=4) @@ -50,13 +52,13 @@ Ray 集群可以自动缩放,主要面向以下场景: ## Placement Group -基于计算资源和集群,Ray 提供了 Placement Group,中文可以理解成资源组。Placement Group 允许用户**原子地**使用集群上多个节点的计算资源,所谓原子地(Atomically),是指这些资源或者都分配给该用户,或者完全不分配,不会出现只分配一部分的情况。 +关于计算资源和集群的配置,Ray提供了一个名为Placement Group的功能,中文可以理解为“资源组”。Placement Group允许用户以**原子地**(Atomically)使用集群上多个节点的计算资源。所谓原子地,是指资源要么全部分配给用户,要么完全不分配,不会出现只分配部分资源的情况。 -Placement Group 主要针对的场景案例有: +Placement Group 主要适用于以下场景: -* 一个作业需要一组资源,这些资源需要协同工作以完成任务,给这个作业分配一部分资源,无法完成任务。这种场景在集群调度中又被称为组调度(Gang Scheduling)。比如,大规模分布式训练中需要多台计算节点和多块 GPU,需要在 Ray 集群中申请并分配这些资源。 +* 组调度(Gang Scheduling):一个作业需要一组资源,这些资源需要协同工作以完成任务。要么分配,要么不分配。如果只分配给这个作业部分资源,将无法完成整个任务。例如,在大规模分布式训练中,可能需要多台计算节点和多块GPU,这时可以在Ray集群中申请并分配这些资源。 -* 作业需要在多个节点上负载均衡,每个节点承担一小部分任务。Placement Group 使得这个作业尽量分摊到多个计算节点上。比如,在一个分布式推理场景,一个作业需要 8 块 GPU,每个 GPU 加载模型,独立地进行推理。为了负载均衡,应该将作业调度到 8 个计算节点上,每个节点占用 1 块 GPU;而不是将这个作业调度到 1 个计算节点的 8 块 GPU 上。因为都调度到 1 个计算节点,节点故障后,整个推理服务不可用。 +* 负载均衡:作业需要在多个节点上进行负载均衡,每个节点承担一小部分任务。Placement Group可以确保作业尽量分散到多个计算节点上。例如,在分布式推理场景中,如果一个作业需要8块GPU,每个GPU负责加载模型并独立进行推理,为了实现负载均衡,应该将作业调度到8个计算节点上,每个节点使用1块GPU。这样做的好处是,如果一个节点发生故障,不会导致整个推理服务不可用,因为其他节点仍然可以继续工作。 Placement Group 有几个关键概念: diff --git a/ch-ray-core/remote-class.ipynb b/ch-ray-core/remote-class.ipynb index 9d0f596..255cecb 100644 --- a/ch-ray-core/remote-class.ipynb +++ b/ch-ray-core/remote-class.ipynb @@ -172,7 +172,7 @@ "origin_pos": 6 }, "source": [ - "接下来我们要使用 `Counter` 类的计数功能:`increment()` 函数,我们也要在函数后面添加 `remote()` ,即 `对象实例.函数名.remote()`。" + "接下来,我们将使用 `Counter` 类的计数功能,特别是 `increment()` 方法。在使用这个方法时,需要在函数名后附加 `remote()` ,即在对象实例函数调用时加上 `remote()`: `对象实例.函数名.remote()`。" ] }, { @@ -307,7 +307,7 @@ "\n", "* 存储数据,比如状态数据\n", "* 从别的 Actor 接收消息\n", - "* 向别的 Actor 接收消息\n", + "* 向别的 Actor 发送消息\n", "* 创建新的 Actor\n", "\n", "Actor 存储的状态数据只能由 Actor 自己来管理,不能被其他 Actor 修改。这有点像面向对象编程语言中类的实例,如果想修改实例的数据,一般通过实例的成员函数。如果我们想修改 Actor 里面存储的状态数据,应该向 Actor 发送消息,Actor 接收到消息,并基于自己存储的数据,做出决策:决定修改状态数据,或者再向其他 Actor 发送消息。比如,刚才的计数器案例中,Actor 收到 `increment()` 的消息,并根据自己存储的状态,做自增操作。\n", diff --git a/ch-ray-data/data-transform.ipynb b/ch-ray-data/data-transform.ipynb index e7d353e..a1eac3a 100644 --- a/ch-ray-data/data-transform.ipynb +++ b/ch-ray-data/data-transform.ipynb @@ -348,9 +348,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "与 `map()` 有所区别的是,`map_batches()` 是对一个批次进行处理,它模拟的是单机处理时,对整个数据集的操作。其设计思想主要为了方便将之前编写好的单机程序无缝地迁移到 Ray 上。`map_batches()` 每个批次的数据格式为 `Dict[str, np.ndarray]` 或 `pd.DataFrame` 或 `pyarrow.Table`,分别对应 NumPy 、pandas 和 Arrow。\n", + "与 `map()` 不同,`map_batches()` 是针对数据的一个批次(Batch)进行处理,它模拟在单机处理环境中对整个数据集的操作。`map_batches()` 的设计宗旨是为了方便开发者将原本为单机环境编写的程序代码无缝地迁移到 Ray 平台上。`map_batches()` 每个批次的数据格式为 `Dict[str, np.ndarray]` 或 `pd.DataFrame` 或 `pyarrow.Table`,分别对应 NumPy 、pandas 和 Arrow。\n", "\n", - "下面的例子与 `map()` 实现的功能类似,只不过通过 pandas 的形式,对每个 Batch 进行操作。" + "下面的例子展示如何使用 pandas 的形式对每个批次(Batch)进行操作,其功能与刚才展示的 `map()` 类似案例,但是通过Pandas DataFrame来实现。 " ] }, { @@ -526,7 +526,7 @@ "source": [ "### Task 与 Actor\n", "\n", - "可以看到,转换操作本质上是在执行 `fn` ,这个函数接收一个输入,进行转换,得到输出。默认情况下,Ray Data 使用 Task 并行执行转换操作。Ray Task 比较适合无状态的计算,即 `fn` 内不依赖 `fn` 外的数据。如果是有状态的计算,需要使用 Ray Actor。比如,加载一个机器学习模型,并用这个模型对不同数据进行预测。下面的例子模拟了机器学习模型预测的过程,模型本身是被反复使用的,所以是有状态的计算。这个例子仅仅作为演示,所使用的并非是训练好的模型,而是一个等价变换 `torch.nn.Identity()`,它将输入原封不动地转换为输出。" + "转换操作本质上执行函数 `fn` ,该函数接收收入数据,进行处理转换,产生输出数据。在默认设置下,Ray Data 通过 Ray Task 并行执行这些转换操作。Ray Task 适用于无状态的计算场景,即函数 `fn` 内部不依赖于外部数据。当涉及有状态的计算时,例如需要重复使用的数据或资源,应该使用 Ray Actor。例如,在机器学习模型预测的场景中,模型需要被加载并用于对不同的数据集进行预测。这种情况下,模型的状态是被反复使用的,因此属于有状态的计算。以下示例演示了模拟的机器学习模型预测过程。需要注意的是,示例中使用的并非一个训练好的模型,而是一个等价的变换 `torch.nn.Identity()`,它将输入数据直接作为输出,不做任何改变。" ] }, { @@ -677,9 +677,8 @@ "\n", "## 分组\n", "\n", - "数据处理中另外一个经常使用的原语是分组聚合,Ray Data 提供了: [groupby()](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.groupby.html)。Ray Data 先调用 `groupby()`,对数据按照某些字段进行分组,再调用 [`map_groups()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.grouped_data.GroupedData.map_groups.html) 对分组之后的数据进行聚合。\n", - "\n", - "`groupby(key)` 的参数 `key` 是需要进行分组的字段;`map_groups(fn)` 的参数 `fn`,对同一个组的数据进行操作。Ray Data 预置了一些聚合函数,比如常见的求和 `sum()`,最大值 `max()`,平均值 `mean()` 等。比如下面的例子使用 `mean()` 对 `value` 字段进行聚合。 " + "数据处理中,另一个常用的操作是分组聚合。Ray Data 提供了 [groupby()](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.groupby.html) 函数。首先使用` groupby()` 对数据按照特定字段进行分组,然后通过 `map_groups()` 对分组后的数据执行聚合操作。\n", + "`groupby(key)` 的参数 `key` 指定了用于分组的字段;`map_groups(fn)` 的参数 `fn` 定义了对同一组数据执行的操作。Ray Data 内置了一些聚合函数,包括常用的求和 `sum()`、最大值 `max()`、平均值 `mean()` 等。例如,可以使用 `mean()` 对 `value` 字段进行聚合。 " ] }, { diff --git a/ch-ray-data/index.md b/ch-ray-data/index.md index 7a0ff07..9b2eab9 100644 --- a/ch-ray-data/index.md +++ b/ch-ray-data/index.md @@ -1,6 +1,6 @@ # Ray Data -Ray Data是基于Ray Core的数据处理框架,主要解决机器学习模型训练或推理相关的数据准备与处理问题,即数据的最后一公里问题。本章主要介绍 Ray Data 的原理,以及Modin(Ray 上的 pandas)。 +Ray Data是基于Ray Core构建的数据处理框架,它专注于解决机器学习和深度学习中模型训练或推理阶段的数据准备和处理问题,通常这些问题被称为数据的“最后一公里”(Last-mile Preprocessing)问题。本章将主要介绍 Ray Data 的工作原理,以及 Modin,这是一个在Ray之上实现的Pandas接口,用于加速数据处理任务。 ```{tableofcontents} ``` \ No newline at end of file diff --git a/ch-ray-data/modin.ipynb b/ch-ray-data/modin.ipynb index 727d6e8..27cff45 100644 --- a/ch-ray-data/modin.ipynb +++ b/ch-ray-data/modin.ipynb @@ -7,15 +7,15 @@ "(sec-modin)=\n", "# Modin\n", "\n", - "Ray Data 提供的各类数据处理工具相对比较简单,只能做一些比较简单的转换,无法胜任复杂的数据处理任务。Modin 是一款专门加速 pandas 的框架。它对大数据进行了切分,使 DataFrame 分布到多核和集群上。早期,它底层使用了 Ray 作为分布式执行引擎,又被称为 Ray 上的 pandas(pandas on Ray)。之后又添加了 Dask 和 [unidist](https://github.com/modin-project/unidist/) 执行引擎,unidist 是 Modin 团队自己开发的分布式执行引擎。\n", + "Ray Data 提供的各类数据处理工具设计上较为简单,只能做一些比较简单的转换,能提供的复杂的数据处理功能可能不够多。Modin 是一个旨在提高 pandas 性能的框架,它通过将大数据分割并分布到多核和集群上,实现了对大数据集的高效处理。最初,Modin 底层使用了 Ray 作为其分布式执行引擎,有时也被称为 Ray 上的 pandas。随后,Modin 又集成了 Dask 和 [unidist](https://github.com/modin-project/unidist/) 执行引擎,其中 unidist 是 Modin 团队开发的分布式执行引擎。\n", "\n", - "在安装 Modin 时,要安装对应的执行引擎(Ray、Dask 或 unidist),比如 `pip install \"modin[ray]\"` 或 `pip install \"modin[dask]\"`。Modin 默认使用 Ray 作为其执行引擎。\n", + "安装 Modin 时,用户可以根据需要选择并安装相应的执行引擎,例如使用 `pip install \"modin[ray]\"` 或 `pip install \"modin[dask]\"`。Modin 默认使用 Ray 作为其执行引擎。\n", "\n", "## API 兼容性\n", "\n", - "Dask DataFrame 与 pandas DataFrame 其实有不少差异,很多 pandas 工作流并不能快速迁移到 Dask DataFrame 上。Modin 更看重与 pandas 的兼容性,用户只需要 `import modin.pandas as pd`,绝大多数 pandas 工作流可以快速迁移到 Modin 上。\n", + "Dask DataFrame 与 pandas DataFrame 存在不少差异,许多 pandas 工作流不能快速迁移到 Dask DataFrame。Modin 特别重视与 pandas 的兼容性,用户通过 `import modin.pandas as pd`,大部分 pandas 工作流可以快速迁移到 Modin。\n", "\n", - "Dask DataFrame 只按列对大数据进行切分,且没有记录每个 Partition 有多少数据,Modin 在多维度对数据进行切分,保留行标签和列标签。Modin 支持行索引 `iloc()`;记录了每个数据块的数据量,可以支持`median()`、`quantile()`;支持行和列的转换(比如,`pivot()`、`transpose()`)等操作。有关 Modin 的设计,可以参考其两篇论文 {cite}`petersohn2020Scalable` {cite}`petersohn2021Flexible`。" + "Dask DataFrame 按列对大数据进行切分,并未记录每个分区的数据量。相比之下,Modin 在多个维度上对数据进行切分,并保留行标签和列标签。Modin 支持 `iloc()` 行索引;记录了每个数据块的数据量,从而可以支持`median()`、`quantile()` 等操作;同时支持行和列的转换,比如,`pivot()`、`transpose()` 等。关于 Modin 的设计,可以参考其两篇论文 {cite}`petersohn2020Scalable` {cite}`petersohn2021Flexible`。" ] }, { @@ -39,7 +39,7 @@ "metadata": {}, "source": [ ":::{note}\n", - "Modin 的 API 尽量与 pandas 一致,比如,pandas 的 `read_csv()` 只能读一个文件,不能读 `*.csv` 这样的通配符。Modin 额外增加了一些 API,比如,Modin 拓展了 `read_csv()`,提出了 `read_csv_glob()` 方法 可以读取 `*.csv` 这样的通配符,适合读大数据。这些额外增加的 API 在 [`modin.experimental.pandas`](https://modin.readthedocs.io/en/stable/flow/modin/experimental/pandas.html) 中。\n", + "Modin 的 API 设计旨在与 pandas 保持一致,例如,pandas 的 `read_csv()` 函数只能读单个文件,不支持 `*.csv` 这样的通配符。Modin 拓展了 `read_csv()`,引入了 `read_csv_glob()` 方法,该方法可以读取 `*.csv` 等通配符文件,适用于处理大规模数据集。这些新增的 API 在 [`modin.experimental.pandas`](https://modin.readthedocs.io/en/stable/flow/modin/experimental/pandas.html) 模块中。\n", ":::" ] }, @@ -121,13 +121,11 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "如果某些 API 在 Modin 中还没实现,Modin 会退回(Fallback)到 pandas,这也保证了兼容性。当然,缺点也很明显:将 Modin 的 DataFrame 转换为 pandas DataFrame 时,会有额外的开销;如果这个 DataFrame 分布在多个节点上,转化回 pandas 时会把数据集中到单机内存,有可能把单机内存挤爆。\n", + "如果Modin中尚未实现某些 API,Modin 会回退到 pandas,以确保兼容性。然而,这种设计也存在明显的缺点:将 Modin 的 DataFrame 转换为 pandas DataFrame时会产生额外开销;若 DataFrame 原本分布在多个节点上,转换过程中数据将被集中到单机内存,这可能会超出单机内存的承载能力。\n", "\n", "## 立即执行\n", "\n", - "Modin 是立即执行,这一点与 pandas 一致。用户不需要像 Dask 那样调用 `.compute()` 来触发计算。\n", - "\n", - "Modin 也没有 Dask DataFrame 的数据类型推断。{numref}`sec-dask-dataframe-read-write` 中的飞机起降数据上,Dask DataFrame `tail()` 会抛出异常,但 Modin 能够得到 pandas 一样的语义。" + "Modin 的计算是立即执行的,与 pandas 一致。用户无须像使用 Dask 那样调用 `.compute()` 方法来触发计算。Modin 不需要 Dask DataFrame 的数据类型推断功能。在 {numref}`sec-dask-dataframe-read-write` 中提到的飞机起降数据示例中,Dask DataFrame 的 `tail()` 方法可能会抛出异常,而 Modin 则能够提供与pandas 相同的语义。" ] }, { diff --git a/ch-ray-data/preprocessor.ipynb b/ch-ray-data/preprocessor.ipynb index d0ce7cf..e642e55 100644 --- a/ch-ray-data/preprocessor.ipynb +++ b/ch-ray-data/preprocessor.ipynb @@ -7,7 +7,7 @@ "(sec-ray-data-preprocessor)=\n", "# Preprocessor\n", "\n", - "{numref}`sec-ray-data-transform` 介绍了通用接口 `map()` 和 `map_batches()`。对于结构化的表格类数据,Ray Data 在 `map()` 和 `map_batches()` 基础上,增加了一个高阶的 API:预处理器(Preprocessor)。[Preprocessor](https://docs.ray.io/en/latest/data/api/preprocessor.html) 是一系列特征处理操作,可与机器学习模型训练和推理更好地结合。其使用方式与 scikit-learn 的 [sklearn.preprocessing](https://scikit-learn.org/stable/modules/classes.html#module-sklearn.preprocessing) 非常相似,熟悉 scikit-learn 的用户可以快速迁移过来。对于非结构化数据,比如图片、视频等,仍然建议使用 `map()` 或者 `map_batches()`。" + "{numref}`sec-ray-data-transform` 介绍了较为通用的 `map()` 和 `map_batches()` 函数。对于结构化的表格类数据,Ray Data 在提供了预处理器([Preprocessor](https://docs.ray.io/en/latest/data/api/preprocessor.html)),这是一系列特征处理操作,可以更好地与机器学习模型的训练和推理结合。其使用方式与 scikit-learn 的 [sklearn.preprocessing](https://scikit-learn.org/stable/modules/classes.html#module-sklearn.preprocessing) 非常相似,熟悉 scikit-learn 用户可以快速迁移到 Ray Data 的 Preprocessor 上来。对于非结构化数据,比如图片、视频等,仍然建议使用 `map()` 或者 `map_batches()`。" ] }, { diff --git a/ch-ray-data/ray-data-intro.md b/ch-ray-data/ray-data-intro.md index 1018c72..2129718 100644 --- a/ch-ray-data/ray-data-intro.md +++ b/ch-ray-data/ray-data-intro.md @@ -1,9 +1,9 @@ (sec-ray-data-intro)= # Ray Data 简介 -Ray Data 是基于 Ray Core 的数据处理框架,主要解决机器学习模型训练或推理相关的数据准备与处理问题,即数据的最后一公里问题(Last-mile Preprocessing)。与 Dask DataFrame、Modin、Xorbits 相比,Ray Data 更通用,既可以处理二维表,也可以处理图片、视频;Ray Data 的通用也意味着它在很多方面还不够专业,比如 `groupby` 等操作相对比较粗糙。除了 Ray Data 外,本章还会介绍 Modin。 +Ray Data 是一个构建在 Ray Core 之上的数据处理框架,它主要针对机器学习模型训练或推理过程中的数据准备和处理问题,这些问题通常被称为数据的“最后一公里”问题。与 Dask DataFrame、Modin、Vaex 等其他数据处理框架相比,Ray Data 具有更高的通用性,不仅能够处理二维表格数据,还能够处理图像和视频等非结构化数据类型。然而,Ray Data 的通用性也意味着它在某些专业数据处理操作方面可能不如一些更专注的框架精细,例如 `groupby` 操作可能在性能或功能上相对较为粗糙。 -Ray Data 对数据提供了一个抽象的类,[`ray.data.Dataset`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.html),在 `Dataset` 上提供了常见的大数据处理的原语,覆盖了数据处理的大部分阶段,例如: +Ray Data 对数据提供了一个抽象类,[`ray.data.Dataset`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.html),它封装了数据并在上面实现了常见的大数据处理原语,这些原语覆盖了数据处理的大部分阶段。例如: * 数据的读取,比如读取 Parquet 文件等。 * 对数据的转换(Transformation)操作,比如 [`map_batches()`](https://docs.ray.io/en/latest/data/api/doc/ray.data.Dataset.map_batches.html)。 @@ -21,9 +21,11 @@ Ray Data 面向机器学习,其设计理念也与机器学习的流程高度 ## Dataset -Ray Data 主要基于 `ray.data.Dataset` 对象。`Dataset` 是一个分布式的数据对象,`Dataset` 底层的基本单元是 `Block`。`Dataset` 是多个 `Block` 组成的分布式的 `ObjectRef[Block]`。 `Block` 是一个基于 Apache Arrow 格式的数据结构。 +Ray Data 主要基于 `ray.data.Dataset` 对象。`Dataset` 是一个分布式的数据对象,`Dataset` 底层的基本单元是 `Block`。`Dataset` 实际上是一个分布式的 `ObjectRef[Block]`。 -{numref}`fig-ray-dataset-arch` 是一个示意图,这个数据由 3 个 `Block` 组成,每个 `Block` 有 1,000 行数据。 +`Block` 是一个数据结构,它基于Apache Arrow格式构建,这是一种高效率的列式存储格式,适用于在内存中处理和操作大量数据。 + +{numref}`fig-ray-dataset-arch` 是一个示意图,展示了一个由 3 个 `Block` 组成的 `Dataset`,每个 `Block` 包含 1,000 行数据。 ```{figure} ../img/ch-ray-data/dataset-arch.svg --- diff --git a/ch-ray-ml/ray-train.ipynb b/ch-ray-ml/ray-train.ipynb index 10ca20d..045fbb9 100644 --- a/ch-ray-ml/ray-train.ipynb +++ b/ch-ray-ml/ray-train.ipynb @@ -7,7 +7,9 @@ "(sec-ray-train)=\n", "# Ray Train\n", "\n", - "Ray Train 基于 Ray 的 Actor 和 Task,对机器学习和深度学习训练中的流程进行了封装,使得一些单机任务可以横向扩展。可以简单理解为:Ray Train 将原有的单机的机器学习工作封装在了 Actor 中,每个 Actor 有一份机器学习模型的拷贝,可独立完成机器学习训练任务;Ray Train 借助 Actor 的横向扩展能力,使得训练任务在 Ray 集群上横向扩展。Ray Train 对 PyTorch、PyTorch Lightning、Hugging Face Transformers、XGBoost、LightGBM 等常用机器学习库进行了封装,并向用户提供了一些接口,用户不需要自己编写 Actor 代码,只需要稍微改动原有的单机机器学习工作流,即可快速切换到集群模式。本节以 PyTorch 为例,介绍如何基于数据并行将训练任务横向扩展,关于数据并行的原理,可参考 {numref}`sec-data-parallel`。\n", + "Ray Train 利用 Ray 的 Actor 和 Task 对机器学习和深度学习训练流程进行了封装,实现了单机任务的横向扩展。简单来说,Ray Train 将单机机器学习任务封装在Actor 中,每个 Actor 都拥有一个独立的机器学习模型副本,能够独立完成训练任务。利用 Actor 的横向扩展能力,Ray Train 使得训练任务能够在 Ray 集群上实现扩展。\n", + "\n", + "Ray Train 封装了 PyTorch、PyTorch Lightning、HuggingFace Transformers、XGBoost、LightGBM 等常用机器学习库,并向用户提供了接口。用户无须编写 Actor 代码,只需对原有的单机机器学习工作流进行少量修改,就能快速切换到集群模式。以 PyTorch 为例,本节介绍如何基于数据并行实现训练任务的横向扩展。数据并行的原理详见 {numref}`sec-data-parallel`。\n", "\n", "## 关键步骤\n", "\n", @@ -430,7 +432,7 @@ "\n", "## `ScalingConfig`\n", "\n", - "`ScalingConfig(num_workers=..., use_gpu=...)` 的 `num_workers` 设置让当前的任务以多大程度并行,`use_gpu` 用来分配 GPU。`num_workers` 可以理解成启动多少个 Ray Actor,每个 Actor 独立地执行单机训练任务。如果 `use_gpu=True`,默认给每个 Actor 分配 1 个 GPU,每个 Actor 看到的 `CUDA_VISIBLE_DEVICES` 也是 1 个。如果希望每个 Actor 看到多个 GPU,可以设置:`resources_per_worker={\"GPU\": n}`。\n", + "`ScalingConfig(num_workers=..., use_gpu=...)` 中的 `num_workers` 参数用于控制任务的并行度,`use_gpu` 参数用于控制是否使用GPU资源。`num_workers` 可以理解为启动的 Ray Actor 的数量,每个 Actor 独立执行训练任务。如果 `use_gpu=True`,在默认情况下,每个 Actor 会分配到 1 个 GPU,相应地,每个 Actor 的环境变量 `CUDA_VISIBLE_DEVICES` 也是 1 个。若要使每个 Actor 能够访问多个 GPU,可以设置 `resources_per_worker` 参数:`resources_per_worker={\"GPU\": n}`。\n", "\n", "## 监控\n", "\n", diff --git a/ch-ray-ml/ray-tune.ipynb b/ch-ray-ml/ray-tune.ipynb index a0a8492..df1376c 100644 --- a/ch-ray-ml/ray-tune.ipynb +++ b/ch-ray-ml/ray-tune.ipynb @@ -201,9 +201,11 @@ "source": [ "## 搜索算法和调度器\n", "\n", - "Ray Tune 的超参数搜索中比较重要的概念是搜索算法和调度器:搜索算法确定如何从搜索空间中选择新的超参数组合(即试验);调度器提前结束一些不太有前景的试验,节省计算资源。搜索算法是必须的,调度器不是必须的。这两者可以协作来选择超参数,比如使用随机搜索算法和 ASHA 调度器,调度器对一些看起来没希望的试验提前结束。另外,一些超参数优化的包通常提供了封装好的搜索算法,比如 [Hyperopt](https://github.com/hyperopt/hyperopt)、[Optuna](https://github.com/optuna/optuna) 等;有的还提供了调度器,这些包有自己的使用方式和习惯,Ray Tune 对这些包进行了封装,尽量使得这些包的使用方式统一。\n", + "Ray Tune 的超参数搜索中,核心概念是搜索算法和调度器。搜索算法负责从搜索空间中选择超参数组合进行试验;调度器则终止表现不佳的试验,以节省计算资源。搜索算法是必需的,而调度器则不必需。例如,可以结合使用随机搜索和 ASHA 调度器,让调度器对一些表现不佳的试验予以终止。\n", "\n", - "我们先使用随机搜索:" + "此外,一些超参数优化库如 [Hyperopt](https://github.com/hyperopt/hyperopt)、[Optuna](https://github.com/optuna/optuna) 等,通常提供了封装好的搜索算法,某些库还提供了调度器,这些库有其特定的使用方式。Ray Tune 封装了这些库,以统一不同库的使用体验。\n", + "\n", + "以下是使用随机搜索的示例:" ] }, { @@ -312,9 +314,9 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "调度器会对每个试验进行分析,提前结束某些效果较差试验,提前结束又被称为早停(Early Stopping),这样可以节省计算资源,把计算资源留给最有希望的试验们。下面的例子使用了 [ASHA 算法](https://openreview.net/forum?id=S1Y7OOlRZ) {cite}`li2018Massively` 进行调度。\n", + "调度器会分析每个试验的表现,并采用早停(Early Stopping)策略提前结束效果较差的试验,以节省计算资源,将资源分配给最有希望的试验。下面的例子使用了 [ASHA 算法](https://openreview.net/forum?id=S1Y7OOlRZ) {cite}`li2018Massively` 进行调度。\n", "\n", - "前面例子中,没设置 [`ray.tune.TuneConfig`](https://docs.ray.io/en/latest/tune/api/doc/ray.tune.TuneConfig.html),默认只进行了一次试验。现在我们设置 `ray.tune.TuneConfig` 中的 `num_samples`,该参数表示希望进行多少次试验。同时使用 [ray.tune.schedulers.ASHAScheduler](https://docs.ray.io/en/latest/tune/api/doc/ray.tune.schedulers.ASHAScheduler.html) 来做选择,提前结束那些性能较差的试验,把计算结果留给更有希望的试验。`ASHAScheduler` 的参数 `metric` 和 `mode` 表示希望优化的目标,本例的目标是最大化 \"mean_accuracy\"。" + "在前面的例子中,没有设置 [`ray.tune.TuneConfig`](https://docs.ray.io/en/latest/tune/api/doc/ray.tune.TuneConfig.html),默认情况下只进行一次试验。接下来,我们设置 `ray.tune.TuneConfig` 中的 `num_samples` 参数,该参数定义了要运行的试验次数。同时,使用 [ray.tune.schedulers.ASHAScheduler](https://docs.ray.io/en/latest/tune/api/doc/ray.tune.schedulers.ASHAScheduler.html) 来终止性能较差的试验,将资源留给更有希望的试验。`ASHAScheduler` 的参数 `metric` 和 `mode` 定义了优化的指标和方向,本例中目标是最大化 \"mean_accuracy\"。" ] }, { @@ -377,7 +379,7 @@ "cell_type": "markdown", "metadata": {}, "source": [ - "屏幕上会打印出每个试验所选择的超参数值和目标,对于性能较差的试验,简单迭代几轮(`iter`)之后就早停了。我们对这些试验的结果进行可视化:" + "在训练过程中,屏幕上会打印出每个试验所选择的超参数值和目标。对于性能较差的试验,在经过几轮迭代之后,会采取早停策略。我们对这些试验的结果进行可视化:" ] }, { @@ -1805,7 +1807,7 @@ "source": [ "## 案例:基于 PBT 进行图像分类\n", "\n", - "PBT 在训练过程中会对模型权重和超参数都进行调整,因此其训练代码部分必须有更新(加载)模型权重的代码。另外一个区别是训练迭代部分,大部分 PyTorch 训练过程都有 `for epoch in range(...)` 这样显式定义迭代训练的循环,循环一般有终止条件;PBT 训练过程不设置终止条件,当模型指标达到预期或者需要早停,Ray Tune 终止,因此训练迭代处使用 `while True` 一直循环迭代,直到被 Ray Tune 终止。" + "PBT 在训练过程中会对模型权重和超参数都进行调整,因此其训练代码部分必须有更新(加载)模型权重的代码。在传统的 PyTorch 训练中,通常会有 `for epoch in range(...)` 这样的循环结构,并具有明确的终止条件。区别于通常的 PyTorch 训练,Ray Tune 的 PBT 的训练过程不设置显式的终止条件,而是用 `while True` 一直循环迭代,直到模型性能达到预期或执行早停策略,由 Ray Tune 来终止训练。" ] }, { diff --git a/ch-xorbits/xinference.ipynb b/ch-xorbits/xinference.ipynb index d325350..c387263 100644 --- a/ch-xorbits/xinference.ipynb +++ b/ch-xorbits/xinference.ipynb @@ -63,9 +63,9 @@ "\n", "## 使用模型\n", "\n", - "Xinference 可以管理模型部署的整个生命周期:启动模型、使用模型、关闭模型。\n", + "Xinference 提供了模型全生命周期管理,包括模型的启动、运行和关闭。一旦启动Xinference 服务,用户便能启动并调用模型。Xinference 提供了对多种开源模型的支持,用户可以通过网页界面选择并启动模型,Xinference 会在后端自动下载并初始化所需模型。每个模型都配备了网页版对话界面,并提供了与 OpenAI API 兼容的接口。\n", "\n", - "启动 Xinference 服务后,我们就可以启动模型并调用模型,Xinference 集成了大量开源模型,用户可以在网页中选择一个启动,Xinference 会在后台下载并启动这个模型。每个启动的模型既有网页版对话界面,又兼容 OpenAI 的 API。下来我们将展示 2 个案例,介绍如何在本地管理和使用模型,如何基于 OpenAI API 与 Xinference 模型交互,如何基于 LangChain 和向量数据库构建智能系统。" + "接下来,我们将通过两个案例来展示如何在本地环境中使用 Xinference,如何利用 OpenAI API 与 Xinference 进行交互,以及如何结合 LangChain 和向量数据库技术构建智能系统。" ] }, { @@ -132,7 +132,7 @@ "source": [ "Xinference 默认的主机和 IP 地址分别为 127.0.0.1 和 9997。\n", "\n", - "接下来通过以下命令来启动通义千问模型。其中 `size-in-billions` 参数对应使用的参数规模,当前开源的 `qwen-chat` 模型参数规模为18亿(1.8B)、70亿(7B)、140亿(14B)和720亿(72B)。这里我们尝试使用 Qwen-7B 。" + "接下来通过以下命令来启动通义千问模型。其中 `size-in-billions` 参数对应使用的参数规模,第一代通义千问模型(Xinference 中代号为 `qwen-chat`)参数规模为18亿(1.8B)、70亿(7B)、140亿(14B)和720亿(72B)。这里我们尝试使用 7B 模型。" ] }, { @@ -191,7 +191,7 @@ "source": [ "### Completion API\n", "\n", - "使用 `client.completions.create` 进行简单的文本生成。Completion API 用来通过一段文本(Prompt)引导模型进行文本生成。" + "我们可以通过 OpenAI 的 `client.completions.create` 方法进行简单的文本生成。Completion API 用于根据给定的提示(Prompt)引导模型生成文本。" ] }, { @@ -290,13 +290,13 @@ "\n", "接下来我们使用 `client.chat.completions.create` 进行简单的上下文对话。\n", "\n", - "Chat Completion API 可以为与 LLM 进行交互时提供更加结构化的方式。相比于单纯传递一段文字,我们将一个包含多个结构化信息对象的数组发送给 LLM 作为输入。这样做可以让语言模型在继续回复时有一些\"上下文\"或者\"历史\"可参考。\n", + "Chat Completion API 为与大型语言模型(LLM)交互提供了一种更加结构化的方式。与传统的文字输入相比,我们发送包含多个结构化信息对象的数组给 LLM,作为输入。这种输入方式允许大语言模型在生成回复时参考“上下文”或“历史”。\n", "\n", "通常情况下,每条信息都会有一个角色(`role`)和内容(`content`):\n", "\n", "- 系统角色(`system`)用来向语言模型传达开发者定义好的核心指令。\n", - "- 用户角色(`user`)则代表着用户自己输入或者产生出来的信息。\n", - "- 助手角色(`assistant`)则是由语言模型自动生成并回复出来。\n", + "- 用户角色(`user`)代表着用户向语言模型发送的请求。\n", + "- 助手角色(`assistant`)则是由语言模型针对用户请求返回的回复。\n", "\n", "我们先定义结构化的信息:" ] diff --git a/ch-xorbits/xorbits.ipynb b/ch-xorbits/xorbits.ipynb index ad5c7b3..f2b4dff 100644 --- a/ch-xorbits/xorbits.ipynb +++ b/ch-xorbits/xorbits.ipynb @@ -7,11 +7,11 @@ "(sec-xorbits-data)=\n", "# Xorbits Data\n", "\n", - "Xorbits Data 是一款面向数据科学的分布式计算框架,功能上与 Dask 或 Modin 有些类似,用来加速 pandas DataFrame、NumPy。它与 Dask 和 Modin 类似,对大数据进行切分,切分之后使用 pandas 或 NumPy 来执行。它底层使用自己研发的 Actor 编程框架 [Xoscar](https://github.com/xorbitsai/xoscar),而不是依赖 Ray 或者 Dask。\n", + "Xorbits Data 是面向数据科学的分布式计算框架,具有 Dask 和 Modin 相似的功能,用于加速 pandas DataFrame、NumPy。Xorbits Data 通过切分大数据集,然后利用 pandas 或 NumPy 执行操作。其底层采用了自主研发的 Actor 编程框架 [Xoscar](https://github.com/xorbitsai/xoscar),不依赖于 Ray 或者 Dask。\n", "\n", "## Xorbits 集群\n", "\n", - "在进行计算前,Xorbits 需要初始化一个集群,单机上可以直接 `xorbits.init()`,如果有一个集群,可以按照下面的方式启动:先启动一个管理进程(Supervisor),再在不同的计算节点上启动 Worker。\n", + "在进行计算前,Xorbits 需要在多节点环境中初始化集群。对于单机环境,可以直接使用 `xorbits.init()` 进行初始化。对于集群环境,可以按照以下步骤进行配置:首先启动一个管理进程(Supervisor),然后在各个计算节点上启动 Worker 进程。\n", "\n", "```shell\n", "# 先在管理节点启动 Supervisor\n", diff --git a/drawio/ch-data-science/tune-algorithms.drawio b/drawio/ch-data-science/tune-algorithms.drawio index e85c591..5b9a0c9 100644 --- a/drawio/ch-data-science/tune-algorithms.drawio +++ b/drawio/ch-data-science/tune-algorithms.drawio @@ -1,6 +1,6 @@ - + - + @@ -19,7 +19,7 @@ - + diff --git a/drawio/ch-ray-data/dataset-arch.drawio b/drawio/ch-ray-data/dataset-arch.drawio index b0ef3a8..fc62363 100644 --- a/drawio/ch-ray-data/dataset-arch.drawio +++ b/drawio/ch-ray-data/dataset-arch.drawio @@ -1,14 +1,11 @@ - + - + - - - - - + + diff --git a/img/ch-data-science/tune-algorithms.svg b/img/ch-data-science/tune-algorithms.svg index a573d8d..375264a 100644 --- a/img/ch-data-science/tune-algorithms.svg +++ b/img/ch-data-science/tune-algorithms.svg @@ -1,4 +1,4 @@ -
网格搜索
网格搜索
随机搜索
随机搜索
迭代式
迭代式
1
1
8
8
5
5
4
4
6
6
7
7
9
9
2
2
3
3
1
1
2
2
3
3
3
3
4
4
4
4
5
5
5
5
6
6
6
6
7
7
7
7
8
8
8
8
9
9
9
9
1
1
2
2
Text is not SVG - cannot display
\ No newline at end of file +
网格搜索
网格搜索
随机搜索
随机搜索
自适应迭代式
自适应迭代式
1
1
8
8
5
5
4
4
6
6
7
7
9
9
2
2
3
3
1
1
2
2
3
3
3
3
4
4
4
4
5
5
5
5
6
6
6
6
7
7
7
7
8
8
8
8
9
9
9
9
1
1
2
2
Text is not SVG - cannot display
\ No newline at end of file