Skip to main content

Inference Examples

Real-world examples for running inference jobs on Cumulus. Each example is copy-paste ready.

What you'll learn
  • Text generation and NLP tasks
  • Computer vision inference
  • Embedding generation at scale
  • Large language model inference

1. Text Generation with GPT-2

Scenario: Generate text completions for a list of prompts.

generate.py
"""Generate GPT-2 completions for a fixed list of prompts and save them as JSON."""
import json

import torch
from transformers import GPT2LMHeadModel, GPT2Tokenizer

# Load model
tokenizer = GPT2Tokenizer.from_pretrained('gpt2')
model = GPT2LMHeadModel.from_pretrained('gpt2').cuda()
model.eval()

prompts = [
    "The future of artificial intelligence is",
    "In a world where robots",
    "The secret to happiness is",
    "Scientists recently discovered that",
]

results = []

with torch.no_grad():
    for prompt in prompts:
        inputs = tokenizer(prompt, return_tensors='pt').to('cuda')

        # Unpack the whole encoding so generate() receives the attention
        # mask together with input_ids. pad_token_id is set to the EOS id
        # here, so the mask should be passed explicitly rather than inferred.
        outputs = model.generate(
            **inputs,
            max_length=100,
            temperature=0.9,
            do_sample=True,
            top_p=0.95,
            pad_token_id=tokenizer.eos_token_id,
        )

        generated = tokenizer.decode(outputs[0], skip_special_tokens=True)
        results.append({
            "prompt": prompt,
            "generated": generated,
        })
        print(f"Generated: {prompt[:30]}...")

# Save results
with open("generations.json", "w") as f:
    json.dump(results, f, indent=2)

print(f"Complete! Generated {len(results)} texts.")
submit.py
from cumulus import CumulusClient

# Submit generate.py as an inference job, wait for it to complete,
# then fetch generations.json and print it.
job_spec = {
    "script": "generate.py",
    "requirements": ["torch", "transformers"],
    "workload_type": "inference",
}

client = CumulusClient()
job = client.submit(**job_spec)

client.wait_for_completion(job.job_id)
print(client.get_results(job.job_id, file="generations.json"))

You'll know it worked when: You get a generations.json file with completed text for each prompt.


2. Image Classification with ResNet

Scenario: Classify a batch of images using a pre-trained ResNet model.

classify.py
"""Classify every image in images/ with a pre-trained ResNet-50 and save the top-5 predictions."""
import json
import os

import torch
from PIL import Image
from torchvision import models, transforms

# Load pre-trained ResNet
weights = models.ResNet50_Weights.IMAGENET1K_V1
model = models.resnet50(weights=weights).cuda()
model.eval()

# Full ImageNet class-name list shipped with the weights; indexing it with a
# predicted class id gives a human-readable label for every prediction.
LABELS = weights.meta["categories"]

# Standard ImageNet preprocessing
transform = transforms.Compose([
    transforms.Resize(256),
    transforms.CenterCrop(224),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=[0.485, 0.456, 0.406],
        std=[0.229, 0.224, 0.225],
    ),
])

# Process all images in images/ directory
image_dir = "images"
results = []

with torch.no_grad():
    # sorted() makes the output order reproducible between runs.
    for filename in sorted(os.listdir(image_dir)):
        if not filename.lower().endswith(('.png', '.jpg', '.jpeg')):
            continue

        image_path = os.path.join(image_dir, filename)
        image = Image.open(image_path).convert('RGB')
        input_tensor = transform(image).unsqueeze(0).cuda()

        output = model(input_tensor)
        probabilities = torch.nn.functional.softmax(output[0], dim=0)
        top5_prob, top5_idx = torch.topk(probabilities, 5)

        results.append({
            "filename": filename,
            "predictions": [
                {
                    "class_id": idx.item(),
                    "label": LABELS[idx.item()],
                    "probability": prob.item(),
                }
                for idx, prob in zip(top5_idx, top5_prob)
            ],
        })
        print(f"Classified: {filename}")

