
Data Pipeline

Molfun's data pipeline transforms raw biological data into training-ready batches through four stages: Sources, Parsers, Datasets, and Splitting. The resulting subsets then feed a standard PyTorch DataLoader.

Pipeline overview

flowchart LR
    subgraph Sources
        PDB["PDBFetcher"]
        AFF["AffinityFetcher"]
        MSA["MSAProvider"]
        COL["fetch_collection()"]
    end

    subgraph Parsers
        PDBP["PDBParser"]
        MMCIF["MMCIFParser"]
        A3M["A3MParser"]
        FASTA["FASTAParser"]
        SDF["SDFParser"]
        MOL2["Mol2Parser"]
        RES["ResidueParser"]
    end

    subgraph Datasets
        SD["StructureDataset"]
        AD["AffinityDataset"]
        SSD["StreamingStructureDataset"]
    end

    subgraph Training
        DL["DataLoader"]
        SPL["DataSplitter"]
    end

    PDB --> PDBP
    PDB --> MMCIF
    AFF --> SDF
    AFF --> MOL2
    MSA --> A3M
    COL --> PDB

    PDBP --> SD
    MMCIF --> SD
    MMCIF --> SSD
    A3M --> SD
    FASTA --> SD
    SDF --> AD
    MOL2 --> AD

    SD --> SPL
    AD --> SPL
    SSD --> DL
    SPL --> DL

Stage 1: Sources

Sources fetch raw data from external services or local caches.

| Source | What it provides | Input |
| --- | --- | --- |
| PDBFetcher | PDB/mmCIF structure files | PDB IDs, RCSB search queries |
| AffinityFetcher | Protein-ligand binding affinity data | Target IDs, compound libraries |
| MSAProvider | Multiple sequence alignments | Sequences, databases |

from molfun.data import PDBFetcher, AffinityFetcher, MSAProvider

# Fetch structures
fetcher = PDBFetcher(cache_dir="data/pdb_cache")
records = fetcher.fetch(["1AKE", "4HHB", "6LU7"])

# Fetch binding data
affinity = AffinityFetcher()
pairs = affinity.fetch(target_ids=["P00533"])

# Get MSAs
msa = MSAProvider(database="uniref90")
alignment = msa.search("MKFLILLFNILCLFPVLAADNH...")

Collections

Collections are curated sets of PDB IDs for specific research tasks.

from molfun.data import fetch_collection, list_collections

# See available collections
print(list_collections())

# Fetch a domain-specific set
records = fetch_collection("kinases")

Stage 2: Parsers

Parsers read raw file formats and extract typed feature dictionaries.

| Parser | Input format | Key outputs |
| --- | --- | --- |
| PDBParser | .pdb | Coordinates, residues, chains, B-factors |
| MMCIFParser | .cif | Coordinates, residues, metadata, resolution |
| A3MParser | .a3m | MSA matrix, deletion matrix |
| FASTAParser | .fasta | Sequences, headers |
| SDFParser | .sdf | Atom coords, bonds, molecular features |
| Mol2Parser | .mol2 | Atom coords, bonds, partial charges |
| ResidueParser | -- | Residue-level feature extraction |

from molfun.data.parsers import PDBParser, A3MParser

# Parse a structure
parser = PDBParser()
features = parser.parse("data/1ake.pdb")
# features["aatype"], features["all_atom_positions"], features["residue_index"], ...

# Parse an MSA
msa_parser = A3MParser()
msa_features = msa_parser.parse("data/1ake.a3m")
# msa_features["msa"], msa_features["msa_mask"], ...

Stage 3: Datasets

Datasets wrap parsed features into PyTorch Dataset objects.

StructureDataset

Standard map-style dataset for protein structures. Each item is a feature dict ready for model consumption.

from molfun.data import StructureDataset

dataset = StructureDataset(
    records=records,
    max_length=512,
)
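
Since each item is a feature dict, a quick sanity check is to index the dataset directly. A minimal sketch; the aatype key (and it having a .shape) is an assumption carried over from the parser output shown earlier:

# inspect the first item; keys are assumed to match the parser output
features = dataset[0]
print(len(dataset), features["aatype"].shape)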

AffinityDataset

For protein-ligand binding affinity prediction. Returns (features, target) tuples.

from molfun.data import AffinityDataset

dataset = AffinityDataset(
    records=affinity_records,
    target_column="pKd",
)
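
Each item is a (features, target) tuple, so it unpacks directly. A minimal sketch, assuming the target is the pKd value selected above:

# target is the value from the "pKd" column for this pair
features, target = dataset[0]
print(target)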

StreamingStructureDataset

Iterable dataset for large-scale training. Streams data from local or remote storage without loading everything into memory.

from molfun.data import StreamingStructureDataset

dataset = StreamingStructureDataset(
    index_path="s3://bucket/train_index.csv",
    storage_options={"endpoint_url": "http://minio:9000"},
)
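
An iterable-style dataset plugs straight into a standard PyTorch DataLoader without a sampler. A minimal sketch; the batch size and worker count are illustrative, and it assumes StreamingStructureDataset shards the stream across workers internally:

from torch.utils.data import DataLoader

# shuffle must stay off for iterable datasets; ordering comes from the stream
loader = DataLoader(dataset, batch_size=4, num_workers=2)
for batch in loader:
    ...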

Stage 4: Splitting

DataSplitter provides static methods that split a dataset into (train, val, test) subsets, each returned as a torch.utils.data.Subset.

| Method | Strategy | When to use |
| --- | --- | --- |
| DataSplitter.random() | Random shuffle | Quick experiments, non-protein tasks |
| DataSplitter.by_sequence_identity() | Cluster by sequence identity | Avoid homology leakage (recommended for structure tasks) |
| DataSplitter.by_family() | Split by protein family | Evaluate generalization to unseen families |

from molfun.data import DataSplitter

# Random split (fast, ignores homology)
train, val, test = DataSplitter.random(dataset, val_frac=0.1, test_frac=0.1, seed=42)

# Identity-based split (prevents data leakage)
train, val, test = DataSplitter.by_sequence_identity(dataset, threshold=0.3)
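
The returned Subset objects are ordinary PyTorch datasets, so the final step is a standard DataLoader. A minimal sketch; the batch size is illustrative, and a custom collate_fn is likely needed to pad variable-length structures (an assumption, not shown here):

from torch.utils.data import DataLoader

train_loader = DataLoader(train, batch_size=8, shuffle=True)
val_loader = DataLoader(val, batch_size=8)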

Sequence identity leakage

Random splits can place homologous proteins in both train and test sets, inflating benchmark scores. For structure prediction tasks, always use by_sequence_identity() or by_family().


Storage abstraction

All data I/O goes through molfun.data.storage, a thin wrapper around fsspec that transparently handles local and remote paths.

flowchart TB
    API["open_path() / list_files() / exists()"]

    API --> Local["Local filesystem"]
    API --> S3["S3 / MinIO"]
    API --> GCS["Google Cloud Storage"]
    API --> Azure["Azure Blob"]
    API --> HTTP["HTTP/HTTPS"]

    style API fill:#0891b2,stroke:#0e7490,color:#ffffff

| Function | Purpose |
| --- | --- |
| open_path(path, mode, storage_options) | Open any path (local or remote) as a file handle |
| list_files(glob_pattern) | List files matching a glob on any filesystem |
| exists(path) | Check if a path exists |
| ensure_dir(path) | Create directory (local only) |
| is_remote(path) | Check if a path is a remote URI |

from molfun.data.storage import open_path, list_files

# Local -- works without fsspec
with open_path("data/index.csv") as f:
    ...

# S3
with open_path("s3://bucket/index.csv") as f:
    ...

# MinIO (S3-compatible)
with open_path(
    "s3://bucket/index.csv",
    storage_options={"endpoint_url": "http://localhost:9000",
                     "key": "minioadmin", "secret": "minioadmin"},
) as f:
    ...

# List remote files
files = list_files("s3://bucket/pdbs/*.cif")
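
The remaining helpers from the table above follow the same pattern; the paths here are illustrative:

from molfun.data.storage import exists, is_remote, ensure_dir

if is_remote("s3://bucket/index.csv"):
    print("remote path, fsspec will be used")

if not exists("data/pdb_cache"):
    ensure_dir("data/pdb_cache")  # local paths only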

Graceful fallback

Local paths work without fsspec installed; fsspec is imported only when a remote URI (s3://, gs://, etc.) is detected.
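
A minimal sketch of how this dispatch can work; this is illustrative, not Molfun's actual implementation:

def open_path(path, mode="r", storage_options=None):
    # Remote URIs are detected by scheme; fsspec is imported lazily,
    # so purely local workflows never need the dependency.
    if "://" in path:
        import fsspec
        return fsspec.open(path, mode, **(storage_options or {})).open()
    return open(path, mode)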