Ray
What is Ray?
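Ray is an open-source distributed computing framework for scaling AI and Python applications. Originally developed at UC Berkeley's RISELab and now maintained largely by Anyscale (the company founded by Ray's creators), it lets you parallelize Python code across multiple nodes and GPUs with minimal code changes.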
Ray consists of three layers:
- Ray Core — Low-level distributed computing primitives (tasks, actors, objects)
- Ray AI Libraries — High-level ML libraries (Train, Tune, Serve, Data, RLlib) built on Core
- Ray Clusters — Infrastructure for running Ray on Kubernetes, cloud VMs, or bare metal
The key insight: you write normal Python, decorate functions/classes, and Ray handles distribution, scheduling, fault tolerance, and autoscaling.
Core Concepts
Tasks
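A task is a remote function: a stateless Python function that Ray executes asynchronously in a worker process. Decorate it with @ray.remote; calling .remote() returns an ObjectRef (a future) immediately instead of the result.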
```python
import ray

@ray.remote
def square(x):
    return x * x

# Launch 4 tasks in parallel; each .remote() call returns an ObjectRef immediately
futures = [square.remote(i) for i in range(4)]
results = ray.get(futures)  # [0, 1, 4, 9]
```
Tasks can declare resource requirements:
```python
@ray.remote(num_cpus=2, num_gpus=1)
def train_model(data):
    ...
```
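These requirements are logical: Ray uses them to decide where a task may run and how many copies fit on a node at once, but it does not cap what the function actually consumes. The scheduling arithmetic can be sketched in plain Python. `max_concurrent_tasks` is a hypothetical helper, not part of Ray's API, and it ignores memory, custom resources, and everything else the real scheduler considers:

```python
from math import floor

def max_concurrent_tasks(node: dict[str, float], request: dict[str, float]) -> int:
    """Hypothetical helper: how many copies of a task fit on one node,
    counting only the logical resources each copy requests."""
    fits = [
        floor(node.get(name, 0.0) / amount)
        for name, amount in request.items()
        if amount > 0
    ]
    if not fits:
        raise ValueError("request must ask for at least one resource")
    return min(fits)  # the scarcest resource is the bottleneck

node = {"CPU": 8.0, "GPU": 2.0}
print(max_concurrent_tasks(node, {"CPU": 2, "GPU": 1}))          # 2: GPU-bound
print(max_concurrent_tasks(node, {"CPU": 1, "GPU": 0.5}))        # 4: two tasks share each GPU
print(max_concurrent_tasks({"CPU": 8.0}, {"CPU": 2, "GPU": 1}))  # 0: never scheduled here
```

The last case mirrors why a task declaring `num_gpus=1` stays pending on a CPU-only cluster.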
Actors
An actor is a stateful worker — a remote class. Each actor runs on a dedicated worker process and maintains state between method calls.
```python
@ray.remote
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1
        return self.count

counter = Counter.remote()
ray.get(counter.increment.remote())  # 1
ray.get(counter.increment.remote())  # 2
```
Use actors for: model serving, stateful stream processing, parameter servers in distributed training.
Objects
Tasks and actors produce remote objects stored in Ray’s distributed shared-memory object store (one per node). You reference them via ObjectRefs (futures).
```python
ref = square.remote(5)  # returns an ObjectRef, not the result
result = ray.get(ref)   # blocks until ready, returns 25
```
Objects are immutable once created. Ray handles data transfer between nodes automatically.
Placement Groups
Placement groups atomically reserve sets of resources (bundles) across one or more nodes: either every bundle is reserved or none is. Four strategies:
| Strategy | Behavior | Use case |
|---|---|---|
| PACK | Pack bundles onto as few nodes as possible (best effort) | Minimize inter-node communication |
| SPREAD | Spread bundles across distinct nodes (best effort) | Fault tolerance |
| STRICT_PACK | All bundles on one node, or not placed at all | Co-located GPU tasks |
| STRICT_SPREAD | Each bundle on a different node, or not placed at all | One replica per node |
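```python
import ray
from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

pg = placement_group([{"GPU": 1}, {"GPU": 1}], strategy="PACK")
ray.get(pg.ready())  # block until both GPU bundles are reserved

@ray.remote(num_gpus=1)
def gpu_task():
    ...

strategy = PlacementGroupSchedulingStrategy(placement_group=pg)
ray.get([gpu_task.options(scheduling_strategy=strategy).remote() for _ in range(2)])
```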
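To make the four strategies concrete, here is a toy placement function in plain Python. It is a hypothetical model, not Ray's scheduler: it tracks only GPU counts per node, uses a simple greedy rule, and returns `None` when the whole group cannot be placed (a real placement group would instead stay pending until resources appear):

```python
from typing import Optional

def place(nodes: dict[str, int], bundles: list[int], strategy: str) -> Optional[list[str]]:
    """Toy model of placement-group strategies (GPUs only).

    Returns the node chosen for each bundle, or None if the whole group
    cannot be placed (placement is all-or-nothing).
    """
    if strategy == "STRICT_PACK":
        # Every bundle must land on a single node
        total = sum(bundles)
        for node, gpus in nodes.items():
            if gpus >= total:
                return [node] * len(bundles)
        return None

    free = dict(nodes)      # remaining GPUs per node
    used: list[str] = []    # nodes that already hold a bundle
    placement: list[str] = []
    for need in bundles:
        fits = [n for n in free if free[n] >= need]
        if strategy == "PACK":
            # Reuse an occupied node if possible, else open the roomiest one
            candidates = [n for n in used if n in fits] or sorted(fits, key=lambda n: -free[n])
        elif strategy in ("SPREAD", "STRICT_SPREAD"):
            # Prefer nodes without a bundle; only SPREAD may fall back to reuse
            candidates = [n for n in fits if n not in used]
            if not candidates and strategy == "SPREAD":
                candidates = fits
        else:
            raise ValueError(f"unknown strategy: {strategy!r}")
        if not candidates:
            return None
        node = candidates[0]
        free[node] -= need
        if node not in used:
            used.append(node)
        placement.append(node)
    return placement

cluster = {"node-a": 2, "node-b": 2, "node-c": 2}
print(place(cluster, [1, 1, 1], "PACK"))             # ['node-a', 'node-a', 'node-b']
print(place(cluster, [1, 1, 1], "STRICT_PACK"))      # None: no node has 3 free GPUs
print(place(cluster, [1, 1, 1], "SPREAD"))           # ['node-a', 'node-b', 'node-c']
print(place(cluster, [1, 1, 1, 1], "STRICT_SPREAD")) # None: only 3 nodes
```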
Architecture
Ray Cluster
```mermaid
graph TD
    subgraph Head["Head Node"]
        GCS["GCS<br/><i>Global Control Service</i>"]
        Auto["Autoscaler"]
        Driver["Driver<br/><i>runs user script</i>"]
        Sched1["Raylet<br/><i>scheduler + object store</i>"]
    end
    subgraph W1["Worker Node 1"]
        Sched2["Raylet"]
        T1["Worker process"]
        T2["Worker process"]
    end
    subgraph W2["Worker Node 2"]
        Sched3["Raylet"]
        T3["Worker process"]
        T4["Worker process"]
    end
    Driver -->|"submits tasks"| Sched1
    Sched1 -->|"spillback scheduling"| Sched2
    Sched1 -->|"spillback scheduling"| Sched3
    Sched2 --> T1
    Sched2 --> T2
    Sched3 --> T3
    Sched3 --> T4
    GCS -->|"cluster metadata"| Sched1
    GCS -->|"cluster metadata"| Sched2
    GCS -->|"cluster metadata"| Sched3
    Auto -->|"scales nodes"| W1
    Auto -->|"scales nodes"| W2
```
| Component | Role |
|---|---|
| Head Node | Runs GCS, autoscaler, and driver. Can also run tasks (not recommended at scale). |
| Worker Node | Runs user tasks and actors. |
| GCS (Global Control Service) | Cluster metadata store — tracks nodes, actors, placement groups. |
| Raylet | Per-node daemon. Local scheduler + object store manager. |
| Autoscaler | Monitors resource demands, adds/removes worker nodes. Reacts to task/actor resource requests, not CPU utilization. |
| Object Store | Per-node shared-memory store (originally derived from Apache Arrow's Plasma). Holds large task outputs and ray.put objects; spills to disk when full. |
Autoscaling
The autoscaler runs on the head node and:
- Scales up when pending tasks/actors request resources the cluster can’t satisfy
- Scales down worker nodes that have been idle (no running tasks or actors) for a configurable timeout
- Reacts to resource requests, not physical utilization
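Because the autoscaler reacts to resource requests, its scale-up decision is essentially bin-packing of pending demands onto existing and new nodes. A simplified sketch in plain Python, assuming a single worker node type and first-fit packing; `nodes_to_add` is hypothetical and ignores node types, min/max worker limits, idle timeouts, and the other details the real autoscaler handles:

```python
def _fits(capacity: dict[str, float], request: dict[str, float]) -> bool:
    return all(capacity.get(k, 0.0) >= v for k, v in request.items())

def nodes_to_add(pending: list[dict[str, float]],
                 free_per_node: list[dict[str, float]],
                 node_shape: dict[str, float]) -> int:
    """Hypothetical: how many new nodes of `node_shape` are needed so every
    pending request fits, using first-fit over existing, then new, nodes."""
    free = [dict(n) for n in free_per_node]  # copy existing spare capacity
    added = 0
    for request in pending:
        if not _fits(node_shape, request):
            raise ValueError(f"request {request} can never fit on {node_shape}")
        target = next((cap for cap in free if _fits(cap, request)), None)
        if target is None:
            target = dict(node_shape)  # launch a new node
            free.append(target)
            added += 1
        for k, v in request.items():
            target[k] = target.get(k, 0.0) - v
    return added

# Five pending 1-GPU tasks, one spare GPU in the cluster, 2-GPU node type
print(nodes_to_add([{"GPU": 1}] * 5, [{"CPU": 4, "GPU": 1}], {"CPU": 8, "GPU": 2}))  # 2
# A fully busy cluster with no pending requests does not scale up
print(nodes_to_add([], [{"CPU": 0, "GPU": 0}], {"CPU": 8, "GPU": 2}))  # 0
```

The second call shows the point of the bullet above: zero free capacity alone triggers nothing; only unsatisfiable requests do.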
Ray AI Libraries
| Library | Purpose | Example |
|---|---|---|
| Ray Data | Distributed data loading and preprocessing | ray.data.read_parquet("s3://...") |
| Ray Train | Distributed model training (PyTorch, TF, XGBoost) | Multi-node DDP with fault tolerance |
| Ray Tune | Hyperparameter tuning | Grid/random/Bayesian search at scale |
| Ray Serve | Model serving with online inference | Deploy models as REST APIs |
| RLlib | Reinforcement learning | Distributed RL training |
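```python
# Example: distributed PyTorch training with Ray Train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func():
    # Placeholder for your per-worker training loop. Ray Train runs it on every
    # worker after setting up the torch.distributed process group.
    ...

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=4,  # four training worker processes
        use_gpu=True,   # one GPU per worker
    ),
)
result = trainer.fit()
```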
Ray on Kubernetes (KubeRay)
KubeRay is the Kubernetes operator for Ray. It provides three CRDs:
```mermaid
graph LR
    subgraph KubeRay["KubeRay Operator"]
        RC["RayCluster<br/><i>long-lived cluster</i>"]
        RJ["RayJob<br/><i>cluster + job, auto-cleanup</i>"]
        RS["RayService<br/><i>cluster + Serve, zero-downtime upgrades</i>"]
    end
    RC --> Pods["Head + Worker Pods"]
    RJ --> Pods
    RS --> Pods
```
| CRD | What it does | When to use |
|---|---|---|
| RayCluster | Creates and manages a long-lived Ray cluster | Development, interactive use, multiple jobs on same cluster |
| RayJob | Creates a RayCluster, submits a job, optionally deletes cluster on completion | Batch jobs, CI/CD, cost optimization (auto-delete) |
| RayService | RayCluster + Ray Serve deployment with zero-downtime upgrades | Model serving in production |
RayJob Example
```yaml
apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: training-job
spec:
  entrypoint: python train.py
  shutdownAfterJobFinishes: true  # delete cluster when done
  rayClusterSpec:
    enableInTreeAutoscaling: true  # required for minReplicas/maxReplicas to take effect
    headGroupSpec:
      rayStartParams:
        dashboard-host: "0.0.0.0"
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray:2.40.0-gpu
              resources:
                requests:
                  cpu: "2"
                  memory: 8Gi
    workerGroupSpecs:
      - replicas: 4
        minReplicas: 1
        maxReplicas: 8
        groupName: gpu-workers
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker
                image: rayproject/ray:2.40.0-gpu
                resources:
                  requests:
                    cpu: "4"
                    memory: 16Gi
                    nvidia.com/gpu: "1"
                  limits:
                    nvidia.com/gpu: "1"  # Kubernetes requires a limit for GPUs
```
Which CRD to Choose
```mermaid
graph TD
    Q1{"Model serving?"} -->|Yes| RS["RayService"]
    Q1 -->|No| Q2{"Need auto-cleanup<br/>after job?"}
    Q2 -->|Yes| RJ["RayJob"]
    Q2 -->|No| Q3{"Interactive dev<br/>or multi-job?"}
    Q3 -->|Yes| RC["RayCluster"]
    Q3 -->|No| RJ
```
Key Features
| Feature | Description |
|---|---|
| Zero-change scaling | Same code runs locally and on 100+ nodes |
| Heterogeneous resources | Mix CPUs, GPUs, TPUs, custom resources in one cluster |
| Autoscaling | Worker nodes scale based on workload demand |
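| Fault tolerance | Tasks are retried on worker failure by default, actors can be restarted (max_restarts), and lost objects can be reconstructed from lineage |
| Shared memory object store | Workers on the same node read large objects (e.g. NumPy arrays) from shared memory without copying |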
| Placement groups | Gang scheduling and co-location control |
| Runtime environments | Per-task/actor Python dependency isolation |
| Dashboard | Web UI for monitoring jobs, actors, resources, logs |
| Multi-tenancy | Multiple jobs can share one cluster; namespaces isolate named actors between jobs, but resource isolation between tenants is limited |
Practical Example: Batch Inference
```python
import ray

ray.init()

@ray.remote(num_gpus=1)
def predict_batch(model_path: str, batch: list[str]) -> list[float]:
    # load_model is a placeholder for your framework's loader;
    # the model is reloaded in every task
    model = load_model(model_path)
    return [model.predict(item) for item in batch]

# Fan out across all available GPUs (one task per GPU at a time)
data = [...]  # 10,000 items
batches = [data[i:i + 100] for i in range(0, len(data), 100)]
futures = [predict_batch.remote("s3://models/v1.pt", b) for b in batches]
results = ray.get(futures)  # waits for all, returns a list of lists
```
When to Use / When Not to Use
Good fit:
- Distributed ML training (multi-node, multi-GPU)
- Batch inference at scale
- Hyperparameter tuning (many parallel trials)
- Model serving with complex pre/post-processing
- Parallelizing any CPU/GPU-bound Python workload
- Reinforcement learning
Not the right tool for:
- Simple single-machine scripts (overhead not worth it)
- Streaming / event-driven pipelines (use Flink, Kafka Streams)
- Workflow orchestration / DAG management (use Flyte, Airflow — they can call Ray)
- Cluster-level quota/queue management (use Kueue, Volcano)
Further Resources
- Official docs
- GitHub
- KubeRay operator
- Ray Core key concepts
- Ray on Kubernetes
- Anyscale — managed Ray platform