Skip to main content

Training Examples

Real-world examples to get you training models on Cumulus. Each example is copy-paste ready.

What you'll learn
  • How to structure training scripts for Cumulus
  • Best practices for checkpoint/resume
  • Automatic dependency detection in action

1. Hello World: Simple Training

Scenario: You want to verify everything works before running a real job.

Time: 2 minutes

train.py
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# Simple neural network: 784 -> 256 -> 10 classifier (e.g. flattened 28x28 images).
model = nn.Sequential(
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Linear(256, 10)
).cuda()

# Synthetic data (replace with your real data)
X = torch.randn(1000, 784)
y = torch.randint(0, 10, (1000,))
loader = DataLoader(TensorDataset(X, y), batch_size=32)

optimizer = torch.optim.Adam(model.parameters())

for epoch in range(10):
    for batch_x, batch_y in loader:
        batch_x, batch_y = batch_x.cuda(), batch_y.cuda()
        loss = nn.CrossEntropyLoss()(model(batch_x), batch_y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # Report the loss of the last batch once per epoch.
    print(f"Epoch {epoch}: loss={loss.item():.4f}")

print("Training complete!")
submit.py
from cumulus import CumulusClient

# Submit train.py; Cumulus installs the listed requirements on the worker.
client = CumulusClient()
job = client.submit(
    script="train.py",
    requirements=["torch"]
)

print(f"Job submitted: {job.job_id}")
print(f"Check status: client.get_status('{job.job_id}')")

You'll know it worked when: You see "Training complete!" in the results.


2. Production Training with Checkpointing

Scenario: Train a model for hours and automatically resume if interrupted.

Time: 5 minutes to set up

train.py
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from cumulus import CumulusJob

class SimpleNet(nn.Module):
    """Two-layer MLP classifier: 784 -> 256 -> 10."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(784, 256)
        self.fc2 = nn.Linear(256, 10)

    def forward(self, x):
        return self.fc2(torch.relu(self.fc1(x)))

# Initialize model and optimizer
model = SimpleNet().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Create data loader (replace with your real data)
X = torch.randn(10000, 784)
y = torch.randint(0, 10, (10000,))
loader = DataLoader(TensorDataset(X, y), batch_size=64, shuffle=True)

# Use CumulusJob for automatic checkpoint/resume
with CumulusJob() as job:
    # Resume from checkpoint if this is a requeued job
    if job.is_resumed:
        print(f"Resuming from epoch {job.checkpoint['epoch']}")
        model.load_state_dict(job.checkpoint['model'])
        optimizer.load_state_dict(job.checkpoint['optimizer'])

    for epoch in range(job.start_epoch, 100):
        epoch_loss = 0.0
        for batch_x, batch_y in loader:
            batch_x, batch_y = batch_x.cuda(), batch_y.cuda()

            optimizer.zero_grad()
            loss = nn.CrossEntropyLoss()(model(batch_x), batch_y)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        avg_loss = epoch_loss / len(loader)
        print(f"Epoch {epoch}: avg_loss={avg_loss:.4f}")

        # Update checkpoint state after each epoch so an eviction loses
        # at most one epoch of work.
        job.state = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'epoch': epoch,
            'loss': avg_loss
        }

    job.complete()
print("Training complete!")
submit.py
from cumulus import CumulusClient

client = CumulusClient()
job = client.submit(
    script="train.py",
    requirements=["torch"],
    workload_type="training",
    priority=7  # Higher priority = less likely to be evicted
)

print(f"Job: {job.job_id}")

# Optional: wait for completion
status = client.wait_for_completion(job.job_id, timeout=7200)  # 2 hour timeout
print(f"Final status: {status}")

What happens if interrupted:

  1. If your job is interrupted, your job.state is automatically saved
  2. Your job is automatically requeued with higher priority
  3. On restart, job.is_resumed is True and job.checkpoint has your saved state
  4. Training continues from exactly where it left off

3. Fine-Tuning a Transformer (BERT)

Scenario: Fine-tune a HuggingFace model for text classification.

finetune.py
import torch
from transformers import BertForSequenceClassification, BertTokenizer
from torch.utils.data import DataLoader
from cumulus import CumulusJob

# Load pre-trained model
model = BertForSequenceClassification.from_pretrained(
    'bert-base-uncased',
    num_labels=2
).cuda()
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Sample training data (replace with your dataset)
texts = ["I love this!", "This is terrible.", "Great product!", "Waste of money."]
labels = [1, 0, 1, 0]

# Tokenize
encodings = tokenizer(texts, truncation=True, padding=True, return_tensors='pt')
dataset = torch.utils.data.TensorDataset(
    encodings['input_ids'],
    encodings['attention_mask'],
    torch.tensor(labels)
)
loader = DataLoader(dataset, batch_size=2, shuffle=True)

# NOTE: transformers.AdamW is deprecated/removed; use torch.optim.AdamW instead.
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)

with CumulusJob() as job:
    if job.is_resumed:
        model.load_state_dict(job.checkpoint['model'])
        optimizer.load_state_dict(job.checkpoint['optimizer'])

    for epoch in range(job.start_epoch, 3):
        model.train()
        total_loss = 0

        for input_ids, attention_mask, batch_labels in loader:
            input_ids = input_ids.cuda()
            attention_mask = attention_mask.cuda()
            batch_labels = batch_labels.cuda()

            optimizer.zero_grad()
            # Passing labels makes the model compute the classification loss itself.
            outputs = model(input_ids, attention_mask=attention_mask, labels=batch_labels)
            loss = outputs.loss
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(loader)
        print(f"Epoch {epoch}: loss={avg_loss:.4f}")

        job.state = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'epoch': epoch
        }

    job.complete()
print("Fine-tuning complete!")
submit.py
from cumulus import CumulusClient

client = CumulusClient()
job = client.submit(
    script="finetune.py",
    requirements=["torch", "transformers"],
    workload_type="finetuning",
    memory_request="16Gi",  # Transformers need more memory
    memory_limit="32Gi"
)

4. Multi-File Project with Config

Scenario: Real projects have multiple files. Cumulus automatically detects them.

my-project/
├── main.py # Entry point
├── model.py # Model definition
├── data.py # Data loading
├── config.yaml # Configuration (auto-detected!)
└── requirements.txt
model.py
import torch.nn as nn

class MyModel(nn.Module):
    """Feed-forward classifier: input_dim -> hidden_dim -> output_dim with dropout."""

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        self.layers = nn.Sequential(
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim, output_dim)
        )

    def forward(self, x):
        return self.layers(x)
config.yaml
model:
  input_dim: 784
  hidden_dim: 256
  output_dim: 10

training:
  batch_size: 64
  learning_rate: 0.001
  epochs: 50
main.py
import torch
import yaml
from model import MyModel  # Local import - auto-detected!
from cumulus import CumulusJob

# Load config - auto-detected because we reference config.yaml!
with open('config.yaml') as f:
    config = yaml.safe_load(f)

model = MyModel(
    config['model']['input_dim'],
    config['model']['hidden_dim'],
    config['model']['output_dim']
).cuda()

optimizer = torch.optim.Adam(
    model.parameters(),
    lr=config['training']['learning_rate']
)

with CumulusJob() as job:
    if job.is_resumed:
        model.load_state_dict(job.checkpoint['model'])
        # Restore optimizer state as well (guarded for checkpoints saved
        # before the optimizer was included in job.state).
        if 'optimizer' in job.checkpoint:
            optimizer.load_state_dict(job.checkpoint['optimizer'])

    for epoch in range(job.start_epoch, config['training']['epochs']):
        # Training loop...
        x = torch.randn(config['training']['batch_size'], config['model']['input_dim']).cuda()
        y = torch.randint(0, config['model']['output_dim'], (config['training']['batch_size'],)).cuda()

        loss = torch.nn.functional.cross_entropy(model(x), y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        print(f"Epoch {epoch}: loss={loss.item():.4f}")
        # Include the optimizer so Adam's moment estimates survive a requeue.
        job.state = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'epoch': epoch
        }

    job.complete()
submit.py
from cumulus import CumulusClient

client = CumulusClient()

# Just submit main.py - everything else is auto-detected!
job = client.submit(
    script="main.py",
    requirements_file="requirements.txt"
)
# model.py, config.yaml are automatically included!
Automatic Detection

Cumulus analyzes your code and automatically includes:

  • Python imports: from model import MyModel → includes model.py
  • Data files: open('config.yaml') → includes config.yaml
  • Nested packages: If you import from a package, the entire package is included

5. Using Include/Exclude Patterns

Scenario: You need explicit control over which files are included.

submit.py
from cumulus import CumulusClient

client = CumulusClient()
job = client.submit(
    script="main.py",
    # Include specific patterns
    include_patterns=[
        "*.yaml",          # All YAML files
        "data/*.csv",      # CSV files in data folder
        "models/**/*.pt"   # All .pt files in models (recursive)
    ],
    # Exclude patterns
    exclude_patterns=[
        "*.pyc",           # Compiled Python
        "__pycache__/*",   # Cache directories
        "*.log",           # Log files
        ".git/*"           # Git directory
    ],
    requirements_file="requirements.txt"
)

6. SDXL LoRA Fine-Tuning (Diffusion Models)

Scenario: Fine-tune SDXL or Flux for custom image generation with LoRA.

sdxl_lora_train.py
import torch
from diffusers import StableDiffusionXLPipeline, AutoencoderKL
from peft import LoraConfig, get_peft_model
from cumulus import CumulusJob

# Load SDXL with LoRA
model_id = "stabilityai/stable-diffusion-xl-base-1.0"
pipe = StableDiffusionXLPipeline.from_pretrained(
    model_id,
    torch_dtype=torch.float16,
    variant="fp16"
).to("cuda")

# Configure LoRA
lora_config = LoraConfig(
    r=32,  # LoRA rank
    lora_alpha=32,
    target_modules=["to_q", "to_v", "to_k", "to_out.0"],
    lora_dropout=0.05
)

# Apply LoRA to UNet
pipe.unet = get_peft_model(pipe.unet, lora_config)
optimizer = torch.optim.AdamW(pipe.unet.parameters(), lr=1e-4)

# Enable gradient checkpointing for memory efficiency
pipe.unet.enable_gradient_checkpointing()

with CumulusJob() as job:
    if job.is_resumed:
        pipe.unet.load_state_dict(job.checkpoint['unet_lora'])
        optimizer.load_state_dict(job.checkpoint['optimizer'])

    # Resume from the saved step when requeued, otherwise start at 0.
    for step in range(job.checkpoint.get('step', 0) if job.is_resumed else 0, 1000):
        # Your training loop here
        # NOTE: train_step() and batch are placeholders - supply your own
        # diffusion training step and data loading.
        loss = train_step(pipe, optimizer, batch)

        if step % 100 == 0:
            print(f"Step {step}: loss={loss:.4f}")
            job.state = {
                'unet_lora': pipe.unet.state_dict(),
                'optimizer': optimizer.state_dict(),
                'step': step
            }

    # Save final LoRA weights
    pipe.unet.save_pretrained("sdxl_lora_weights")
    job.artifacts.upload_directory("sdxl_lora_weights", "lora_weights")
    job.complete()
submit.py
from cumulus import CumulusClient

client = CumulusClient()
job = client.submit(
    script="sdxl_lora_train.py",
    requirements=["torch", "diffusers", "peft", "transformers", "accelerate"],
    workload_type="finetuning",
    # Model architecture hints for accurate VRAM estimation
    model_architecture={
        "architecture_type": "diffusion",
        "base_model": "sdxl",
        "lora_rank": 32,
        "total_params": 2_600_000_000
    },
    training_config={
        "batch_size": 1,
        "precision": "fp16",
        "peft": True,
        "image_size": 1024,
        "gradient_checkpointing": True,
        "cfg_scale": 7.5
    }
)

print(f"Job submitted: {job.job_id}")
# Expected VRAM: ~12-16GB with gradient checkpointing
Diffusion Model Tips
  • Always enable gradient checkpointing for diffusion models - it reduces VRAM by ~30%
  • Set peft: True when doing LoRA to get accurate memory estimates
  • Include base_model (e.g., "sdxl", "flux") for known model baselines
  • Specify image_size (512, 1024, etc.) as it significantly affects memory usage

7. Distributed Training (Multi-GPU)

Scenario: Train on multiple GPUs for faster training.

train_distributed.py
import os  # BUG FIX: os was used in setup_distributed() but never imported
import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from cumulus import CumulusJob

def setup_distributed():
    """Initialize the NCCL process group and pin this process to its GPU."""
    dist.init_process_group("nccl")
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    torch.cuda.set_device(local_rank)
    return local_rank

def main():
    local_rank = setup_distributed()

    model = nn.Sequential(
        nn.Linear(784, 256),
        nn.ReLU(),
        nn.Linear(256, 10)
    ).cuda()

    model = DDP(model, device_ids=[local_rank])
    optimizer = torch.optim.Adam(model.parameters())

    with CumulusJob() as job:
        if job.is_resumed:
            # DDP wraps the model; the raw module holds the state dict.
            model.module.load_state_dict(job.checkpoint['model'])

        for epoch in range(job.start_epoch, 100):
            # Training loop
            x = torch.randn(32, 784).cuda()
            y = torch.randint(0, 10, (32,)).cuda()

            loss = nn.CrossEntropyLoss()(model(x), y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if local_rank == 0:  # Only save checkpoint on rank 0
                print(f"Epoch {epoch}: loss={loss.item():.4f}")
                job.state = {
                    'model': model.module.state_dict(),
                    'epoch': epoch
                }

        if local_rank == 0:
            job.complete()

if __name__ == "__main__":
    main()
submit.py
from cumulus import CumulusClient

client = CumulusClient()
job = client.submit(
    script="train_distributed.py",
    requirements=["torch"],
    gpu_count=2,  # Request 2 GPUs
    memory_request="32Gi"
)

Common Patterns

Save Model Artifact

# At the end of training, save the model
# (files written to the job's working directory are returned with the results)
torch.save(model.state_dict(), "model.pt")
print("Saved model.pt")
# This file will be available via client.get_results(job_id, file="model.pt")

Early Stopping

# Track the best validation loss and stop when it hasn't improved
# for `patience` consecutive epochs.
best_loss = float('inf')
patience = 5
patience_counter = 0

for epoch in range(job.start_epoch, max_epochs):
    loss = train_one_epoch()

    if loss < best_loss:
        best_loss = loss
        patience_counter = 0
        torch.save(model.state_dict(), "best_model.pt")
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch}")
            break

    job.state = {'model': model.state_dict(), 'epoch': epoch, 'best_loss': best_loss}

Learning Rate Scheduling

# Decay the learning rate by 10x every 10 epochs.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

if job.is_resumed:
    # Restore the scheduler so the LR schedule continues where it left off.
    scheduler.load_state_dict(job.checkpoint['scheduler'])

for epoch in range(job.start_epoch, epochs):
    train_one_epoch()
    scheduler.step()

    job.state = {
        'model': model.state_dict(),
        'optimizer': optimizer.state_dict(),
        'scheduler': scheduler.state_dict(),
        'epoch': epoch
    }

8. Running an Inference Server

Deploy a long-running inference server (SGLang, vLLM, etc.) with a public tunnel.

Authentication Required

Tunnel URLs require your Cumulus API key for access. Include it via:

  • X-API-Key: <your-api-key> header, or
  • Authorization: Bearer <your-api-key> header
submit_server.py
from cumulus import CumulusClient
import requests
import os

client = CumulusClient()

# Submit inference server job with exposed port
job = client.submit(
    script="sglang_server.py",
    requirements=["sglang", "torch"],
    service_port=30000,  # SGLang default port
    workload_type="inference",
    gpu_count=1
)

print(f"Job submitted: {job.job_id}")

# Wait for tunnel to become available
tunnel_url = client.wait_for_tunnel(job.job_id, timeout=300)

if tunnel_url:
    print(f"Server ready at: {tunnel_url}")

    # Make authenticated requests to tunnel_url
    api_key = os.environ.get("CUMULUS_API_KEY")
    response = requests.get(
        f"{tunnel_url}/health",
        headers={"X-API-Key": api_key}
    )
    print(f"Health check: {response.status_code}")
else:
    print("Tunnel not ready within timeout")

9. Uploading Artifacts During Training

Save model checkpoints and metrics during training using the artifact manager.

train.py
from cumulus import CumulusJob
import torch

model = MyModel()
optimizer = torch.optim.AdamW(model.parameters())

with CumulusJob() as job:
    for epoch in range(job.start_epoch, 10):
        train_loss = train_one_epoch(model, optimizer)
        val_metrics = evaluate(model)

        # Upload metrics every epoch
        job.artifacts.upload_metrics(
            {"epoch": epoch, "train_loss": train_loss, **val_metrics},
            f"metrics/epoch_{epoch}"
        )

        # Save checkpoint every 5 epochs
        if epoch % 5 == 0:
            job.artifacts.upload_model(
                model.state_dict(),
                f"checkpoints/epoch_{epoch}"
            )

        # Update state for eviction recovery
        job.state = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'epoch': epoch
        }

    # Upload final model
    job.artifacts.upload_model(model.state_dict(), "final_model")
    job.complete()

Troubleshooting

"Job keeps getting evicted"

  • Increase priority (7-10 for important jobs)
  • Check if you're using more VRAM than estimated
  • Use memory_request to ensure adequate memory

"Checkpoint not loading"

  • Ensure job.state is set before eviction
  • Verify the checkpoint contains all necessary keys
  • Check that model architecture matches between runs

"Files not found"

  • Use include_patterns to explicitly include files
  • Check paths are relative to your script's directory
  • Ensure files exist locally before submitting