# Save results
with open("classifications.json", "w") as f:
    json.dump(results, f, indent=2)

print(f"Complete! Classified {len(results)} images.")
submit.py
from cumulus import CumulusClient

client = CumulusClient()

# classify.py accepts .png, .jpg and .jpeg files, so every one of those
# extensions has to be listed here for the images to be included in the job.
job = client.submit(
    script="classify.py",
    include_patterns=["images/*.jpg", "images/*.jpeg", "images/*.png"],  # Include image files
    requirements=["torch", "torchvision", "pillow"],
    workload_type="inference"
)

3. Sentiment Analysis Pipeline

Scenario: Analyze sentiment for customer reviews or social media posts.

sentiment.py
"""Run sentiment analysis over a list of reviews and save per-review results plus a summary."""
import json

from transformers import pipeline

# Load sentiment analysis pipeline; device=0 places it on the first GPU
classifier = pipeline(
    "sentiment-analysis",
    model="distilbert-base-uncased-finetuned-sst-2-english",
    device=0,  # Use GPU
)

# Sample data - replace with your actual data
reviews = [
    "This product exceeded my expectations! Highly recommend.",
    "Terrible quality. Broke after one day.",
    "It's okay, nothing special but works as advertised.",
    "Best purchase I've made this year!",
    "Would not recommend. Customer service was unhelpful.",
    "Decent value for the price point.",
    "Absolutely love it! Already ordered another one.",
    "Disappointed with the shipping time.",
]

# Process all reviews in one pipeline call; a list input yields one
# prediction dict per review, in the same order.
predictions = classifier(reviews) if reviews else []

results = []
for review, prediction in zip(reviews, predictions):
    results.append({
        "text": review,
        "sentiment": prediction["label"],
        "confidence": round(prediction["score"], 4),
    })
    print(f"{prediction['label']}: {review[:50]}...")

# Calculate summary statistics
positive = sum(1 for r in results if r["sentiment"] == "POSITIVE")
negative = len(results) - positive

summary = {
    "total_reviews": len(results),
    "positive": positive,
    "negative": negative,
    # Guard against an empty review list instead of dividing by zero.
    "positive_ratio": round(positive / len(results), 2) if results else 0.0,
    "results": results,
}

# Save results
with open("sentiment_analysis.json", "w") as f:
    json.dump(summary, f, indent=2)

print(f"\nSummary: {positive} positive, {negative} negative ({summary['positive_ratio']*100:.0f}% positive)")
submit.py
from cumulus import CumulusClient

# Submission settings for the sentiment analysis job.
job_spec = dict(
    script="sentiment.py",
    requirements=["torch", "transformers"],
    workload_type="inference",
    memory_request="8Gi",
)

client = CumulusClient()
job = client.submit(**job_spec)

4. Batch Embeddings with BERT

Scenario: Generate embeddings for semantic search or clustering.

embeddings.py
"""Embed a list of documents with BERT and save the vectors as JSON and .npy."""
import json

import numpy as np
import torch
from transformers import AutoModel, AutoTokenizer

MODEL_NAME = "bert-base-uncased"

# Load BERT model
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model = AutoModel.from_pretrained(MODEL_NAME).cuda()
model.eval()

# Sample documents - replace with your data
documents = [
    "Machine learning is a subset of artificial intelligence.",
    "Deep learning uses neural networks with many layers.",
    "Natural language processing enables computers to understand text.",
    "Computer vision allows machines to interpret images.",
    "Reinforcement learning trains agents through rewards.",
    "Transfer learning applies knowledge from one task to another.",
    "Generative models create new data similar to training data.",
    "Supervised learning uses labeled data for training.",
]

# Process in batches for efficiency
batch_size = 4
all_embeddings = []
total_batches = (len(documents) - 1) // batch_size + 1

