Skip to content

Commit

Permalink
dask ml distributed (#43)
Browse files Browse the repository at this point in the history
  • Loading branch information
luweizheng authored May 5, 2024
1 parent 6652b8a commit 99fb5b7
Show file tree
Hide file tree
Showing 4 changed files with 3,593 additions and 3 deletions.
3 changes: 3 additions & 0 deletions _toc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ subtrees:
- file: ch-dask-dataframe/indexing
- file: ch-dask-dataframe/map-partitions
- file: ch-dask-dataframe/shuffle
- file: ch-dask-ml/index
entries:
- file: ch-dask-ml/distributed-training
- file: ch-ray-core/index
entries:
- file: ch-ray-core/ray-intro
Expand Down
6 changes: 3 additions & 3 deletions ch-dask-dataframe/shuffle.ipynb
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@
"\n",
"## Shuffle 实现机制\n",
"\n",
"{numref}`sec-dask-task-graph` 介绍了 Dask 主要基于 Task Graph 构建, Dask 的 Task Graph 是一个有向无环图。有向边表示下游 Partition 的输入依赖上游 Partition 的输出,任何数据移动都会在 Task Graph 上生成一条有向边。很多计算任务的 Shuffle 有大量的数据移动,有的场景下,所有数据都会打散,也意味着上游会有多条指向下游的边。这种基于 Task Graph 的 Shuffle 会使得 Task Graph 非常庞大,Task Graph 过大会使得 Dask Scheduler 的负载过重,进一步导致计算极其缓慢。如 {numref}`fig-dask-shuffle` 左侧所示,`tasks` 是基于 Task Graph 的机制,上游和下游之间建立了有向边,如果有中间层(通常因为上游流入的数据太大,需要将数据进一步切分成多个 Partition),那么中间层还会进一步增加 Task Graph 的复杂程度。\n",
"{numref}`sec-dask-task-graph` 介绍了 Dask 主要基于 Task Graph 构建, Dask 的 Task Graph 是一个有向无环图。有向边表示下游 Partition 的输入依赖上游 Partition 的输出,任何数据移动都会在 Task Graph 上生成一条有向边。很多计算任务的 Shuffle 有大量的数据移动,有的场景下,所有数据都会打散,也意味着上游会有多条指向下游的边。这种基于 Task Graph 的 Shuffle 会使得 Task Graph 非常庞大,Task Graph 过大会使得 Dask Scheduler 的负载过重,进一步导致计算极其缓慢。如 {numref}`fig-shuffle-tasks-p2p` 左侧所示,`tasks` 是基于 Task Graph 的机制,上游和下游之间建立了有向边,如果有中间层(通常因为上游流入的数据太大,需要将数据进一步切分成多个 Partition),那么中间层还会进一步增加 Task Graph 的复杂程度。\n",
"\n",
"为解决 Task Graph 过大的问题,Dask 设计了一种点对点(Peer-to-peer)的 Shuffle 机制。如 {numref}`fig-dask-shuffle` 右侧所示,`p2p` 在 Task Graph 中引入了一个虚拟的障碍(Barrier)节点。Barrier 并不是一个真正的 Task,引入 Barrier 节点可以使 Task Graph 复杂度显著下降。\n",
"为解决 Task Graph 过大的问题,Dask 设计了一种点对点(Peer-to-peer)的 Shuffle 机制。如 {numref}`fig-shuffle-tasks-p2p` 右侧所示,`p2p` 在 Task Graph 中引入了一个虚拟的障碍(Barrier)节点。Barrier 并不是一个真正的 Task,引入 Barrier 节点可以使 Task Graph 复杂度显著下降。\n",
"\n",
"```{figure} ../img/ch-dask-dataframe/shuffle-tasks-p2p.png\n",
"---\n",
Expand All @@ -26,7 +26,7 @@
"目前,Dask 提供了两类 Shuffle 实现策略:单机和分布式。\n",
"\n",
"* 单机。如果数据大小超出了内存空间,可以将中间数据写到磁盘上。单机场景默认使用这种策略。\n",
"* 分布式。如 {numref}`fig-dask-shuffle` 所示,分布式场景提供了两种 Shuffle 策略,`tasks` 和 `p2p`。`tasks` 是基于 Task Graph 的 Shuffle 实现,很多场景效率比较低,会遇到刚提到的 Task Graph 过大的问题。`p2p` 基于点对点的 Shuffle 实现,Task Graph 的复杂性显著降低,性能也显著提升。Dask 会优先选择 `p2p`。\n",
"* 分布式。如 {numref}`fig-shuffle-tasks-p2p` 所示,分布式场景提供了两种 Shuffle 策略,`tasks` 和 `p2p`。`tasks` 是基于 Task Graph 的 Shuffle 实现,很多场景效率比较低,会遇到刚提到的 Task Graph 过大的问题。`p2p` 基于点对点的 Shuffle 实现,Task Graph 的复杂性显著降低,性能也显著提升。Dask 会优先选择 `p2p`。\n",
"\n",
"`dask.config.set({\"dataframe.shuffle.method\": \"p2p\"})` 对当前 Python 脚本的所有计算都使用 `p2p` 方式进行 Shuffle。也可以针对某个算子设置 Shuffle 策略,比如 `ddf.merge(shuffle_method=\"p2p\")`。\n",
"\n",
Expand Down
Loading

0 comments on commit 99fb5b7

Please sign in to comment.