Ray

What is Ray?

Ray is an open-source distributed computing framework for scaling AI and Python applications. Originally developed at UC Berkeley's RISELab and now maintained primarily by Anyscale, it lets you parallelize Python code across multiple nodes and GPUs with minimal code changes.

Ray has three layers:

  1. Ray Core — Low-level distributed computing primitives (tasks, actors, objects)
  2. Ray AI Libraries — High-level ML libraries (Train, Tune, Serve, Data, RLlib) built on Core
  3. Ray Clusters — Infrastructure for running Ray on Kubernetes, cloud VMs, or bare metal

The key insight: you write normal Python, decorate functions/classes, and Ray handles distribution, scheduling, fault tolerance, and autoscaling.


Core Concepts

Tasks

A task is a remote function: a regular Python function that Ray runs asynchronously on a worker process. Decorate it with @ray.remote and call it with .remote().

import ray

@ray.remote
def square(x):
    return x * x

# Launch 4 tasks in parallel
futures = [square.remote(i) for i in range(4)]
results = ray.get(futures)  # [0, 1, 4, 9]

Tasks can declare resource requirements:

@ray.remote(num_cpus=2, num_gpus=1)
def train_model(data):
    ...

Actors

An actor is a stateful worker — a remote class. Each actor runs on a dedicated worker process and maintains state between method calls.

@ray.remote
class Counter:
    def __init__(self):
        self.count = 0

    def increment(self):
        self.count += 1
        return self.count

counter = Counter.remote()
ray.get(counter.increment.remote())  # 1
ray.get(counter.increment.remote())  # 2

Use actors for: model serving, stateful stream processing, parameter servers in distributed training.

Objects

Tasks and actors produce remote objects stored in Ray’s distributed shared-memory object store (one per node). You reference them via ObjectRefs (futures).

ref = square.remote(5)   # returns ObjectRef, not the result
result = ray.get(ref)     # blocks until ready, returns 25

Objects are immutable once created. Ray handles data transfer between nodes automatically.

Placement Groups

Placement groups atomically reserve resources across multiple nodes. Four strategies:

| Strategy | Behavior | Use case |
|---|---|---|
| PACK | Pack resources onto fewest nodes | Minimize inter-node communication |
| SPREAD | Spread across different nodes | Fault tolerance |
| STRICT_PACK | All on same node (fail if can’t) | Co-located GPU tasks |
| STRICT_SPREAD | One per node (fail if can’t) | One replica per node |

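from ray.util.placement_group import placement_group
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy

# Reserve two 1-GPU bundles, packed onto as few nodes as possible
pg = placement_group([{"GPU": 1}, {"GPU": 1}], strategy="PACK")
ray.get(pg.ready())  # block until the resources are reserved

@ray.remote(num_gpus=1)
def gpu_task():
    ...

# Run the task inside the reserved placement group
gpu_task.options(
    scheduling_strategy=PlacementGroupSchedulingStrategy(placement_group=pg)
).remote()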
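
To make the difference between the four strategies concrete, here is a toy model in plain Python. It does not call Ray and is not how Ray's scheduler is implemented: `assign_bundles`, the node dictionary, and the greedy name-ordered choice are all assumptions made for illustration, and each bundle is reduced to a single GPU count. Because it is greedy, it can miss a feasible placement when bundle sizes differ.

```python
from typing import Optional


def assign_bundles(
    bundles: list[int], free_gpus: dict[str, int], strategy: str
) -> Optional[list[str]]:
    """Toy model: return one node name per bundle, or None if the group cannot fit.

    Each bundle is just a GPU count. Hypothetical helper, not part of Ray.
    """
    if strategy not in {"PACK", "SPREAD", "STRICT_PACK", "STRICT_SPREAD"}:
        raise ValueError(f"unknown strategy: {strategy}")

    if strategy == "STRICT_PACK":
        # All bundles must share one node, so that node needs the full total.
        hosts = sorted(n for n, g in free_gpus.items() if g >= sum(bundles))
        return [hosts[0]] * len(bundles) if hosts else None

    free = dict(free_gpus)  # work on a copy so the caller's dict is untouched
    placement: list[str] = []
    for need in bundles:
        fits = sorted(n for n in free if free[n] >= need)
        fresh = [n for n in fits if n not in placement]
        reused = [n for n in fits if n in placement]
        if strategy == "STRICT_SPREAD":
            candidates = fresh            # a node may host at most one bundle
        elif strategy == "SPREAD":
            candidates = fresh or reused  # prefer new nodes, but allow sharing
        else:  # PACK
            candidates = reused or fresh  # prefer nodes already in use
        if not candidates:
            return None  # all-or-nothing: one unplaceable bundle fails the group
        free[candidates[0]] -= need
        placement.append(candidates[0])
    return placement


nodes = {"a": 2, "b": 2}
print(assign_bundles([1, 1], nodes, "PACK"))              # ['a', 'a']
print(assign_bundles([1, 1], nodes, "SPREAD"))            # ['a', 'b']
print(assign_bundles([2, 2], nodes, "STRICT_PACK"))       # None
print(assign_bundles([1, 1, 1], nodes, "STRICT_SPREAD"))  # None
```

The non-strict strategies fall back to whatever fits (for example, PACK with two 2-GPU bundles returns `['a', 'b']` here), while the strict ones return `None` and reserve nothing.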

Architecture

Ray Cluster

graph TD
    subgraph Head["Head Node"]
        GCS["GCS<br/><i>Global Control Service</i>"]
        Auto["Autoscaler"]
        Driver["Driver<br/><i>runs user script</i>"]
        Sched1["Raylet<br/><i>scheduler + object store</i>"]
    end

    subgraph W1["Worker Node 1"]
        Sched2["Raylet"]
        T1["Worker process"]
        T2["Worker process"]
    end

    subgraph W2["Worker Node 2"]
        Sched3["Raylet"]
        T3["Worker process"]
        T4["Worker process"]
    end

    Driver -->|"submits tasks"| Sched1
    GCS --> Sched1
    GCS --> Sched2
    GCS --> Sched3
    Auto -->|"scales nodes"| W1
    Auto -->|"scales nodes"| W2

| Component | Role |
|---|---|
| Head Node | Runs GCS, autoscaler, and driver. Can also run tasks (not recommended at scale). |
| Worker Node | Runs user tasks and actors. |
| GCS (Global Control Service) | Cluster metadata store: tracks nodes, actors, placement groups. |
| Raylet | Per-node daemon. Local scheduler + object store manager. |
| Autoscaler | Monitors resource demands, adds/removes worker nodes. Reacts to task/actor resource requests, not CPU utilization. |
| Object Store | Per-node shared memory (Apache Arrow/Plasma). Stores task outputs. |

Autoscaling

The autoscaler runs on the head node and:

  • Scales up when pending tasks/actors request resources the cluster can’t satisfy
  • Scales down when worker nodes are idle (no running tasks/actors)
  • Reacts to resource requests, not physical utilization
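
The rules above can be sketched as a small decision function. This is a simplified model under stated assumptions, not the Ray autoscaler's actual algorithm: `Node`, `plan_scaling`, the CPU-only view, and the first-fit packing are hypothetical choices made for illustration. The point it demonstrates is that the inputs are logical resource requests and reservations; measured CPU load never appears.

```python
from dataclasses import dataclass


@dataclass
class Node:
    name: str
    total_cpus: int
    reserved_cpus: int  # CPUs claimed by running tasks/actors, not measured load


def plan_scaling(nodes, pending_cpu_requests, cpus_per_new_node):
    """Toy CPU-only model: return (nodes_to_add, idle_node_names)."""
    free = [n.total_cpus - n.reserved_cpus for n in nodes]
    existing = len(free)
    for need in sorted(pending_cpu_requests, reverse=True):
        for i, avail in enumerate(free):
            if avail >= need:
                free[i] -= need  # fits on a current or already-planned node
                break
        else:
            if need <= cpus_per_new_node:
                free.append(cpus_per_new_node - need)  # plan one more node
            # Requests larger than a whole new node are skipped as infeasible.
    nodes_to_add = len(free) - existing
    # Scale-down candidates: nothing running and nothing newly assigned.
    idle = [
        n.name
        for n, avail in zip(nodes, free)
        if n.reserved_cpus == 0 and avail == n.total_cpus
    ]
    return nodes_to_add, idle


cluster = [Node("w1", 4, 4), Node("w2", 4, 0)]
print(plan_scaling(cluster, [2, 2, 4], cpus_per_new_node=4))  # (1, [])
print(plan_scaling(cluster, [], cpus_per_new_node=4))         # (0, ['w2'])
```

In the first call the 4-CPU request fills the free node and the two 2-CPU requests share one new node. In the second call nothing is pending, so the unreserved node becomes a scale-down candidate.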

Ray AI Libraries

| Library | Purpose | Example |
|---|---|---|
| Ray Data | Distributed data loading and preprocessing | ray.data.read_parquet("s3://...") |
| Ray Train | Distributed model training (PyTorch, TF, XGBoost) | Multi-node DDP with fault tolerance |
| Ray Tune | Hyperparameter tuning | Grid/random/Bayesian search at scale |
| Ray Serve | Model serving with online inference | Deploy models as REST APIs |
| RLlib | Reinforcement learning | Distributed RL training |

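# Example: distributed PyTorch training with Ray Train
from ray.train import ScalingConfig
from ray.train.torch import TorchTrainer

def train_func():
    # Placeholder: your per-worker PyTorch training loop goes here
    ...

trainer = TorchTrainer(
    train_func,
    scaling_config=ScalingConfig(
        num_workers=4,  # four training workers
        use_gpu=True,   # each worker reserves a GPU
    ),
)
result = trainer.fit()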

Ray on Kubernetes (KubeRay)

KubeRay is the Kubernetes operator for Ray. It provides three CRDs:

graph LR
    subgraph KubeRay["KubeRay Operator"]
        RC["RayCluster<br/><i>long-lived cluster</i>"]
        RJ["RayJob<br/><i>cluster + job, auto-cleanup</i>"]
        RS["RayService<br/><i>cluster + Serve, zero-downtime upgrades</i>"]
    end

    RC --> Pods["Head + Worker Pods"]
    RJ --> Pods
    RS --> Pods

| CRD | What it does | When to use |
|---|---|---|
| RayCluster | Creates and manages a long-lived Ray cluster | Development, interactive use, multiple jobs on same cluster |
| RayJob | Creates a RayCluster, submits a job, optionally deletes cluster on completion | Batch jobs, CI/CD, cost optimization (auto-delete) |
| RayService | RayCluster + Ray Serve deployment with zero-downtime upgrades | Model serving in production |

RayJob Example

apiVersion: ray.io/v1
kind: RayJob
metadata:
  name: training-job
spec:
  entrypoint: python train.py
  shutdownAfterJobFinishes: true  # delete cluster when done
  rayClusterSpec:
    headGroupSpec:
      rayStartParams:
        dashboard-host: "0.0.0.0"
      template:
        spec:
          containers:
            - name: ray-head
              image: rayproject/ray:2.40.0-gpu
              resources:
                requests:
                  cpu: "2"
                  memory: 8Gi
    workerGroupSpecs:
      - replicas: 4
        minReplicas: 1
        maxReplicas: 8
        groupName: gpu-workers
        rayStartParams: {}
        template:
          spec:
            containers:
              - name: ray-worker
                image: rayproject/ray:2.40.0-gpu
                resources:
                  requests:
                    cpu: "4"
                    memory: 16Gi
                    nvidia.com/gpu: "1"
                  limits:
                    nvidia.com/gpu: "1"  # Kubernetes requires GPUs in limits; must equal the request

Which CRD to Choose

graph TD
    Q1{"Model serving?"} -->|Yes| RS["RayService"]
    Q1 -->|No| Q2{"Need auto-cleanup<br/>after job?"}
    Q2 -->|Yes| RJ["RayJob"]
    Q2 -->|No| Q3{"Interactive dev<br/>or multi-job?"}
    Q3 -->|Yes| RC["RayCluster"]
    Q3 -->|No| RJ
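
The same decision tree can be written as a small function, which is handy if the choice is made by tooling rather than by hand. `choose_crd` and its boolean parameters are hypothetical names introduced for this sketch; only the three returned CRD kinds come from KubeRay.

```python
def choose_crd(
    model_serving: bool,
    needs_auto_cleanup: bool,
    interactive_or_multi_job: bool,
) -> str:
    """Mirror the decision tree: questions are asked in order, first match wins."""
    if model_serving:
        return "RayService"
    if needs_auto_cleanup:
        return "RayJob"
    if interactive_or_multi_job:
        return "RayCluster"
    return "RayJob"  # remaining case: a one-off job with no special needs


print(choose_crd(False, False, True))  # RayCluster
print(choose_crd(True, True, True))    # RayService
```

Serving takes priority over every other answer, which is why the second call returns RayService even though the other flags are set.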

Key Features

| Feature | Description |
|---|---|
| Zero-change scaling | Same code runs locally and on 100+ nodes |
| Heterogeneous resources | Mix CPUs, GPUs, TPUs, custom resources in one cluster |
| Autoscaling | Worker nodes scale based on workload demand |
| Fault tolerance | Tasks retry on failure, actors can be restarted, objects reconstructed |
| Shared memory object store | Zero-copy reads for objects on the same node (Arrow/Plasma) |
| Placement groups | Gang scheduling and co-location control |
| Runtime environments | Per-task/actor Python dependency isolation |
| Dashboard | Web UI for monitoring jobs, actors, resources, logs |
| Multi-tenancy | Multiple jobs/users share one cluster via namespaces |

Practical Example: Batch Inference

import ray

ray.init()

@ray.remote(num_gpus=1)
def predict_batch(model_path: str, batch: list[str]) -> list[float]:
    model = load_model(model_path)  # load_model: placeholder for your own model-loading code
    return [model.predict(item) for item in batch]

# Fan out across all available GPUs
data = [...]  # 10,000 items
batches = [data[i:i+100] for i in range(0, len(data), 100)]
futures = [predict_batch.remote("s3://models/v1.pt", b) for b in batches]
results = ray.get(futures)  # waits for all, returns list of lists
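
The batching and result-flattening steps around the Ray calls are plain Python, so they can be checked locally without a cluster. In this sketch `make_batches` and `flatten` are hypothetical helper names, the integer list stands in for real input data, and `fake_results` stands in for what `ray.get(futures)` would return.

```python
from itertools import chain


def make_batches(items, batch_size):
    """Split items into consecutive slices of at most batch_size elements."""
    return [items[i:i + batch_size] for i in range(0, len(items), batch_size)]


def flatten(nested):
    """Turn a list of per-batch result lists into one flat list, keeping order."""
    return list(chain.from_iterable(nested))


data = list(range(10_000))        # stand-in for 10,000 real items
batches = make_batches(data, 100)
print(len(batches))               # 100

# Stand-in for ray.get(futures): one list of floats per batch, in batch order
fake_results = [[float(x) for x in batch] for batch in batches]
flat = flatten(fake_results)
print(len(flat))                  # 10000
```

Because `ray.get` on a list of futures returns results in the same order as the futures, flattening this way keeps each prediction aligned with its input item.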

When to Use / When Not to Use

Good fit:

  • Distributed ML training (multi-node, multi-GPU)
  • Batch inference at scale
  • Hyperparameter tuning (many parallel trials)
  • Model serving with complex pre/post-processing
  • Parallelizing any CPU/GPU-bound Python workload
  • Reinforcement learning

Not the right tool for:

  • Simple single-machine scripts (overhead not worth it)
  • Streaming / event-driven pipelines (use Flink, Kafka Streams)
  • Workflow orchestration / DAG management (use Flyte, Airflow — they can call Ray)
  • Cluster-level quota/queue management (use Kueue, Volcano)

Further Resources