with torch.no_grad():
    batch_starts = range(0, len(documents), batch_size)
    for batch_number, start in enumerate(batch_starts, start=1):
        chunk = documents[start:start + batch_size]

        encoded = tokenizer(
            chunk,
            return_tensors="pt",
            padding=True,
            truncation=True,
            max_length=512,
        ).to("cuda")

        hidden_states = model(**encoded).last_hidden_state

        # Position 0 of every sequence is taken as the document vector
        cls_vectors = hidden_states[:, 0, :].cpu().numpy()
        all_embeddings.extend(cls_vectors.tolist())

        print(f"Processed batch {batch_number}/{total_batches}")

embedding_dim = len(all_embeddings[0])

# Save results
with open("embeddings.json", "w") as f:
    json.dump(
        {
            "documents": documents,
            "embeddings": all_embeddings,
            "embedding_dim": embedding_dim,
        },
        f,
    )

# Also save as numpy for efficient loading
np.save("embeddings.npy", np.array(all_embeddings))

print(f"Complete! Generated {len(all_embeddings)} embeddings of dimension {embedding_dim}")
submit.py
from cumulus import CumulusClient

client = CumulusClient()

job_spec = {
    "script": "embeddings.py",
    "requirements": ["torch", "transformers", "numpy"],
    "workload_type": "inference",
    "memory_request": "16Gi",  # BERT needs more memory
}
job = client.submit(**job_spec)

job_id = job.job_id
client.wait_for_completion(job_id)

# Get embeddings
embeddings_json = client.get_results(job_id, file="embeddings.json")

5. Large Language Model Inference

Scenario: Run inference with a large model like LLaMA or Mistral.

llm_inference.py
"""Run a list of prompts through an instruction-tuned causal LM and save the responses as JSON."""
import json

import torch
from transformers import AutoModelForCausalLM, AutoTokenizer

# Model configuration
model_name = "mistralai/Mistral-7B-Instruct-v0.1"  # Or your preferred model

print(f"Loading {model_name}...")

tokenizer = AutoTokenizer.from_pretrained(model_name)
model = AutoModelForCausalLM.from_pretrained(
    model_name,
    torch_dtype=torch.float16,  # Use FP16 to save memory
    device_map="auto",  # Automatically use available GPUs
)

# Prompts to process
prompts = [
    "Explain quantum computing in simple terms:",
    "Write a haiku about programming:",
    "What are three benefits of renewable energy?",
]

results = []

for prompt in prompts:
    # Format for instruction-tuned model
    formatted_prompt = f"[INST] {prompt} [/INST]"

    inputs = tokenizer(formatted_prompt, return_tensors="pt").to(model.device)
    prompt_length = inputs["input_ids"].shape[1]

    with torch.no_grad():
        outputs = model.generate(
            **inputs,
            max_new_tokens=256,
            temperature=0.7,
            do_sample=True,
            top_p=0.9,
            pad_token_id=tokenizer.eos_token_id,
        )

    # The output sequence starts with the prompt tokens. Decode only what
    # follows them, rather than splitting the decoded text on "[/INST]",
    # which breaks if the model itself emits that marker.
    new_tokens = outputs[0][prompt_length:]
    response = tokenizer.decode(new_tokens, skip_special_tokens=True).strip()

    results.append({
        "prompt": prompt,
        "response": response,
    })
    print(f"Completed: {prompt[:40]}...")

# Save results
with open("llm_responses.json", "w") as f:
    json.dump(results, f, indent=2)

print(f"Complete! Generated {len(results)} responses.")
submit.py
from cumulus import CumulusClient
import os

client = CumulusClient()

submit_kwargs = dict(
    script="llm_inference.py",
    requirements=["torch", "transformers", "accelerate"],
    workload_type="inference",
    memory_request="32Gi",
    memory_limit="64Gi",
)

# Forward the Hugging Face token only when one is actually configured.
# Sending a placeholder string as a token can make model downloads fail
# with an authentication error instead of falling back to anonymous access.
hf_token = os.environ.get("HF_TOKEN")
if hf_token:
    submit_kwargs["env"] = {"HF_TOKEN": hf_token}
