Tutorial 13: Data Loading

Async data loading with automatic VRAM management. The DataLoader handles batching, shuffling, device transfer, and prefetching so your training loop stays clean.

Prerequisites: Tensors and Training. CUDA GPU recommended but not required.

Time: ~15 minutes.

Quick start

use flodl::*;
use std::sync::Arc;

// Implement the dataset trait
struct MyDataset { /* ... */ }

impl BatchDataSet for MyDataset {
    fn len(&self) -> usize { 60_000 }
    fn get_batch(&self, indices: &[usize]) -> Vec<Tensor> {
        let images = /* load images for indices */;
        let labels = /* load labels for indices */;
        vec![images, labels]
    }
}

let dataset = Arc::new(MyDataset::new());

let mut loader = DataLoader::from_batch_dataset(dataset)
    .batch_size(32)
    .device(Device::CUDA(0))
    .names(&["image", "label"])
    .build()?;

for epoch in 0..100 {
    for batch in loader.epoch(epoch) {
        let batch = batch?;
        let images = &batch["image"];   // already on GPU
        let labels = &batch["label"];
        // ... training ...
    }
}

Dataset traits

floDl provides two dataset traits. Choose based on how your data is stored:

DataSet (per-item)

Returns one sample at a time. The loader stacks samples into batches automatically.

impl DataSet for MnistDataset {
    fn len(&self) -> usize { self.images.len() }

    fn get(&self, index: usize) -> Vec<Tensor> {
        vec![
            self.images[index].clone(),
            self.labels[index].clone(),
        ]
    }
}

Best for: datasets where each sample is a separate file or DB row.

BatchDataSet (per-batch)

Returns an entire batch at once. More efficient when the data source supports bulk access (memory-mapped files, databases, pre-batched tensors).

impl BatchDataSet for PreloadedDataset {
    fn len(&self) -> usize { self.num_samples }

    fn get_batch(&self, indices: &[usize]) -> Vec<Tensor> {
        let idx = Tensor::from_slice_i64(
            &indices.iter().map(|&i| i as i64).collect::<Vec<_>>()
        );
        vec![
            self.images.index_select(0, &idx),
            self.labels.index_select(0, &idx),
        ]
    }
}

Best for: pre-loaded datasets, memory-mapped files, GPU-resident data.

Both traits require Send + Sync so the prefetch worker can access them from a background thread.
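The Send + Sync requirement can be illustrated with a minimal, self-contained sketch. The trait and types below are simplified stand-ins for the library's actual `BatchDataSet`, only showing why the bounds let an `Arc<dyn ...>` cross into a prefetch thread:

```rust
use std::sync::Arc;
use std::thread;

// Simplified stand-in for the library's BatchDataSet trait. Because it
// requires Send + Sync, an Arc<dyn BatchDataSet> can be cloned and
// handed to a background worker thread.
trait BatchDataSet: Send + Sync {
    fn len(&self) -> usize;
}

struct InMemory { n: usize }

impl BatchDataSet for InMemory {
    fn len(&self) -> usize { self.n }
}

fn main() {
    let ds: Arc<dyn BatchDataSet> = Arc::new(InMemory { n: 60_000 });
    let worker_ds = Arc::clone(&ds);
    // The "prefetch worker": reads the dataset from another thread,
    // which only compiles because of the Send + Sync bounds.
    let handle = thread::spawn(move || worker_ds.len());
    assert_eq!(handle.join().unwrap(), 60_000);
}
```

Without `Send + Sync` on the trait, the `thread::spawn` call would fail to compile, which is exactly the guarantee the loader relies on.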

Named batch access

The Batch type supports both positional and named access:

let loader = DataLoader::from_batch_dataset(dataset)
    .batch_size(32)
    .names(&["image", "letter", "case", "origin"])
    .build()?;

for batch in loader.epoch(0) {
    let batch = batch?;

    // Positional access
    let image = &batch[0];

    // Named access
    let image = &batch["image"];
    let letter = &batch["letter"];

    // Introspection
    assert!(batch.has("origin"));
    assert_eq!(batch.names(), &["image", "letter", "case", "origin"]);
    assert_eq!(batch.len(), 4);
}

If .names() is not called, auto-generated positional names ("0", "1", ...) are used.


Resident vs streaming mode

The loader automatically selects the best mode based on available VRAM:

Resident mode

When the dataset fits in 75% of free VRAM, the entire dataset is loaded onto the GPU once at build() time. Per-epoch reshuffling uses GPU-side index_select with a shuffled permutation tensor. Zero CPU-GPU transfer after the initial load.

Build:   pin_memory() -> to_device() (one-time transfer)
Epoch:   index_select(shuffled_permutation) (GPU-only)
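The 75% threshold is plain arithmetic, and can be sketched as follows. The `fits_resident` helper is illustrative, not the library's internal API, and `free_vram_bytes` stands in for whatever the real VRAM probe returns:

```rust
// Mirror the resident-mode rule described above: load the whole
// dataset onto the GPU only if it fits in 75% of free VRAM.
fn fits_resident(dataset_bytes: u64, free_vram_bytes: u64) -> bool {
    dataset_bytes <= free_vram_bytes * 3 / 4
}

fn main() {
    const GIB: u64 = 1024 * 1024 * 1024;
    // 60_000 MNIST-sized samples: 28*28 f32 image + i64 label ≈ 188 MB.
    let dataset = 60_000u64 * (28 * 28 * 4 + 8);
    assert!(fits_resident(dataset, 8 * GIB));          // plenty of room
    assert!(!fits_resident(dataset, 200 * 1024 * 1024)); // 150 MiB budget, too small
}
```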

Streaming mode

When the dataset is too large, a persistent background worker thread handles batching and transfer:

Worker thread:  get_batch(indices) -> pin_memory() -> StreamGuard + to_device_async()
                -> CudaEvent (signals readiness)
Main thread:    event.synchronize() (typically instant due to prefetch)
                -> use batch

The worker runs on a dedicated CUDA stream, overlapping data transfer with training computation on the default stream.

Forcing a mode

// Force streaming (useful for benchmarking or preserving VRAM headroom)
.streaming()

// Force a specific prefetch depth
.prefetch(8)

VRAM-aware prefetch

In streaming mode, the prefetch depth adapts automatically to VRAM:

Configuration

// Use up to 90% of total VRAM for data (default)
.vram_max_usage(0.90)

// Use up to 80% (more headroom for activations)
.vram_max_usage(0.80)

// Manual override (disables automatic adaptation)
.prefetch(16)

// Manually trigger a VRAM re-probe between epochs
loader.auto_resize();

The default cap of 90% leaves 10% headroom for activation memory, gradients, and CUDA allocator overhead.
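The sizing logic amounts to dividing the VRAM budget by the per-batch footprint. The sketch below shows the idea; the formula, the lower bound of 1, and the upper clamp of 64 are all illustrative assumptions, not the library's actual heuristic:

```rust
// Illustrative VRAM-aware prefetch sizing: keep the combined footprint
// of in-flight batches under `max_usage` of total VRAM.
fn prefetch_depth(batch_bytes: u64, total_vram: u64, max_usage: f64) -> usize {
    let budget = (total_vram as f64 * max_usage) as u64;
    let depth = (budget / batch_bytes.max(1)) as usize;
    // Always stay at least one batch ahead; the upper clamp is a
    // made-up bound for this sketch.
    depth.clamp(1, 64)
}

fn main() {
    const GIB: u64 = 1024 * 1024 * 1024;
    // Small batches on a large card: hits the illustrative upper clamp.
    assert_eq!(prefetch_depth(400_000, 8 * GIB, 0.90), 64);
    // 1 GiB batches with 80% of 4 GiB budgeted: 3 batches in flight.
    assert_eq!(prefetch_depth(GIB, 4 * GIB, 0.80), 3);
}
```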

Shuffling and sampling

By default, data is shuffled each epoch using a RandomSampler with deterministic per-epoch permutations:

// epoch 0: seed=42+0 -> permutation A
// epoch 1: seed=42+1 -> permutation B
// epoch 0 again: same seed -> same permutation A (reproducible)
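The seed-plus-epoch scheme above can be demonstrated with a self-contained Fisher-Yates shuffle. The xorshift PRNG is a stand-in for whatever generator the library actually uses; only the seeding pattern (`base_seed + epoch`) matches the description:

```rust
// Deterministic per-epoch permutation: epoch `e` is seeded with
// `base_seed + e`, so replaying an epoch reproduces its exact order.
fn permutation(len: usize, base_seed: u64, epoch: u64) -> Vec<usize> {
    let mut state = base_seed + epoch; // must be nonzero for xorshift
    let mut next = move || {
        // xorshift64: a cheap deterministic PRNG for this sketch
        state ^= state << 13;
        state ^= state >> 7;
        state ^= state << 17;
        state
    };
    let mut perm: Vec<usize> = (0..len).collect();
    // Fisher-Yates shuffle driven by the seeded PRNG
    for i in (1..len).rev() {
        let j = (next() % (i as u64 + 1)) as usize;
        perm.swap(i, j);
    }
    perm
}

fn main() {
    // Same seed + epoch -> identical permutation (reproducible).
    assert_eq!(permutation(10, 42, 0), permutation(10, 42, 0));
    // Different epochs -> different permutations.
    assert_ne!(permutation(10, 42, 0), permutation(10, 42, 1));
}
```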

Control shuffling

// Custom seed
.seed(12345)

// Disable shuffling (sequential order every epoch)
.shuffle(false)

// Custom sampler
.sampler(Box::new(MyCustomSampler::new()))

Drop last batch

// Drop incomplete final batch (default: true)
.drop_last(true)

The default is true to avoid a BatchNorm footgun: a final batch of size 1 produces NaN variance. Set to false for evaluation/inference where every sample matters.
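The effect of drop_last on the epoch length is simple floor-vs-ceiling arithmetic. The `num_batches` helper below is a sketch of that rule, not the loader's actual implementation:

```rust
// Batch count with and without drop_last: dropping the incomplete
// final batch floors the division, which also guards against a
// size-1 batch reaching BatchNorm.
fn num_batches(samples: usize, batch_size: usize, drop_last: bool) -> usize {
    if drop_last {
        samples / batch_size
    } else {
        samples.div_ceil(batch_size)
    }
}

fn main() {
    // 60_001 samples, batch 32: the trailing 1-sample batch is dropped.
    assert_eq!(num_batches(60_001, 32, true), 1875);
    assert_eq!(num_batches(60_001, 32, false), 1876);
    // Exact multiple: drop_last makes no difference.
    assert_eq!(num_batches(60_000, 32, true), 1875);
}
```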

DDP integration

When used with Graph::set_data_loader(), the loader automatically upgrades to distributed mode:

Ddp::setup(&model, &builder, |p| Adam::new(p, 0.001))?;

let loader = DataLoader::from_batch_dataset(dataset)
    .batch_size(32)
    .names(&["image", "label"])
    .build()?;

model.set_data_loader(loader, "image");

for batch in model.epoch(0) {
    let batch = batch?;
    let loss = model.forward_batch(&batch)?;
    model.step()?;
}

In distributed mode, the batch size is per GPU and the loader's device should be left as CPU; the DDP runtime handles placement.

For the DDP Builder, pass the dataset directly:

let ddp = Ddp::builder(model_factory, optim_factory, train_fn)
    .dataset(dataset)    // Arc<dyn BatchDataSet>
    .batch_size(32)
    .num_epochs(10)
    .run()?;

Builder reference

Method Default Description
.batch_size(usize) Required Batch size per GPU
.device(Device) CPU Target device (leave as CPU for DDP)
.seed(u64) 42 RNG seed for shuffling
.shuffle(bool) true Enable shuffling
.sampler(Box<dyn Sampler>) RandomSampler Custom sampler (overrides shuffle)
.prefetch(usize) Auto Override auto-detected prefetch depth
.vram_max_usage(f64) 0.90 Max VRAM fraction for prefetch
.streaming() Auto Force streaming mode
.names(&[&str]) Positional Name batch tensor positions
.drop_last(bool) true Drop incomplete final batch

DataLoader methods

Method Description
.epoch(n) Get epoch iterator (reshuffles, adapts prefetch)
.len() Number of samples
.num_batches() Number of batches per epoch
.batch_size() Batch size
.device() Target or gather device
.is_resident() Whether in resident mode
.is_distributed() Whether in distributed mode
.prefetch_depth() Current prefetch depth
.set_prefetch_depth(n) Override prefetch depth
.auto_resize() Re-probe VRAM and adapt prefetch
.names() Tensor names for each batch position