Training Examples
Real-world examples to get you training models on Cumulus. Each example is copy-paste ready.
- How to structure training scripts for Cumulus
- Best practices for checkpoint/resume
- Automatic dependency detection in action
1. Hello World: Simple Training
Scenario: You want to verify everything works before running a real job.
Time: 2 minutes
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

# A tiny feed-forward classifier — just enough to verify the platform end to end.
model = nn.Sequential(
    nn.Linear(784, 256),
    nn.ReLU(),
    nn.Linear(256, 10),
).cuda()

# Synthetic data (replace with your real data)
X = torch.randn(1000, 784)
y = torch.randint(0, 10, (1000,))
loader = DataLoader(TensorDataset(X, y), batch_size=32)

optimizer = torch.optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()  # stateless, so build it once outside the loop

for epoch in range(10):
    for batch_x, batch_y in loader:
        batch_x, batch_y = batch_x.cuda(), batch_y.cuda()
        loss = criterion(model(batch_x), batch_y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
    # Report the last batch's loss for each epoch.
    print(f"Epoch {epoch}: loss={loss.item():.4f}")

print("Training complete!")
from cumulus import CumulusClient

# Submit the script; Cumulus packages it and installs the listed requirements.
client = CumulusClient()
job = client.submit(
    script="train.py",
    requirements=["torch"],
)

print(f"Job submitted: {job.job_id}")
print(f"Check status: client.get_status('{job.job_id}')")
You'll know it worked when: You see "Training complete!" in the results.
2. Production Training with Checkpointing
Scenario: Train a model for hours and automatically resume if interrupted.
Time: 5 minutes to set up
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset
from cumulus import CumulusJob


class SimpleNet(nn.Module):
    """Two-layer MLP classifier used for the checkpoint/resume demo."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(784, 256)
        self.fc2 = nn.Linear(256, 10)

    def forward(self, x):
        return self.fc2(torch.relu(self.fc1(x)))


# Model and optimizer.
model = SimpleNet().cuda()
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Synthetic dataset (replace with your real data).
X = torch.randn(10000, 784)
y = torch.randint(0, 10, (10000,))
loader = DataLoader(TensorDataset(X, y), batch_size=64, shuffle=True)

criterion = nn.CrossEntropyLoss()

# CumulusJob provides automatic checkpoint/resume.
with CumulusJob() as job:
    # When a requeued job restarts, restore model/optimizer state first.
    if job.is_resumed:
        print(f"Resuming from epoch {job.checkpoint['epoch']}")
        model.load_state_dict(job.checkpoint['model'])
        optimizer.load_state_dict(job.checkpoint['optimizer'])

    for epoch in range(job.start_epoch, 100):
        epoch_loss = 0.0
        for batch_x, batch_y in loader:
            batch_x, batch_y = batch_x.cuda(), batch_y.cuda()
            optimizer.zero_grad()
            loss = criterion(model(batch_x), batch_y)
            loss.backward()
            optimizer.step()
            epoch_loss += loss.item()

        avg_loss = epoch_loss / len(loader)
        print(f"Epoch {epoch}: avg_loss={avg_loss:.4f}")

        # Refresh the checkpoint after every epoch so an eviction
        # loses at most one epoch of work.
        job.state = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'epoch': epoch,
            'loss': avg_loss,
        }

    job.complete()

print("Training complete!")
from cumulus import CumulusClient

client = CumulusClient()
# Higher priority = less likely to be evicted under contention.
job = client.submit(
    script="train.py",
    requirements=["torch"],
    workload_type="training",
    priority=7,
)
print(f"Job: {job.job_id}")

# Optional: block until the job finishes (2 hour timeout).
status = client.wait_for_completion(job.job_id, timeout=7200)
print(f"Final status: {status}")
What happens if interrupted:
- If your job is interrupted, your `job.state` is automatically saved
- Your job is automatically requeued with higher priority
- On restart, `job.is_resumed` is `True` and `job.checkpoint` has your saved state
- Training continues from exactly where it left off
3. Fine-Tuning a Transformer (BERT)
Scenario: Fine-tune a HuggingFace model for text classification.
import torch
from transformers import BertForSequenceClassification, BertTokenizer
from torch.utils.data import DataLoader, TensorDataset
from cumulus import CumulusJob

# Load the pre-trained model with a fresh 2-class classification head.
model = BertForSequenceClassification.from_pretrained(
    'bert-base-uncased',
    num_labels=2,
).cuda()
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')

# Sample training data (replace with your dataset)
texts = ["I love this!", "This is terrible.", "Great product!", "Waste of money."]
labels = [1, 0, 1, 0]

# Tokenize once up front; padding/truncation yields fixed-size tensors.
encodings = tokenizer(texts, truncation=True, padding=True, return_tensors='pt')
dataset = TensorDataset(
    encodings['input_ids'],
    encodings['attention_mask'],
    torch.tensor(labels),
)
loader = DataLoader(dataset, batch_size=2, shuffle=True)

# FIX: `transformers.AdamW` was deprecated and removed from recent
# transformers releases — use the PyTorch implementation instead.
optimizer = torch.optim.AdamW(model.parameters(), lr=2e-5)

with CumulusJob() as job:
    # Restore weights and optimizer state if this is a requeued job.
    if job.is_resumed:
        model.load_state_dict(job.checkpoint['model'])
        optimizer.load_state_dict(job.checkpoint['optimizer'])

    for epoch in range(job.start_epoch, 3):
        model.train()
        total_loss = 0
        for input_ids, attention_mask, batch_labels in loader:
            input_ids = input_ids.cuda()
            attention_mask = attention_mask.cuda()
            batch_labels = batch_labels.cuda()

            optimizer.zero_grad()
            # Passing `labels=` makes the model compute the loss internally.
            outputs = model(input_ids, attention_mask=attention_mask, labels=batch_labels)
            loss = outputs.loss
            loss.backward()
            optimizer.step()
            total_loss += loss.item()

        avg_loss = total_loss / len(loader)
        print(f"Epoch {epoch}: loss={avg_loss:.4f}")

        # Checkpoint after every epoch for eviction recovery.
        job.state = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'epoch': epoch,
        }

    job.complete()

print("Fine-tuning complete!")
from cumulus import CumulusClient

client = CumulusClient()
# Transformers need extra headroom, so request/limit memory explicitly.
job = client.submit(
    script="finetune.py",
    requirements=["torch", "transformers"],
    workload_type="finetuning",
    memory_request="16Gi",
    memory_limit="32Gi",
)
4. Multi-File Project with Config
Scenario: Real projects have multiple files. Cumulus automatically detects them.
my-project/
├── main.py # Entry point
├── model.py # Model definition
├── data.py # Data loading
├── config.yaml # Configuration (auto-detected!)
└── requirements.txt
import torch
import torch.nn as nn


class MyModel(nn.Module):
    """Feed-forward classifier: Linear -> ReLU -> Dropout(0.2) -> Linear.

    Dimensions come from config.yaml so the architecture can be tuned
    without code changes.
    """

    def __init__(self, input_dim, hidden_dim, output_dim):
        super().__init__()
        stack = [
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(hidden_dim, output_dim),
        ]
        self.layers = nn.Sequential(*stack)

    def forward(self, x):
        return self.layers(x)
# Model architecture hyperparameters (read by main.py and passed to MyModel).
model:
  input_dim: 784
  hidden_dim: 256
  output_dim: 10

# Training-loop settings.
training:
  batch_size: 64
  learning_rate: 0.001
  epochs: 50
import torch
import yaml
from model import MyModel  # Local import - auto-detected!
from cumulus import CumulusJob

# Load config - auto-detected because we reference config.yaml!
with open('config.yaml') as f:
    config = yaml.safe_load(f)

model_cfg = config['model']
train_cfg = config['training']

model = MyModel(
    model_cfg['input_dim'],
    model_cfg['hidden_dim'],
    model_cfg['output_dim'],
).cuda()

optimizer = torch.optim.Adam(model.parameters(), lr=train_cfg['learning_rate'])

with CumulusJob() as job:
    if job.is_resumed:
        model.load_state_dict(job.checkpoint['model'])

    for epoch in range(job.start_epoch, train_cfg['epochs']):
        # Training loop (synthetic batch shown here)...
        x = torch.randn(train_cfg['batch_size'], model_cfg['input_dim']).cuda()
        y = torch.randint(0, model_cfg['output_dim'], (train_cfg['batch_size'],)).cuda()
        loss = torch.nn.functional.cross_entropy(model(x), y)
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()
        print(f"Epoch {epoch}: loss={loss.item():.4f}")
        job.state = {'model': model.state_dict(), 'epoch': epoch}

    job.complete()
from cumulus import CumulusClient

client = CumulusClient()
# Submit only main.py — model.py and config.yaml are pulled in
# automatically by Cumulus's dependency detection.
job = client.submit(
    script="main.py",
    requirements_file="requirements.txt",
)
Cumulus analyzes your code and automatically includes:
- Python imports: `from model import MyModel` → includes `model.py`
- Data files: `open('config.yaml')` → includes `config.yaml`
- Nested packages: If you import from a package, the entire package is included
5. Using Include/Exclude Patterns
Scenario: You need explicit control over which files are included.
from cumulus import CumulusClient

# Explicit file selection: ship data/config assets, drop build artifacts.
INCLUDE_PATTERNS = [
    "*.yaml",          # All YAML files
    "data/*.csv",      # CSV files in data folder
    "models/**/*.pt",  # All .pt files in models (recursive)
]
EXCLUDE_PATTERNS = [
    "*.pyc",           # Compiled Python
    "__pycache__/*",   # Cache directories
    "*.log",           # Log files
    ".git/*",          # Git directory
]

client = CumulusClient()
job = client.submit(
    script="main.py",
    include_patterns=INCLUDE_PATTERNS,
    exclude_patterns=EXCLUDE_PATTERNS,
    requirements_file="requirements.txt",
)
6. SDXL LoRA Fine-Tuning (Diffusion Models)
Scenario: Fine-tune SDXL or Flux for custom image generation with LoRA.
import torch
from diffusers import StableDiffusionXLPipeline, AutoencoderKL
from peft import LoraConfig, get_peft_model
from cumulus import CumulusJob

# Load the SDXL base pipeline in fp16.
model_id = "stabilityai/stable-diffusion-xl-base-1.0"
pipe = StableDiffusionXLPipeline.from_pretrained(
    model_id,
    torch_dtype=torch.float16,
    variant="fp16",
).to("cuda")

# LoRA configuration targeting the UNet attention projections.
lora_config = LoraConfig(
    r=32,  # LoRA rank
    lora_alpha=32,
    target_modules=["to_q", "to_v", "to_k", "to_out.0"],
    lora_dropout=0.05,
)
pipe.unet = get_peft_model(pipe.unet, lora_config)

optimizer = torch.optim.AdamW(pipe.unet.parameters(), lr=1e-4)

# Gradient checkpointing trades recompute for a large VRAM saving.
pipe.unet.enable_gradient_checkpointing()

with CumulusJob() as job:
    if job.is_resumed:
        pipe.unet.load_state_dict(job.checkpoint['unet_lora'])
        optimizer.load_state_dict(job.checkpoint['optimizer'])

    # Resume from the saved step if requeued, otherwise start at 0.
    start_step = job.checkpoint.get('step', 0) if job.is_resumed else 0
    for step in range(start_step, 1000):
        # Your training loop here (train_step/batch are placeholders to fill in).
        loss = train_step(pipe, optimizer, batch)

        if step % 100 == 0:
            print(f"Step {step}: loss={loss:.4f}")
            # NOTE(review): checkpointing every 100 steps alongside the log
            # line — adjust the cadence to your eviction tolerance.
            job.state = {
                'unet_lora': pipe.unet.state_dict(),
                'optimizer': optimizer.state_dict(),
                'step': step,
            }

    # Save final LoRA weights and publish them as a job artifact.
    pipe.unet.save_pretrained("sdxl_lora_weights")
    job.artifacts.upload_directory("sdxl_lora_weights", "lora_weights")
    job.complete()
from cumulus import CumulusClient

client = CumulusClient()
job = client.submit(
    script="sdxl_lora_train.py",
    requirements=["torch", "diffusers", "peft", "transformers", "accelerate"],
    workload_type="finetuning",
    # Architecture hints let the scheduler estimate VRAM accurately.
    model_architecture={
        "architecture_type": "diffusion",
        "base_model": "sdxl",
        "lora_rank": 32,
        "total_params": 2_600_000_000,
    },
    training_config={
        "batch_size": 1,
        "precision": "fp16",
        "peft": True,
        "image_size": 1024,
        "gradient_checkpointing": True,
        "cfg_scale": 7.5,
    },
)

print(f"Job submitted: {job.job_id}")
# Expected VRAM: ~12-16GB with gradient checkpointing
- Always enable gradient checkpointing for diffusion models - it reduces VRAM by ~30%
- Set `peft: True` when doing LoRA to get accurate memory estimates
- Include `base_model` (e.g., "sdxl", "flux") for known model baselines
- Specify `image_size` (512, 1024, etc.) as it significantly affects memory usage
7. Distributed Training (Multi-GPU)
Scenario: Train on multiple GPUs for faster training.
import os

import torch
import torch.nn as nn
import torch.distributed as dist
from torch.nn.parallel import DistributedDataParallel as DDP
from cumulus import CumulusJob


def setup_distributed():
    """Initialize the NCCL process group and pin this process to its GPU.

    Returns the local rank (defaults to 0 when LOCAL_RANK is unset).
    """
    dist.init_process_group("nccl")
    # FIX: `os` was used here without being imported — NameError at runtime.
    local_rank = int(os.environ.get("LOCAL_RANK", 0))
    torch.cuda.set_device(local_rank)
    return local_rank


def main():
    local_rank = setup_distributed()

    model = nn.Sequential(
        nn.Linear(784, 256),
        nn.ReLU(),
        nn.Linear(256, 10),
    ).cuda()
    # DDP synchronizes gradients across ranks during backward().
    model = DDP(model, device_ids=[local_rank])
    optimizer = torch.optim.Adam(model.parameters())

    with CumulusJob() as job:
        if job.is_resumed:
            # DDP wraps the model, so load into model.module.
            model.module.load_state_dict(job.checkpoint['model'])

        for epoch in range(job.start_epoch, 100):
            # Training loop (synthetic batch shown here).
            x = torch.randn(32, 784).cuda()
            y = torch.randint(0, 10, (32,)).cuda()
            loss = nn.CrossEntropyLoss()(model(x), y)
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            if local_rank == 0:  # Only save checkpoint on rank 0
                print(f"Epoch {epoch}: loss={loss.item():.4f}")
                job.state = {
                    'model': model.module.state_dict(),
                    'epoch': epoch,
                }

        if local_rank == 0:
            job.complete()


if __name__ == "__main__":
    main()
from cumulus import CumulusClient

client = CumulusClient()
# Multi-GPU job: two GPUs plus extra host memory.
job = client.submit(
    script="train_distributed.py",
    requirements=["torch"],
    gpu_count=2,  # Request 2 GPUs
    memory_request="32Gi",
)
Common Patterns
Save Model Artifact
# At the end of training, save the model weights into the working directory.
torch.save(model.state_dict(), "model.pt")
print("Saved model.pt")
# This file will be available via client.get_results(job_id, file="model.pt")
Early Stopping
# Stop training once the loss hasn't improved for `patience` epochs in a row.
best_loss = float('inf')
patience = 5
patience_counter = 0

for epoch in range(job.start_epoch, max_epochs):
    loss = train_one_epoch()

    if loss < best_loss:
        # New best — snapshot the weights and reset the counter.
        best_loss = loss
        patience_counter = 0
        torch.save(model.state_dict(), "best_model.pt")
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print(f"Early stopping at epoch {epoch}")
            break

    # Keep the resumable checkpoint current every epoch.
    job.state = {'model': model.state_dict(), 'epoch': epoch, 'best_loss': best_loss}
Learning Rate Scheduling
# Decay the learning rate by 10x every 10 epochs.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=10, gamma=0.1)

# Restore scheduler progress alongside the model when resuming.
if job.is_resumed:
    scheduler.load_state_dict(job.checkpoint['scheduler'])

for epoch in range(job.start_epoch, epochs):
    train_one_epoch()
    scheduler.step()

    # Persist the scheduler too, so the LR curve survives eviction.
    job.state = {
        'model': model.state_dict(),
        'optimizer': optimizer.state_dict(),
        'scheduler': scheduler.state_dict(),
        'epoch': epoch,
    }
8. Running an Inference Server
Deploy a long-running inference server (SGLang, vLLM, etc.) with a public tunnel.
Tunnel URLs require your Cumulus API key for access. Include it via:
- `X-API-Key: <your-api-key>` header, or
- `Authorization: Bearer <your-api-key>` header
from cumulus import CumulusClient
import requests
import os

client = CumulusClient()

# Submit a long-running inference-server job and expose its port via a tunnel.
job = client.submit(
    script="sglang_server.py",
    requirements=["sglang", "torch"],
    service_port=30000,  # SGLang default port
    workload_type="inference",
    gpu_count=1,
)
print(f"Job submitted: {job.job_id}")

# Block until the tunnel is provisioned (up to 5 minutes).
tunnel_url = client.wait_for_tunnel(job.job_id, timeout=300)

if not tunnel_url:
    print("Tunnel not ready within timeout")
else:
    print(f"Server ready at: {tunnel_url}")
    # Tunnel endpoints require your Cumulus API key.
    api_key = os.environ.get("CUMULUS_API_KEY")
    response = requests.get(
        f"{tunnel_url}/health",
        headers={"X-API-Key": api_key},
    )
    print(f"Health check: {response.status_code}")
9. Uploading Artifacts During Training
Save model checkpoints and metrics during training using the artifact manager.
from cumulus import CumulusJob
import torch

model = MyModel()
optimizer = torch.optim.AdamW(model.parameters())

with CumulusJob() as job:
    for epoch in range(job.start_epoch, 10):
        train_loss = train_one_epoch(model, optimizer)
        val_metrics = evaluate(model)

        # Push metrics every epoch so progress is visible as it happens.
        job.artifacts.upload_metrics(
            {"epoch": epoch, "train_loss": train_loss, **val_metrics},
            f"metrics/epoch_{epoch}",
        )

        # Persist full weights every 5 epochs.
        if epoch % 5 == 0:
            job.artifacts.upload_model(
                model.state_dict(),
                f"checkpoints/epoch_{epoch}",
            )

        # Keep job.state current so an evicted job can resume cleanly.
        job.state = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'epoch': epoch,
        }

    # Publish the final trained weights.
    job.artifacts.upload_model(model.state_dict(), "final_model")
    job.complete()
Troubleshooting
"Job keeps getting evicted"
- Increase `priority` (7-10 for important jobs)
- Check if you're using more VRAM than estimated
- Use `memory_request` to ensure adequate memory
"Checkpoint not loading"
- Ensure `job.state` is set before eviction
- Verify the checkpoint contains all necessary keys
- Check that model architecture matches between runs
"Files not found"
- Use `include_patterns` to explicitly include files
- Check paths are relative to your script's directory
- Ensure files exist locally before submitting