else:
    print("HF_TOKEN is not set; gated models will not be downloadable.")

job = client.submit(**submit_kwargs)
Large Models

7B+ parameter models require significant GPU VRAM. Ensure your job has adequate memory_request and memory_limit settings.


6. Object Detection with YOLO

Scenario: Detect objects in images using YOLOv5.

detect.py
"""Detect objects in every image in images/ with YOLOv5 and save results as JSON."""
import json
import os

import torch
from PIL import Image

# Load YOLOv5 model
model = torch.hub.load('ultralytics/yolov5', 'yolov5s', pretrained=True)
model.cuda()
model.eval()

# Process images
image_dir = "images"
output_dir = "output"
results = []

for filename in os.listdir(image_dir):
    if not filename.lower().endswith(('.png', '.jpg', '.jpeg')):
        continue

    image_path = os.path.join(image_dir, filename)
    image = Image.open(image_path)

    # Run detection
    detections = model(image)

    # Parse results
    df = detections.pandas().xyxy[0]
    objects = []
    for _, row in df.iterrows():
        objects.append({
            "class": row["name"],
            "confidence": round(row["confidence"], 3),
            "bbox": {
                "x1": int(row["xmin"]),
                "y1": int(row["ymin"]),
                "x2": int(row["xmax"]),
                "y2": int(row["ymax"]),
            },
        })

    results.append({
        "filename": filename,
        "detections": objects,
        "object_count": len(objects),
    })

    print(f"{filename}: Found {len(objects)} objects")

    # Save annotated image. save() expects a directory via save_dir; its
    # first positional parameter is the `labels` flag, so a positional path
    # would be ignored. exist_ok=True keeps all images in the same directory.
    detections.save(save_dir=output_dir, exist_ok=True)

# Save detection results
with open("detections.json", "w") as f:
    json.dump(results, f, indent=2)

total_objects = sum(r["object_count"] for r in results)
print(f"\nComplete! Processed {len(results)} images, found {total_objects} total objects.")
submit.py
from cumulus import CumulusClient

client = CumulusClient()

# detect.py accepts .png, .jpg and .jpeg files, so all three extensions
# are listed to make sure every processable image is included in the job.
job = client.submit(
    script="detect.py",
    include_patterns=["images/*.jpg", "images/*.jpeg", "images/*.png"],
    requirements=["torch", "pillow", "ultralytics", "pandas"],
    workload_type="inference"
)

7. Speech-to-Text with Whisper

Scenario: Transcribe audio files using OpenAI's Whisper model.

transcribe.py
"""Transcribe every audio file in audio/ with Whisper and save the text as JSON."""
import json
import os

import librosa
import torch
from transformers import WhisperForConditionalGeneration, WhisperProcessor

MODEL_ID = "openai/whisper-base"
TARGET_SAMPLE_RATE = 16000
AUDIO_SUFFIXES = ('.wav', '.mp3', '.flac', '.m4a')

# Load Whisper model
processor = WhisperProcessor.from_pretrained(MODEL_ID)
model = WhisperForConditionalGeneration.from_pretrained(MODEL_ID).cuda()
model.eval()

# Process audio files
audio_dir = "audio"
results = []

for filename in os.listdir(audio_dir):
    if not filename.lower().endswith(AUDIO_SUFFIXES):
        continue

    # Load audio, resampled to the rate passed to the processor below
    waveform, sample_rate = librosa.load(
        os.path.join(audio_dir, filename), sr=TARGET_SAMPLE_RATE
    )

    # Process with Whisper
    # NOTE(review): Whisper appears to work on fixed 30-second windows —
    # confirm that longer files are not silently truncated here.
    encoded = processor(
        waveform, sampling_rate=TARGET_SAMPLE_RATE, return_tensors="pt"
    )
    input_features = encoded.input_features.cuda()

    with torch.no_grad():
        generated_ids = model.generate(input_features)

    decoded = processor.batch_decode(generated_ids, skip_special_tokens=True)
    transcription = decoded[0]

    results.append({
        "filename": filename,
        "transcription": transcription,
        "duration_seconds": len(waveform) / sample_rate,
    })

    print(f"{filename}: {transcription[:50]}...")

