SDK Reference

Complete API documentation for the Cumulus Python SDK.

Installation

# Basic installation
pip install cumulus-sdk

# With PyTorch support (for CumulusTrainer)
pip install cumulus-sdk[torch]

# Full installation (all features)
pip install cumulus-sdk[full]

CumulusClient

The main client for submitting and managing GPU jobs.

Constructor

from cumulus import CumulusClient

client = CumulusClient()

The client automatically reads your API key from the CUMULUS_API_KEY environment variable:

export CUMULUS_API_KEY="your-api-key-here"

Getting an API Key

Get your API key from the Cumulus Dashboard.

submit()

Submit a job to Cumulus.

job = client.submit(
    script="train.py",                  # Required: path to main script
    job_id="my-job-001",                # Optional: custom job ID (auto-generated if not provided)
    additional_files=["model.py", "utils.py"],    # Optional: extra files to include explicitly
    include_patterns=["*.yaml", "data/*.csv"],    # Optional: glob patterns to include
    exclude_patterns=["*.pyc", "__pycache__/*"],  # Optional: glob patterns to exclude
    requirements=["torch", "transformers"],       # Optional: pip packages to install
    requirements_file="requirements.txt",         # Optional: path to requirements.txt
    gpu_count=1,                        # Optional: number of GPUs (default: 1)
    worker_image="pytorch/pytorch:2.5.1-cuda12.4-cudnn9-runtime",  # Optional: Docker image
    workload_type="training",           # Optional: "training", "inference", or "finetuning"
    priority=5,                         # Optional: 1-10, higher = more important (default: 5)
    memory_request="8Gi",               # Optional: minimum memory (default: 8Gi)
    memory_limit="16Gi",                # Optional: maximum memory (default: 16Gi)
    env={"KEY": "value"},               # Optional: environment variables
    model_architecture={                # Optional: model info for VRAM estimation
        "architecture_type": "diffusion",  # transformer, diffusion, cnn, unet
        "base_model": "sdxl",              # Known model for accurate baselines
        "lora_rank": 32,                   # LoRA rank for trainable param calc
    },
    training_config={                   # Optional: training config for optimization
        "batch_size": 1,
        "precision": "fp16",
        "peft": True,                      # LoRA/PEFT fine-tuning
        "image_size": 1024,                # For diffusion models
        "gradient_checkpointing": True,    # Reduces VRAM ~30%
    },
    auto_detect=True,                   # Optional: auto-detect config from script (default: True)
    # Advanced options (see Configuration Reference for details)
    sm_percent=50,                      # Optional: GPU SM percentage (1-100)
    vram_gb=20.0,                       # Optional: VRAM allocation in GB
    queue_timeout_seconds=300,          # Optional: max queue wait time
    service_port=8000,                  # Optional: port to expose via tunnel
)

Automatic dependency detection: The SDK automatically detects Python imports and data files (configs, models, datasets) referenced in your code. You typically don't need additional_files unless you have files that aren't imported or referenced.
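To build intuition for what "detecting Python imports" involves, here is an illustrative sketch of import scanning using the standard-library ast module. This is not the SDK's actual implementation (which also follows data-file references); the function name is ours.

```python
import ast

def detect_imports(source: str) -> set[str]:
    """Collect top-level module names imported by a script.

    Illustrative only: shows the import-scanning half of what a
    dependency detector might do.
    """
    modules = set()
    for node in ast.walk(ast.parse(source)):
        if isinstance(node, ast.Import):
            for alias in node.names:
                modules.add(alias.name.split(".")[0])
        elif isinstance(node, ast.ImportFrom) and node.module:
            modules.add(node.module.split(".")[0])
    return modules

script = "import torch\nfrom utils.io import load_config\n"
print(sorted(detect_imports(script)))  # ['torch', 'utils']
```

Local modules found this way (like utils above) can be bundled with the job, while known PyPI packages map to requirements.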

Returns: SubmittedJob

job.job_id   # "job-20240103-143052-abc123" - unique job identifier
job.status   # "SUBMITTED"

get_status()

Check the current status of a job.

status = client.get_status("job-id")

Returns: One of:

  • "SUBMITTED" - Job uploaded, waiting to be scheduled
  • "PENDING" - Pod created, waiting for GPU allocation
  • "RUNNING" - Job is executing
  • "SUCCEEDED" - Job completed successfully
  • "FAILED" - Job failed
  • "UNKNOWN" - Job not found or status could not be determined
  • "ERROR" - Communication error with API
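A typical pattern is to poll get_status() until a terminal state is reached. The loop below takes the status function as a parameter so it can be shown standalone; in practice you would pass client.get_status (or simply use wait_for_completion(), which does this for you).

```python
import time

TERMINAL = {"SUCCEEDED", "FAILED"}

def poll_until_done(get_status, job_id, timeout=3600, poll_interval=10.0):
    """Poll a status function until the job reaches a terminal state.

    `get_status` stands in for `client.get_status` so the loop can be
    demonstrated without a live client.
    """
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        status = get_status(job_id)
        if status in TERMINAL:
            return status
        time.sleep(poll_interval)
    return "TIMEOUT"
```

Note that "UNKNOWN" and "ERROR" are not treated as terminal here; transient API errors resolve on the next poll.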

wait_for_completion()

Block until a job finishes.

final_status = client.wait_for_completion(
    "job-id",
    timeout=3600,       # Maximum wait time in seconds (default: 3600)
    poll_interval=10.0  # How often to check status (default: 10 seconds)
)

Returns: "SUCCEEDED", "FAILED", or "TIMEOUT"

get_results()

Download job output files.

# Get default results file (results.txt)
output = client.get_results("job-id")

# Get a specific file
logs = client.get_results("job-id", file="output.log")

Returns: File contents as a string, or None if not available.

cancel()

Cancel a running or pending job.

success = client.cancel("job-id")
if success:
    print("Job cancelled")

Returns: True if cancelled successfully, False otherwise.

list_artifacts()

List all artifacts in a job's S3 folder.

artifacts = client.list_artifacts("job-id")
for artifact in artifacts:
    print(f"{artifact['name']} ({artifact['size']} bytes)")

Returns: List of dicts with name, size, last_modified, presigned_url.

download_artifacts()

Download artifacts from a completed job.

# Download all artifacts
files = client.download_artifacts("job-id", "./outputs")

# Download only model files
files = client.download_artifacts(
    "job-id",
    "./models",
    patterns=["*.pt", "checkpoints/*"]
)

Args:

  • job_id: Job identifier
  • local_path: Local directory to download to
  • patterns: Optional glob patterns to filter files

Returns: List of downloaded file paths.

get_tunnel_url()

Get the public tunnel URL for a job with an exposed service_port. Returns immediately.

tunnel_url = client.get_tunnel_url("job-id")
# Returns: "http://tunnel.cumuluslabs.io:8443/12345" or None if not ready

Returns: Tunnel URL string, or None if the tunnel isn't established yet.

Note: The tunnel takes a few seconds to establish after the job starts. Use wait_for_tunnel() if you need to block until it's ready.

Authentication Required

Tunnel URLs require your Cumulus API key for access. When making requests to the tunnel URL, include one of:

  • X-API-Key: <your-api-key> header, or
  • Authorization: Bearer <your-api-key> header
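Either header form works. A small convenience helper (not part of the SDK, just a wrapper around the two accepted forms) that builds the headers from the CUMULUS_API_KEY environment variable:

```python
import os

def tunnel_headers(bearer: bool = False) -> dict[str, str]:
    """Build auth headers for tunnel requests from CUMULUS_API_KEY.

    Not part of the SDK - a convenience wrapper for the two accepted
    header forms.
    """
    api_key = os.environ["CUMULUS_API_KEY"]
    if bearer:
        return {"Authorization": f"Bearer {api_key}"}
    return {"X-API-Key": api_key}
```

Usage: requests.get(tunnel_url, headers=tunnel_headers()).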

wait_for_tunnel()

Block until the tunnel URL becomes available. Use this when you need to wait for your inference server to be accessible.

tunnel_url = client.wait_for_tunnel(
    "job-id",
    timeout=120,      # Max wait time (default: 120s)
    poll_interval=2.0 # Poll frequency (default: 2s)
)

if tunnel_url:
    print(f"Server ready at: {tunnel_url}")
    # Make requests to your server
else:
    print("Tunnel did not become available within timeout")

Returns: Tunnel URL string, or None if timeout exceeded.

Example: Full inference server workflow

import os

# Submit inference server with exposed port
job = client.submit(
    script="server.py",
    service_port=8000,
    workload_type="inference"
)

# Wait for tunnel
tunnel_url = client.wait_for_tunnel(job.job_id, timeout=300)

# Use the server (authentication required!)
import requests

api_key = os.environ.get("CUMULUS_API_KEY")

resp = requests.get(
    f"{tunnel_url}/health",
    headers={"X-API-Key": api_key}
)

CumulusJob

A clean, explicit runtime context for training jobs with checkpoint/resume support. This is the recommended way to handle checkpointing.

Basic Usage

from cumulus import CumulusJob

with CumulusJob() as job:
    # Check if resuming from previous checkpoint
    if job.is_resumed:
        model.load_state_dict(job.checkpoint['model'])
        optimizer.load_state_dict(job.checkpoint['optimizer'])

    for epoch in range(job.start_epoch, num_epochs):
        train_one_epoch()

        # Keep state updated - saved automatically on eviction
        job.state = {
            'model': model.state_dict(),
            'optimizer': optimizer.state_dict(),
            'epoch': epoch
        }

    job.complete()

Properties

Property        Type              Description
is_resumed      bool              True if resuming from checkpoint
start_epoch     int               0 for fresh run, or checkpoint['epoch'] + 1 for resume
checkpoint      dict or None      Loaded checkpoint state (if resuming)
state           dict              Current state to save on eviction (you set this)
job_id          str               Current job identifier
requeue_count   int               Number of times job has been requeued
artifacts       ArtifactManager   Manager for uploading files, models, metrics

Methods

state (setter)

Set the state to save on eviction. Call this periodically (e.g., after each epoch).

job.state = {
    'model': model.state_dict(),
    'optimizer': optimizer.state_dict(),
    'epoch': epoch,
    'custom_data': my_data
}

complete()

Mark the job as successfully completed. Creates a completion marker in storage.

job.complete()

save_checkpoint()

Manually trigger a checkpoint save. Usually not needed - checkpoints are saved automatically on eviction.

job.save_checkpoint()

get_status()

Get current job status as a dictionary.

status = job.get_status()
# Returns: {'job_id': '...', 'is_resumed': False, 'start_epoch': 0, ...}

artifacts

Access the artifact manager to upload files during training.

with CumulusJob() as job:
    # Upload a model checkpoint
    job.artifacts.upload_model(model.state_dict(), "checkpoints/epoch_5")

    # Upload metrics
    job.artifacts.upload_metrics({"loss": 0.5, "acc": 0.95}, "metrics/epoch_5")

CumulusTrainer (Legacy)

A PyTorch training wrapper that adds automatic checkpointing and resume capability.

Note: We recommend using CumulusJob for new projects - it's simpler and more explicit.

Constructor

from cumulus import CumulusTrainer

trainer = CumulusTrainer(
    model=model,              # Required: PyTorch model (must have state_dict/load_state_dict)
    optimizer=optimizer,      # Required: PyTorch optimizer
    checkpoint_port=8765,     # Optional: port for checkpoint server (default: 8765)
    extra_state={"lr": 0.001} # Optional: additional state to save/restore
)

start()

Initialize the trainer. If resuming from a checkpoint, this loads the saved state.

start_epoch = trainer.start()
# Returns: 0 for fresh start, or the next epoch to train if resuming

step()

Call after each training step to track progress.

for batch in dataloader:
    loss = train_step(model, batch)
    trainer.step()  # Tracks global step count

end_epoch()

Call at the end of each epoch.

for epoch in range(start_epoch, num_epochs):
    train_one_epoch()
    trainer.end_epoch(epoch)  # Updates epoch counter

save_checkpoint()

Manually save a checkpoint (automatic on eviction).

trainer.save_checkpoint()

mark_complete()

Signal that training finished successfully. Creates a completion marker in storage.

trainer.mark_complete()

Returns: Storage URI of the completion marker, or empty string if not configured.

stop()

Stop the checkpoint server and clean up.

trainer.stop()

Context Manager

with CumulusTrainer(model, optimizer) as trainer:
    start_epoch = trainer.start()
    for epoch in range(start_epoch, num_epochs):
        train_one_epoch()
        trainer.end_epoch(epoch)
    trainer.mark_complete()
# Automatically stopped on exit

Extra State

Save and restore additional data with your checkpoints:

# Set extra state before training
trainer.set_extra_state("learning_rate", 0.001)
trainer.set_extra_state("best_loss", float('inf'))

# After resuming, retrieve extra state
lr = trainer.get_extra_state("learning_rate", default=0.001)
best = trainer.get_extra_state("best_loss", default=float('inf'))

Properties

trainer.epoch         # Current epoch
trainer.global_step   # Total steps across all epochs
trainer.is_resumed    # True if training was resumed from checkpoint

CheckpointManager

Low-level checkpoint management. Use this for custom checkpoint logic beyond what CumulusTrainer provides.

Constructor

from cumulus import CheckpointManager

manager = CheckpointManager()

When running on Cumulus, configuration is automatically read from environment variables set by Cumulus.

should_resume()

Check if the job should resume from a previous checkpoint.

if manager.should_resume():
    state = manager.load_checkpoint()
    model.load_state_dict(state['model'])
    start_epoch = state['epoch'] + 1
else:
    start_epoch = 0

save_checkpoint()

Save state to cloud storage.

uri = manager.save_checkpoint(
    state={
        'model': model.state_dict(),
        'optimizer': optimizer.state_dict(),
        'epoch': epoch,
        'custom_data': my_data
    },
    filename="checkpoint.pt", # Optional: default is "checkpoint.pt"
    use_torch=True            # Optional: use torch.save (default: True)
)

Returns: Storage URI where checkpoint was saved.

load_checkpoint()

Load state from cloud storage.

state = manager.load_checkpoint(
    filename="checkpoint.pt", # Optional: default is "checkpoint.pt"
    use_torch=True,           # Optional: use torch.load (default: True)
    map_location="cpu"        # Optional: device mapping for torch.load
)

Returns: The loaded state dictionary.

mark_complete()

Create a completion marker in storage.

uri = manager.mark_complete()

Properties

manager.is_configured     # True if storage and job ID are set
manager.is_requeued       # True if this is a requeued job (requeue_count > 0)
manager.job_id            # Current job ID
manager.original_job_id   # Original job ID (same as job_id if not requeued)

ArtifactManager

Upload files, models, and metrics to cloud storage during job execution.

Access via CumulusJob

with CumulusJob() as job:
    manager = job.artifacts

upload_file()

Upload a single file.

uri = job.artifacts.upload_file("output.csv", "results/output")

upload_directory()

Upload a directory recursively.

uri = job.artifacts.upload_directory(
    "logs/",
    "training_logs",
    exclude={".git", "__pycache__"}  # Optional exclusions
)

upload_model()

Upload a PyTorch model state dictionary.

uri = job.artifacts.upload_model(
    model.state_dict(),
    "checkpoints/best_model"
)

upload_metrics()

Upload metrics as JSON.

uri = job.artifacts.upload_metrics(
    {"epoch": 5, "loss": 0.123, "accuracy": 0.956},
    "metrics/epoch_5"
)

Environment Variables

These are automatically set inside your job when running on Cumulus:

Variable                 Description                           Example
JOB_ID                   Current job identifier                "job-20240103-143052-abc123"
ORIGINAL_JOB_ID          Original job ID (for requeued jobs)   "job-20240103-143052-abc123"
RESUME_FROM_CHECKPOINT   Whether to resume from checkpoint     "true" or "false"
CUDA_VISIBLE_DEVICES     Assigned GPU index                    "0"
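If you read these directly rather than through CumulusJob, a small parser keeps the types tidy. The fallback values below ('local-dev', False) are illustrative defaults for running the same script outside Cumulus, not SDK behavior:

```python
import os

def read_job_env(env=os.environ) -> dict:
    """Parse the Cumulus-set variables into friendlier Python types.

    Fallbacks are illustrative defaults for local (non-Cumulus) runs.
    """
    job_id = env.get("JOB_ID", "local-dev")
    return {
        "job_id": job_id,
        "original_job_id": env.get("ORIGINAL_JOB_ID", job_id),
        "resume": env.get("RESUME_FROM_CHECKPOINT", "false").lower() == "true",
        "gpu": env.get("CUDA_VISIBLE_DEVICES", ""),
    }
```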

Helper Functions

generate_job_id()

Generate a unique job ID.

from cumulus import generate_job_id

job_id = generate_job_id() # e.g., "job-20240103-143052-a1b2c3"

Error Handling

from cumulus import CumulusClient

client = CumulusClient()

try:
    job = client.submit(script="train.py")
except FileNotFoundError:
    print("Script not found")
except ValueError as e:
    print(f"Invalid configuration: {e}")

# Status checks return "UNKNOWN" on errors
status = client.get_status("nonexistent-job") # Returns "UNKNOWN"

# Results return None if not available
results = client.get_results("job-id")
if results is None:
    print("Results not yet available")