> [!abstract] 概要 > 次世代 AI アプリケーションは環境と継続的に対話し、その相互作用から学習し続ける。これらのアプリケーションは性能と柔軟性の両面で新たな高い要件を課す。本論文ではこれらの要件を検討し、それに応えるための分散システムである Ray を提案する。Ray はタスク並列とアクターベースの計算を統一インタフェースで表現でき、単一の動的実行エンジンによって支えられている。性能要件を満たすため、Ray は分散スケジューラと、システムの制御状態を管理する分散かつ耐障害性を持つストアを採用する。実験では毎秒 180 万件超のタスク処理能力へのスケーリングと、いくつかの挑戦的な強化学習アプリケーションにおける既存特化システムを上回る性能を実証する。 ## 論文情報 | 項目 | 内容 | |------|------| | 著者 | Philipp Moritz\*, Robert Nishihara\*, Stephanie Wang, Alexey Tumanov, Richard Liaw, Eric Liang, Melih Elibol, Zongheng Yang, William Paul, Michael I. Jordan, Ion Stoica | | 所属 | University of California, Berkeley | | 会議 | OSDI 2018(第 13 回 USENIX Symposium on Operating Systems Design and Implementation) | | 開催 | 2018 年 10 月 8〜10 日、カールズバッド、CA | | \* | equal contribution | ## 概要 強化学習(RL)アプリケーションは訓練(training)・サービング(serving)・シミュレーション(simulation)という 3 つのワークロードを**密結合**で必要とする。既存の分散フレームワーク(MapReduce・Spark・TensorFlow・Clipper など)はこの 3 つを同時に効率よく扱えない。Ray はこの課題を解決する汎用クラスタ計算フレームワークである。 Ray の設計上の 3 大貢献は次のとおりである。 1. **統一プログラミングモデル** — タスク並列(ステートレス)とアクター(ステートフル)の両抽象を同一 API 上に統合 2. **ボトムアップ分散スケジューラ** — ローカルスケジューラがまず判断し、過負荷時のみグローバルスケジューラへ転送 3. **Global Control Store(GCS)** — Redis シャード + チェーン複製で全制御状態を集中保持しつつ水平スケール ## 問題設定 RL ベースアプリケーションには以下の要件が共存する。 - **細粒度・異質計算**: タスク所要時間がミリ秒〜時間にわたる - **柔軟な計算モデル**: ステートレス(シミュレーション)とステートフル(訓練・パラメータサーバ)の混在 - **動的実行**: 前のタスク結果が次のタスクを決定する - **高スループット**: 毎秒 100 万件超のタスク処理(200 ノード 32 コア × 5ms タスクで 1.28M/s の試算) MapReduce/Spark は BSP モデルで細粒度シミュレーションに不向き、TensorFlow/MXNet は静的 DAG 前提でシミュレーション/サービングと密結合できない、Clipper/TF Serving はサービング専用で訓練・シミュレーションを扱えない。 ## 提案手法 ### プログラミングモデル Ray は動的タスクグラフ計算モデルを採用する。計算グラフはデータオブジェクト・タスク・エッジ(データ辺・制御辺・ステートフル辺)から構成される。 **タスク** はステートレスな遠隔関数実行を表す。`f.remote(args)` でフューチャを返し、`ray.get(futures)` でブロック取得する。べき等性があり、障害時は再実行で回復できる。 **アクター** はステートフルな計算を表す。`Class.remote()` でリモートにインスタンス化し、メソッド呼び出しを直列実行する。パラメータサーバ・GPU バックド反復計算・サードパーティシミュレータのラッパーに適している。 ステートフル辺(stateful edge)は同一アクター上の連続したメソッド呼び出し間の依存を表し、ステートレスタスクグラフにアクターを埋め込みながら血統追跡を可能にする。 **主要 API**: | 呼び出し形式 | 意味 | |---|---| | `futures = f.remote(args)` | 遠隔関数をノンブロッキング実行 | | `objects = ray.get(futures)` | フューチャのブロック取得 | | `ready = ray.wait(futures, k, timeout)` | k 件完了待ち(タイムアウト付き) | | `actor = Class.remote(args)` | アクターをリモート起動 | | `futures = actor.method.remote(args)` | アクターメソッドをノンブロッキング呼び出し | ### アーキテクチャ #### アプリケーション層 3 種のプロセスから構成される。 - **ドライバ**: ユーザープログラムを実行するプロセス - **ワーカー**: ドライバまたは別ワーカーから呼び出されるステートレスタスク実行プロセス - **アクター**: ステートフルメソッドを直列実行するプロセス #### システム層 **Global Control Store(GCS)**: システム全体の制御状態を管理する中心コンポーネント。Redis キーバリューストア + シャーディング + チェーン複製(OSDI04 の chain replication)で実装する。オブジェクトテーブル・タスクテーブル・関数テーブル・イベントログを格納する。全コンポーネントをステートレスにし、障害時の再起動とスケールアウトを容易にする。 **ボトムアップ分散スケジューラ**: ローカルスケジューラが先に判断し、ノード過負荷・GPU 不足時のみグローバルスケジューラへ転送する 2 段構成。グローバルスケジューラは推定待機時間(キューサイズ×平均実行時間 + リモート入力転送時間)が最小のノードを選択する。グローバルスケジューラがボトルネックになれば同一 GCS を参照する複数レプリカを起動できる。 **インメモリ分散オブジェクトストア**: 全タスクの入出力をノード内共有メモリ(Apache Arrow)で管理し、同一ノードのタスク間はゼロコピーでデータを渡す。入力がローカルにない場合はリモートからレプリケーションし、実行後はローカルに書き込む。LRU ポリシーで必要に応じてディスクへ退避する。 ### 耐障害性 **タスクの障害回復**: GCS の血統情報を用いて失われたオブジェクトを再計算する。ノード障害時もスループットを維持し、再参加後は元のスループットへ回復する。 **アクターの障害回復**: ステートフル辺を依存グラフに明示的に含めることで、タスク再実行と同一機構でアクターを再構成する。ユーザー定義チェックポイントで再実行量を制限する(チェックポイントなし 1 万回の再実行 → チェックポイントあり 500 回に削減)。 **GCS の耐障害性**: チェーン複製でメンバー障害が発生しても 30ms 以下の最大遅延で回復する。定期的な GCS フラッシュにより長期実行アプリケーションのメモリ消費を制限できる。 ### 実装 - 総コード量: 約 40K 行 (C++ 72%、Python 28%) - GCS は Redis を各シャードに 1 インスタンス使用、単一キー操作のみ - ローカル・グローバルスケジューラはいずれもイベント駆動シングルスレッドプロセス - 大きなオブジェクトの転送は複数 TCP 接続にストライピング ## 新規性 | 観点 | 既存手法の限界 | Rayの解決策 | |---|---|---| | 計算モデル | CIEL はタスクのみ、Orleans/Akka はアクターのみ | タスクとアクターを単一動的タスクグラフ上に統合 | | スケジューラ | Spark/CIEL は中央集権スケジューラ(数十 ms オーバーヘッド)。Sparrow は独立判断でデータ局所性なし | ボトムアップ: ローカル優先 + グローバル転送 + 完全分散化 | | 制御状態 | 既存フレームワークはスケジューラに状態を同居させボトルネック化 | GCS に状態を分離することでスケジューラをステートレスに | | 耐障害性 | Orleans はアクターのチェックポイントを手動実装が必要。Dask は血統耐障害性なし | ステートフル辺を血統グラフに含めることで統一的な再実行 | ## 実験設定 - クラウドプラットフォーム: Amazon Web Services - CPU インスタンス: m4.16xlarge、GPU インスタンス: p3.16xlarge - スケーラビリティ実験: 空タスクワークロード、ノード数 10〜100 で評価 - 訓練比較: Horovod + TF / 分散 TF と ResNet-101 で比較(p3.16xl、25Gbps Ethernet) - サービング比較: Clipper vs Ray アクター(残差ネット + 小型全結合ネット) - シミュレーション比較: OpenAI Gym Pendulum-v0、MPI BSP vs Ray 非同期タスク - RL 比較: ES アルゴリズム vs 参照実装(Redis メッセージング)、PPO vs OpenMPI 最適化実装 ## 実験結果 ### マイクロベンチマーク - **スケーラビリティ**: 60 ノードで 100 万 tasks/s を超え、100 ノードで **1.8 million tasks/s** を達成。ほぼ線形スケール。100 ノードで 1 億タスクを 54 秒で処理 - **局所性スケジューリング**: 局所性認識なしの場合、10〜100MB 入力で 1〜2 桁のレイテンシ増加。局所性認識により入力サイズによらず安定 - **オブジェクトストア**: 単一クライアントから 15 GB/s を超える書き込みスループット、18K IOPS(小オブジェクト) - **GCS 耐障害性**: チェーン再構成中の最大遅延は **30ms 未満** - **Allreduce**: 16 ノード 100MB で約 200ms、1GB で約 1200ms。OpenMPI v1.10 を **100MB で 1.5×、1GB で 2× 上回る** ### 個別ワークロード - **分散訓練**: Horovod と同等、分散 TF の 10% 以内の性能。勾配計算・転送・集約のパイプライン化を Ray の汎用 API で実現 - **サービング**: 小型全結合ネットでは Clipper(REST 経由)比 **約 1.4× 高スループット**(6200 vs 4400 states/sec)。100KB 入力の残差ネットでは 6900 vs 290 と **1 桁以上の差** - **シミュレーション**: 256 CPU で Ray 非同期タスクが MPI BSP の **1.87×** のタイムステップ/秒を達成 ### RL アプリケーション - **Evolution Strategies**: Ray 実装が 8192 コアにスケール。中央値 3.7 分で解決(公開済み最良結果の 2 倍高速)。参照実装は 2048 コアを超えると失敗 - **PPO**: Ray 実装が最適化 MPI 実装を全実験で上回り、GPU 使用量は最大 1/8 以下。PPO のコストを **4.5× 削減**。フォールトトレランス + リソース対応スケジューリングでスポットインスタンス利用によりコスト **18× 削減** ## 考察 **API 設計の最小性**: 最初はタスク抽象のみで開始し、ヘテロジニアス期間の対応に `wait()`、サードパーティシミュレータとステートフル計算の効率化にアクターを後から追加した。 **耐障害性の必要性**: AI アルゴリズムの確率的性質から「障害を無視できる」という意見に対し、(1) 耐障害性があるとアプリケーションが書きやすく推論しやすい、(2) 決定論的再実行でデバッグが大幅に簡単になる、という 2 点から必要性を確認。スポットインスタンス利用でコスト削減に直結。 **GCS の役割**: デバッグ時にシステム全体の状態をクエリできる基盤としても機能。タイムライン可視化ツールのバックエンドになる。 **制限事項**: 計算グラフを事前に知らないためスケジューリング最適化に限界。タスクごとの血統保存はガベージコレクション機構が必要(開発中)。 ## 強み / 弱点・課題 ### 強み - タスクとアクターを単一フレームワークに統合し、RL の訓練・サービング・シミュレーションを 1 アプリケーション内で完結できる - ボトムアップスケジューラにより中央集権スケジューラのミリ秒オーバーヘッドを回避、Allreduce など通信集約型プリミティブを汎用 API で実装可能 - GCS でコンポーネントをステートレス化し、耐障害性・デバッグ・独立スケールアウトを一元実現 - 血統ベース耐障害性でチェックポイントコストを最小化しつつ透明な回復を提供 ### 弱点・課題 - 計算グラフの事前知識なしでのスケジューリング最適化に限界がある - タスクごとの血統保存によるメモリ増大には GC 機構が必要(当時開発中) - 分散オブジェクトは各オブジェクトが単一ノードに収まる制約あり(行列等は複数フューチャのコレクションとして実装が必要) - 汎用性ゆえ特化最適化が難しく、Spark の straggler 緩和やクエリ最適化等のリッチな API は欠く