# Save results
with open("transcriptions.json", "w") as f:
    json.dump(results, f, indent=2)

total_duration = sum(entry["duration_seconds"] for entry in results)
print(f"\nComplete! Transcribed {len(results)} files ({total_duration:.1f} seconds of audio)")
submit.py
from cumulus import CumulusClient

client = CumulusClient()

# transcribe.py accepts .wav, .mp3, .flac and .m4a files, so all four
# extensions are listed to make sure every such file is included in the job.
job = client.submit(
    script="transcribe.py",
    include_patterns=["audio/*.wav", "audio/*.mp3", "audio/*.flac", "audio/*.m4a"],
    requirements=["torch", "transformers", "librosa"],
    workload_type="inference",
    memory_request="16Gi"
)

8. Resumable Large-Scale Processing

Scenario: Process millions of items with automatic resume on interruption.

batch_process.py
"""Embed every item of dataset.json with BERT, checkpointing progress after each batch."""
import json

import torch
from cumulus import CumulusJob
from transformers import AutoModel, AutoTokenizer

MODEL_NAME = "bert-base-uncased"

# Load model
model = AutoModel.from_pretrained(MODEL_NAME).cuda()
tokenizer = AutoTokenizer.from_pretrained(MODEL_NAME)
model.eval()

# Load large dataset
with open("dataset.json") as f:
    all_items = json.load(f)

total_items = len(all_items)
print(f"Total items to process: {total_items}")

