SDK Reference
Complete API documentation for the Cumulus Python SDK.
Installation
# Basic installation
pip install cumulus-sdk
# With PyTorch support (for CumulusTrainer)
pip install cumulus-sdk[torch]
# Full installation (all features)
pip install cumulus-sdk[full]
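To verify the install, you can try importing the client (the same import used throughout this reference):
python -c "from cumulus import CumulusClient"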
CumulusClient
The main client for submitting and managing GPU jobs.
Constructor
from cumulus import CumulusClient
client = CumulusClient()
The client automatically reads your API key from the CUMULUS_API_KEY environment variable:
export CUMULUS_API_KEY="your-api-key-here"
Get your API key from the Cumulus Dashboard.
submit()
Submit a job to Cumulus.
job = client.submit(
script="train.py", # Required: path to main script
job_id="my-job-001", # Optional: custom job ID (auto-generated if not provided)
additional_files=["model.py", "utils.py"], # Optional: extra files to include explicitly
include_patterns=["*.yaml", "data/*.csv"], # Optional: glob patterns to include
exclude_patterns=["*.pyc", "__pycache__/*"], # Optional: glob patterns to exclude
requirements=["torch", "transformers"], # Optional: pip packages to install
requirements_file="requirements.txt", # Optional: path to requirements.txt
gpu_count=1, # Optional: number of GPUs (default: 1)
worker_image="pytorch/pytorch:2.5.1-cuda12.4-cudnn9-runtime", # Optional: Docker image
workload_type="training", # Optional: "training", "inference", or "finetuning"
priority=5, # Optional: 1-10, higher = more important (default: 5)
memory_request="8Gi", # Optional: minimum memory (default: 8Gi)
memory_limit="16Gi", # Optional: maximum memory (default: 16Gi)
env={"KEY": "value"}, # Optional: environment variables
model_architecture={ # Optional: model info for VRAM estimation
"architecture_type": "diffusion", # transformer, diffusion, cnn, unet
"base_model": "sdxl", # Known model for accurate baselines
"lora_rank": 32, # LoRA rank for trainable param calc
},
training_config={ # Optional: training config for optimization
"batch_size": 1,
"precision": "fp16",
"peft": True, # LoRA/PEFT fine-tuning
"image_size": 1024, # For diffusion models
"gradient_checkpointing": True, # Reduces VRAM ~30%
},
auto_detect=True, # Optional: auto-detect config from script (default: True)
# Advanced options (see Configuration Reference for details)
sm_percent=50, # Optional: GPU SM percentage (1-100)
vram_gb=20.0, # Optional: VRAM allocation in GB
queue_timeout_seconds=300, # Optional: max queue wait time
service_port=8000, # Optional: port to expose via tunnel
)
Automatic dependency detection: The SDK automatically detects Python imports and data files (configs, models, datasets) referenced in your code. You typically don't need additional_files unless you have files that aren't imported or referenced.
Returns: SubmittedJob
job.job_id # "job-20240103-143052-abc123" - unique job identifier
job.status # "SUBMITTED"
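Most parameters are optional. A minimal end-to-end run, relying on automatic dependency detection and the defaults (1 GPU, priority 5), might look like this sketch (train.py stands in for your own script):
from cumulus import CumulusClient

client = CumulusClient()  # reads CUMULUS_API_KEY from the environment

# Submit with defaults; imports and referenced data files are auto-detected
job = client.submit(script="train.py")
print(f"Submitted {job.job_id}: {job.status}")

# Block until the job finishes, then fetch the default results file
if client.wait_for_completion(job.job_id) == "SUCCEEDED":
    print(client.get_results(job.job_id))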
get_status()
Check the current status of a job.
status = client.get_status("job-id")
Returns: One of:
"SUBMITTED"- Job uploaded, waiting to be scheduled"PENDING"- Pod created, waiting for GPU allocation"RUNNING"- Job is executing"SUCCEEDED"- Job completed successfully"FAILED"- Job failed"UNKNOWN"- Job not found or status could not be determined"ERROR"- Communication error with API
wait_for_completion()
Block until a job finishes.
final_status = client.wait_for_completion(
"job-id",
timeout=3600, # Maximum wait time in seconds (default: 3600)
poll_interval=10.0 # How often to check status (default: 10 seconds)
)
Returns: "SUCCEEDED", "FAILED", or "TIMEOUT"
get_results()
Download job output files.
# Get default results file (results.txt)
output = client.get_results("job-id")
# Get a specific file
logs = client.get_results("job-id", file="output.log")
Returns: File contents as a string, or None if not available.
cancel()
Cancel a running or pending job.
success = client.cancel("job-id")
if success:
print("Job cancelled")
Returns: True if cancelled successfully, False otherwise.
list_artifacts()
List all artifacts in a job's S3 folder.
artifacts = client.list_artifacts("job-id")
for artifact in artifacts:
print(f"{artifact['name']} ({artifact['size']} bytes)")
Returns: List of dicts with name, size, last_modified, presigned_url.
download_artifacts()
Download artifacts from a completed job.
# Download all artifacts
files = client.download_artifacts("job-id", "./outputs")
# Download only model files
files = client.download_artifacts(
"job-id",
"./models",
patterns=["*.pt", "checkpoints/*"]
)
Args:
job_id: Job identifier
local_path: Local directory to download to
patterns: Optional glob patterns to filter files
Returns: List of downloaded file paths.
get_tunnel_url()
Get the public tunnel URL for a job with an exposed service_port. Returns immediately.
tunnel_url = client.get_tunnel_url("job-id")
# Returns: "http://tunnel.cumuluslabs.io:8443/12345" or None if not ready
Returns: Tunnel URL string, or None if the tunnel isn't established yet.
Note: The tunnel takes a few seconds to establish after the job starts. Use wait_for_tunnel() if you need to block until it's ready.
Tunnel URLs require your Cumulus API key for access. When making requests to the tunnel URL, include one of:
X-API-Key: <your-api-key> header, or
Authorization: Bearer <your-api-key> header
wait_for_tunnel()
Block until the tunnel URL becomes available. Use this when you need to wait for your inference server to be accessible.
tunnel_url = client.wait_for_tunnel(
"job-id",
timeout=120, # Max wait time (default: 120s)
poll_interval=2.0 # Poll frequency (default: 2s)
)
if tunnel_url:
print(f"Server ready at: {tunnel_url}")
# Make requests to your server
else:
print("Tunnel did not become available within timeout")
Returns: Tunnel URL string, or None if timeout exceeded.
Example: Full inference server workflow
import os
import requests
# Submit inference server with exposed port
job = client.submit(
script="server.py",
service_port=8000,
workload_type="inference"
)
# Wait for tunnel
tunnel_url = client.wait_for_tunnel(job.job_id, timeout=300)
# Use the server (authentication required!)
api_key = os.environ.get("CUMULUS_API_KEY")
resp = requests.get(
f"{tunnel_url}/health",
headers={"X-API-Key": api_key}
)
CumulusJob
A clean, explicit runtime context for training jobs with checkpoint/resume support. This is the recommended way to handle checkpointing.
Basic Usage
from cumulus import CumulusJob
with CumulusJob() as job:
# Check if resuming from previous checkpoint
if job.is_resumed:
model.load_state_dict(job.checkpoint['model'])
optimizer.load_state_dict(job.checkpoint['optimizer'])
for epoch in range(job.start_epoch, num_epochs):
train_one_epoch()
# Keep state updated - saved automatically on eviction
job.state = {
'model': model.state_dict(),
'optimizer': optimizer.state_dict(),
'epoch': epoch
}
job.complete()
Properties
| Property | Type | Description |
|---|---|---|
| is_resumed | bool | True if resuming from a checkpoint |
| start_epoch | int | 0 for a fresh run, or checkpoint['epoch'] + 1 on resume |
| checkpoint | dict or None | Loaded checkpoint state (if resuming) |
| state | dict | Current state to save on eviction (you set this) |
| job_id | str | Current job identifier |
| requeue_count | int | Number of times the job has been requeued |
| artifacts | ArtifactManager | Manager for uploading files, models, and metrics |
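A brief sketch reading these properties inside the context:
with CumulusJob() as job:
    print(f"Job {job.job_id} (requeued {job.requeue_count} times)")
    if job.is_resumed:
        print(f"Resuming from epoch {job.start_epoch}")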
Methods
state (setter)
Assign the state to save on eviction. Update it periodically (e.g., after each epoch).
job.state = {
'model': model.state_dict(),
'optimizer': optimizer.state_dict(),
'epoch': epoch,
'custom_data': my_data
}
complete()
Mark the job as successfully completed. Creates a completion marker in storage.
job.complete()
save_checkpoint()
Manually trigger a checkpoint save. Usually not needed - checkpoints are saved automatically on eviction.
job.save_checkpoint()
get_status()
Get current job status as a dictionary.
status = job.get_status()
# Returns: {'job_id': '...', 'is_resumed': False, 'start_epoch': 0, ...}
artifacts
Access the artifact manager to upload files during training.
with CumulusJob() as job:
# Upload a model checkpoint
job.artifacts.upload_model(model.state_dict(), "checkpoints/epoch_5")
# Upload metrics
job.artifacts.upload_metrics({"loss": 0.5, "acc": 0.95}, "metrics/epoch_5")
CumulusTrainer (Legacy)
A PyTorch training wrapper that adds automatic checkpointing and resume capability.
Note: We recommend using CumulusJob for new projects - it's simpler and more explicit.
Constructor
from cumulus import CumulusTrainer
trainer = CumulusTrainer(
model=model, # Required: PyTorch model (must have state_dict/load_state_dict)
optimizer=optimizer, # Required: PyTorch optimizer
checkpoint_port=8765, # Optional: port for checkpoint server (default: 8765)
extra_state={"lr": 0.001} # Optional: additional state to save/restore
)
start()
Initialize the trainer. If resuming from a checkpoint, this loads the saved state.
start_epoch = trainer.start()
# Returns: 0 for fresh start, or the next epoch to train if resuming
step()
Call after each training step to track progress.
for batch in dataloader:
loss = train_step(model, batch)
trainer.step() # Tracks global step count
end_epoch()
Call at the end of each epoch.
for epoch in range(start_epoch, num_epochs):
train_one_epoch()
trainer.end_epoch(epoch) # Updates epoch counter
save_checkpoint()
Manually save a checkpoint (automatic on eviction).
trainer.save_checkpoint()
mark_complete()
Signal that training finished successfully. Creates a completion marker in storage.
trainer.mark_complete()
Returns: Storage URI of the completion marker, or empty string if not configured.
stop()
Stop the checkpoint server and clean up.
trainer.stop()
Context Manager
with CumulusTrainer(model, optimizer) as trainer:
start_epoch = trainer.start()
for epoch in range(start_epoch, num_epochs):
train_one_epoch()
trainer.end_epoch(epoch)
trainer.mark_complete()
# Automatically stopped on exit
Extra State
Save and restore additional data with your checkpoints:
# Set extra state before training
trainer.set_extra_state("learning_rate", 0.001)
trainer.set_extra_state("best_loss", float('inf'))
# After resuming, retrieve extra state
lr = trainer.get_extra_state("learning_rate", default=0.001)
best = trainer.get_extra_state("best_loss", default=float('inf'))
Properties
trainer.epoch # Current epoch
trainer.global_step # Total steps across all epochs
trainer.is_resumed # True if training was resumed from checkpoint
CheckpointManager
Low-level checkpoint management. Use this for custom checkpoint logic beyond what CumulusTrainer provides.
Constructor
from cumulus import CheckpointManager
manager = CheckpointManager()
When running on Cumulus, the manager's configuration is read automatically from environment variables set by the platform.
should_resume()
Check if the job should resume from a previous checkpoint.
if manager.should_resume():
state = manager.load_checkpoint()
model.load_state_dict(state['model'])
start_epoch = state['epoch'] + 1
else:
start_epoch = 0
save_checkpoint()
Save state to cloud storage.
uri = manager.save_checkpoint(
state={
'model': model.state_dict(),
'optimizer': optimizer.state_dict(),
'epoch': epoch,
'custom_data': my_data
},
filename="checkpoint.pt", # Optional: default is "checkpoint.pt"
use_torch=True # Optional: use torch.save (default: True)
)
Returns: Storage URI where checkpoint was saved.
load_checkpoint()
Load state from cloud storage.
state = manager.load_checkpoint(
filename="checkpoint.pt", # Optional: default is "checkpoint.pt"
use_torch=True, # Optional: use torch.load (default: True)
map_location="cpu" # Optional: device mapping for torch.load
)
Returns: The loaded state dictionary.
mark_complete()
Create a completion marker in storage.
uri = manager.mark_complete()
Properties
manager.is_configured # True if storage and job ID are set
manager.is_requeued # True if this is a requeued job (requeue_count > 0)
manager.job_id # Current job ID
manager.original_job_id # Original job ID (same as job_id if not requeued)
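Putting the pieces together, a minimal resume-aware training loop might look like this sketch (it uses only the methods above; model, optimizer, num_epochs, and train_one_epoch are placeholders for your own code):
from cumulus import CheckpointManager

manager = CheckpointManager()

# Resume if a previous attempt left a checkpoint behind
if manager.should_resume():
    state = manager.load_checkpoint(map_location="cpu")
    model.load_state_dict(state['model'])
    optimizer.load_state_dict(state['optimizer'])
    start_epoch = state['epoch'] + 1
else:
    start_epoch = 0

for epoch in range(start_epoch, num_epochs):
    train_one_epoch()
    # Persist progress after every epoch
    manager.save_checkpoint(state={
        'model': model.state_dict(),
        'optimizer': optimizer.state_dict(),
        'epoch': epoch,
    })

manager.mark_complete()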
ArtifactManager
Upload files, models, and metrics to cloud storage during job execution.
Access via CumulusJob
with CumulusJob() as job:
manager = job.artifacts
upload_file()
Upload a single file.
uri = job.artifacts.upload_file("output.csv", "results/output")
upload_directory()
Upload a directory recursively.
uri = job.artifacts.upload_directory(
"logs/",
"training_logs",
exclude={".git", "__pycache__"} # Optional exclusions
)
upload_model()
Upload a PyTorch model state dictionary.
uri = job.artifacts.upload_model(
model.state_dict(),
"checkpoints/best_model"
)
upload_metrics()
Upload metrics as JSON.
uri = job.artifacts.upload_metrics(
{"epoch": 5, "loss": 0.123, "accuracy": 0.956},
"metrics/epoch_5"
)
Environment Variables
These are automatically set inside your job when running on Cumulus:
| Variable | Description | Example |
|---|---|---|
| JOB_ID | Current job identifier | "job-20240103-143052-abc123" |
| ORIGINAL_JOB_ID | Original job ID (for requeued jobs) | "job-20240103-143052-abc123" |
| RESUME_FROM_CHECKPOINT | Whether to resume from a checkpoint | "true" or "false" |
| CUDA_VISIBLE_DEVICES | Assigned GPU index | "0" |
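A short sketch reading these with the standard library:
import os

job_id = os.environ.get("JOB_ID", "local-run")
resume = os.environ.get("RESUME_FROM_CHECKPOINT", "false").lower() == "true"
print(f"Running as {job_id}, resume={resume}")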
Helper Functions
generate_job_id()
Generate a unique job ID.
from cumulus import generate_job_id
job_id = generate_job_id() # e.g., "job-20240103-143052-a1b2c3"
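This is useful when you want to know the ID before submitting, e.g. to pass as the job_id parameter of submit():
from cumulus import generate_job_id

job_id = generate_job_id()
job = client.submit(script="train.py", job_id=job_id)  # job.job_id == job_id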
Error Handling
from cumulus import CumulusClient
client = CumulusClient()
try:
job = client.submit(script="train.py")
except FileNotFoundError:
print("Script not found")
except ValueError as e:
print(f"Invalid configuration: {e}")
# Status checks return "UNKNOWN" on errors
status = client.get_status("nonexistent-job") # Returns "UNKNOWN"
# Results return None if not available
results = client.get_results("job-id")
if results is None:
print("Results not yet available")