with CumulusJob() as job:
    # Start fresh unless the job reports that it was resumed
    processed = []
    start_idx = 0
    if job.is_resumed:
        # NOTE(review): progress is read from job.checkpoint but written via
        # job.state below — confirm against the CumulusJob API.
        processed = job.checkpoint.get("processed", [])
        start_idx = job.checkpoint.get("last_index", 0) + 1
        print(f"Resuming from item {start_idx}")

    batch_size = 32

    for offset in range(start_idx, total_items, batch_size):
        batch = all_items[offset:offset + batch_size]

        encoded = tokenizer(
            [entry["text"] for entry in batch],
            return_tensors="pt",
            padding=True,
            truncation=True,
        ).to("cuda")

        with torch.no_grad():
            hidden_states = model(**encoded).last_hidden_state
            vectors = hidden_states[:, 0, :].cpu().tolist()

        processed.extend(
            {"id": entry["id"], "embedding": vector}
            for entry, vector in zip(batch, vectors)
        )

        # Update checkpoint after each batch
        job.state = {
            "processed": processed,
            "last_index": offset + len(batch) - 1,
        }

        # Progress logging every 100 batches
        if (offset // batch_size) % 100 == 0:
            done = len(processed)
            print(f"Progress: {done}/{total_items} ({100*done/total_items:.1f}%)")

    job.complete()

# Save final results
with open("results.json", "w") as f:
    json.dump(processed, f)

print(f"Complete! Processed {len(processed)} items.")
submit.py
from cumulus import CumulusClient

# Submission settings for the resumable batch job.
job_spec = {
    "script": "batch_process.py",
    "include_patterns": ["dataset.json"],
    "requirements": ["torch", "transformers"],
    "workload_type": "inference",
    "priority": 5,  # Medium priority - can be evicted but will resume
    "memory_request": "16Gi",
}

client = CumulusClient()
job = client.submit(**job_spec)

9. Live Inference Server with Public URL

Scenario: Deploy a long-running inference server (vLLM, SGLang, FastAPI) and get a public URL to access it.

vllm_server.py
"""Serve a vLLM model over HTTP with a completions endpoint and a health check."""
import threading

from vllm import LLM, SamplingParams
from fastapi import FastAPI
from pydantic import BaseModel
import uvicorn

app = FastAPI()

# Load model at startup
print("Loading model...")
llm = LLM(model="mistralai/Mistral-7B-Instruct-v0.1")
print("Model loaded!")

# Serializes access to the engine across worker threads.
# NOTE(review): assumes LLM.generate is not safe to call concurrently — confirm.
_generate_lock = threading.Lock()


class CompletionRequest(BaseModel):
    """Request body for /v1/completions."""

    prompt: str
    max_tokens: int = 256
    temperature: float = 0.7


@app.post("/v1/completions")
def generate(request: CompletionRequest):
    """Generate a completion for the prompt and return it with the prompt.

    Declared with plain ``def`` so FastAPI runs it in its thread pool:
    llm.generate() is a blocking call, and running it inside an
    ``async def`` handler would stall the event loop (including /health)
    for the whole generation.
    """
    sampling_params = SamplingParams(
        temperature=request.temperature,
        max_tokens=request.max_tokens,
    )
    with _generate_lock:
        outputs = llm.generate([request.prompt], sampling_params)
    return {
        "text": outputs[0].outputs[0].text,
        "prompt": request.prompt,
    }


@app.get("/health")
async def health():
    """Report that the server process is up."""
    return {"status": "healthy"}


if __name__ == "__main__":
    # IMPORTANT: Listen on 0.0.0.0, not localhost
    uvicorn.run(app, host="0.0.0.0", port=8000)
Authentication Required

Tunnel URLs require your Cumulus API key for access. Include it via:

  • X-API-Key: <your-api-key> header, or
  • Authorization: Bearer <your-api-key> header
submit_server.py
from cumulus import CumulusClient
import requests
import os

# Upper bound, in seconds, for each test request so the script cannot hang
# forever on an unresponsive tunnel.
REQUEST_TIMEOUT = 120

client = CumulusClient()

# Submit server with exposed port and guaranteed resources
job = client.submit(
    script="vllm_server.py",
    requirements=["vllm", "fastapi", "uvicorn"],
    service_port=8000,  # Expose this port via tunnel
    workload_type="inference",
    sm_percent=100,  # Full GPU (guaranteed)
    vram_gb=40.0,  # 40GB VRAM (guaranteed)
    gpu_count=1
)

print(f"Job submitted: {job.job_id}")
print("Waiting for server to start...")

# Wait for tunnel URL (server takes time to load model)
tunnel_url = client.wait_for_tunnel(job.job_id, timeout=600)

# Get API key for authenticated requests
api_key = os.environ.get("CUMULUS_API_KEY")

if not tunnel_url:
    print("Server did not start within timeout")
    print(f"Check job status: client.get_status('{job.job_id}')")
elif not api_key:
    # Without a key the requests below would be sent unauthenticated and
    # fail with a confusing error, so skip them and say why.
    print(f"\n✓ Server ready at: {tunnel_url}")
    print("CUMULUS_API_KEY is not set; skipping test requests (auth required).")
else:
    print(f"\n✓ Server ready at: {tunnel_url}")
    auth_headers = {"X-API-Key": api_key}

    # Test the server (auth required!)
    response = requests.get(
        f"{tunnel_url}/health",
        headers=auth_headers,
        timeout=REQUEST_TIMEOUT,
    )
    # Surface HTTP errors (e.g. 401) instead of failing later in .json()
    response.raise_for_status()
    print(f"Health check: {response.json()}")

    # Make an inference request (auth required!)
    response = requests.post(
        f"{tunnel_url}/v1/completions",
        headers=auth_headers,
        json={
            "prompt": "[INST] Write a haiku about GPUs [/INST]",
            "max_tokens": 100
        },
        timeout=REQUEST_TIMEOUT,
    )
    response.raise_for_status()
    print(f"Response: {response.json()['text']}")

How it works:

  1. Your server starts and listens on the specified port (e.g., 8000)
  2. Cumulus creates a tunnel from tunnel.cumuluslabs.io to your server
  3. wait_for_tunnel() returns the public URL (e.g., http://tunnel.cumuluslabs.io:8443/54321)
  4. Access requires API key authentication via X-API-Key header

Important notes:

  • Your server must bind to 0.0.0.0, not localhost or 127.0.0.1
  • Use sm_percent and vram_gb for dedicated GPU resources
  • The tunnel URL stays active as long as the job is running
  • All requests to the tunnel URL require your Cumulus API key
  • To stop the server, use client.cancel(job.job_id)

Alternative: SGLang server

sglang_server.py
# SGLang has a built-in server, just run it
import subprocess
import sys

# sys.executable launches the server with the same interpreter that runs this
# script rather than whichever "python" is first on PATH.
# check=True raises if the server exits with a non-zero status.
subprocess.run(
    [
        sys.executable, "-m", "sglang.launch_server",
        "--model-path", "mistralai/Mistral-7B-Instruct-v0.1",
        "--port", "30000",
        "--host", "0.0.0.0",
    ],
    check=True,
)
submit_sglang.py
from cumulus import CumulusClient

# Create the client here so the script runs on its own.
client = CumulusClient()

job = client.submit(
    script="sglang_server.py",
    requirements=["sglang[all]"],
    service_port=30000,  # SGLang default port
    workload_type="inference",
    sm_percent=100,
    vram_gb=40.0
)

tunnel_url = client.wait_for_tunnel(job.job_id, timeout=600)
print(f"SGLang server at: {tunnel_url}")

Common Patterns

Save Multiple Output Formats

import json
import pickle

import numpy as np

# JSON for human-readable results
with open("results.json", "w") as json_file:
    json_file.write(json.dumps(results, indent=2))

# NumPy for efficient array storage
np.save("embeddings.npy", embeddings_array)

# Pickle for complex Python objects
with open("model_outputs.pkl", "wb") as pickle_file:
    pickle_file.write(pickle.dumps(outputs))

Progress Logging

import time

start_time = time.time()
total_items = len(data)

# Count from 1 so `done` is the number of items finished so far.
for done, item in enumerate(data, start=1):
    process(item)

    # Report only on every 100th item
    if done % 100 != 0:
        continue

    elapsed = time.time() - start_time
    rate = done / elapsed
    remaining = (total_items - done) / rate
    print(f"Progress: {done}/{total_items} ({rate:.1f} items/sec, ~{remaining:.0f}s remaining)")

Memory-Efficient Processing

import torch

# Clear GPU cache periodically
if i % 100 == 0:
    torch.cuda.empty_cache()

# Gradient checkpointing only saves memory during training (backward pass).
# For inference, disable autograd so no activations are kept for gradients.
with torch.no_grad():
    # Process in smaller batches if OOM
    hit_oom = False
    try:
        outputs = model(large_batch)
    except RuntimeError as e:
        # Only handle out-of-memory; any other RuntimeError must propagate
        # instead of being silently swallowed.
        if "out of memory" not in str(e):
            raise
        hit_oom = True

    if hit_oom:
        # Retry outside the except block so the exception (and the tensors
        # its traceback references) is released before memory is freed.
        torch.cuda.empty_cache()
        half = len(large_batch) // 2
        # Run both halves so no inputs are dropped.
        # Assumes the model returns a tensor batched along dim 0 — adapt the
        # merge step if it returns a different structure.
        outputs = torch.cat([
            model(large_batch[:half]),
            model(large_batch[half:]),
        ])

Troubleshooting

"CUDA out of memory"

  • Reduce batch size
  • Use FP16: model.half()
  • Clear cache: torch.cuda.empty_cache()
  • Increase memory_limit in job config

"Model not found"

  • Check model name spelling
  • Ensure HF_TOKEN is set for gated models
  • Verify internet access in the container

"Job times out"

  • Increase timeout in wait_for_completion()
  • Use CumulusJob for checkpointing on long jobs
  • Consider splitting into smaller jobs

"Results file not found"

  • Ensure the file is saved in the working directory (not a subdirectory)
  • Check filename matches exactly in get_results()
  • Verify job completed successfully before fetching