Customizing Pipelines and Data Workflows: Advanced Models and Efficient Processing

Introduction: From Magic Pipelines to Master Chef — Why Custom Data Workflows Matter — Article 8

Rick Hightower

Originally published on Medium.

  • Covers pipeline anatomy: components, customization, and debugging
  • Explores custom workflows: preprocessing, composition, and business logic
  • Details efficient data handling with the Datasets library and streaming
  • Shows optimization techniques, from batching to edge deployment
  • Presents synthetic data generation for augmentation and fairness
# Install Poetry if not already installed
curl -sSL https://install.python-poetry.org | python3 -
# Create new project
poetry new huggingface-workflows
cd huggingface-workflows
# Add dependencies with 2025 versions
poetry add "transformers>=4.53.0,<5.0.0" torch torchvision torchaudio
poetry add "datasets>=3.6.0" "diffusers>=0.30.0" accelerate sentencepiece
poetry add "peft>=0.15.0" pillow soundfile bitsandbytes
poetry add --group dev jupyter ipykernel matplotlib
  • Poetry installation provides isolated package management
  • Project creation establishes clean workspace structure
  • Flexible versioning (e.g., >=4.53.0,<5.0.0) allows minor updates while preventing breaking changes
  • Development dependencies (--group dev) separate production from exploration tools
# Download and install Miniconda from https://docs.conda.io/en/latest/miniconda.html
# Create environment with Python 3.12.10
conda create -n huggingface-workflows python=3.12.10
conda activate huggingface-workflows
# Install packages (the conda package for PyTorch is named "pytorch")
conda install -c pytorch -c huggingface transformers pytorch torchvision torchaudio
conda install -c conda-forge datasets diffusers accelerate pillow soundfile matplotlib
pip install sentencepiece bitsandbytes
  • Conda environments provide complete Python isolation
  • Channel specification (-c pytorch) ensures compatible binaries
  • Mixed conda/pip installation handles packages not available in conda
  • Python 3.12.10 is a mature, widely supported release with good performance
# Install pyenv (macOS/Linux)
curl https://pyenv.run | bash
# Configure shell (add to ~/.bashrc or ~/.zshrc)
export PATH="$HOME/.pyenv/bin:$PATH"
eval "$(pyenv init -)"
# Install Python 3.12.10 with pyenv
pyenv install 3.12.10
pyenv local 3.12.10
# Create virtual environment
python -m venv venv
source venv/bin/activate  # On Windows: venv\Scripts\activate
# Install packages with 2025 versions
pip install "transformers>=4.53.0,<5.0.0" torch torchvision torchaudio
pip install "datasets>=3.6.0" "diffusers>=0.30.0" "peft>=0.15.0" accelerate sentencepiece
pip install pillow soundfile bitsandbytes jupyter matplotlib
  • Pyenv manages multiple Python versions without system conflicts
  • Local Python version (.python-version file) ensures consistency
  • Virtual environment isolation prevents package conflicts
  • Flexible versioning protects against breaking changes
# Safe update process
poetry update --dry-run    # Preview changes
poetry show --outdated     # Check available updates
# Update specific packages safely
poetry add transformers@latest --dry-run
poetry add transformers@~4.53.0  # Allow patch updates only
# For pip users
pip install --upgrade transformers --dry-run
pip install transformers==4.53.2  # Pin to specific version
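After an update, it helps to confirm which versions actually ended up installed. Below is a minimal sketch that uses only the standard library's importlib.metadata. The package list is just an example, and installed_versions is a hypothetical helper name:

```python
from importlib.metadata import PackageNotFoundError, version


def installed_versions(packages):
    """Map each package name to its installed version string, or None if it is missing."""
    result = {}
    for name in packages:
        try:
            result[name] = version(name)
        except PackageNotFoundError:
            result[name] = None
    return result


if __name__ == "__main__":
    for pkg, ver in installed_versions(["transformers", "datasets", "peft", "torch"]).items():
        print(f"{pkg:15} {ver or 'not installed'}")
```

Run it after poetry update or pip install to see exactly what the resolver chose.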
# Modern quick-start with explicit model and device
from transformers import pipeline
# Specify model checkpoint and device for reproducibility
clf = pipeline(
    'sentiment-analysis',
    model='distilbert-base-uncased-finetuned-sst-2-english',
    device=0  # 0 for CUDA GPU, -1 for CPU, 'mps' for Apple Silicon
)
# Run prediction on text
result = clf('I love Hugging Face!')
print(result)
# Output: [{'label': 'POSITIVE', 'score': 0.9998}]
# Check model card: https://huggingface.co/distilbert-base-uncased-finetuned-sst-2-english
  • Pipeline initialization downloads and caches the model automatically.

  • Explicit model specification ensures consistent results across runs.

  • Device parameter controls hardware acceleration (GPU/CPU/MPS).

  • Single function call handles tokenization, inference, and result formatting.

  • Model card URL provides training details and limitations.

  • Custom data cleaning (HTML, emojis, multilingual text).

  • Chained models (sentiment + entity recognition).

  • Speed optimization (batching, device placement).

  • Business logic (filtering, compliance checks).

  • Scale (streaming, batch processing).

  • Clean data from Twitter, Amazon, and internal systems.

  • Add product metadata.

  • Process 10,000 reviews per minute.

  • Log for compliance.

  • Stream from S3 buckets.

import string
def custom_preprocess(text):
    # Normalize text for consistent predictions: lowercase, then strip punctuation
    text = text.lower()
    return text.translate(str.maketrans('', '', string.punctuation))
texts = ["Wow! Amazing product!!!", "I don't like this..."]
# Clean then predict
cleaned = [custom_preprocess(t) for t in texts]
results = clf(cleaned, batch_size=16)  # Batch for speed!
print(results)
  1. Define preprocessing (lowercase, strip punctuation)
  2. Clean inputs before pipeline processing
  3. Use batch_size to run several inputs per forward pass (often several times faster on GPU)
  4. Get consistent predictions on normalized data
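import string
from transformers import TextClassificationPipeline, pipeline
class CustomSentimentPipeline(TextClassificationPipeline):
    def _sanitize_parameters(self, normalize_text=None, confidence_threshold=None, **kwargs):
        """Separate preprocessing, forward, and postprocessing parameters."""
        # Let the parent handle its own parameters (top_k, function_to_apply, tokenizer kwargs)
        preprocess_kwargs, forward_kwargs, postprocess_kwargs = super()._sanitize_parameters(**kwargs)
        # Route our custom parameters to the right stage
        if normalize_text is not None:
            preprocess_kwargs["normalize_text"] = normalize_text
        if confidence_threshold is not None:
            postprocess_kwargs["confidence_threshold"] = confidence_threshold
        return preprocess_kwargs, forward_kwargs, postprocess_kwargs
    def preprocess(self, inputs, normalize_text=True, **tokenizer_kwargs):
        # Normalize raw strings (lowercase, strip punctuation) before tokenization
        if normalize_text and isinstance(inputs, str):
            inputs = inputs.lower().translate(str.maketrans('', '', string.punctuation))
        return super().preprocess(inputs, **tokenizer_kwargs)
    def postprocess(self, model_outputs, confidence_threshold=0.95, **kwargs):
        # Add a confidence flag on top of the standard label/score output
        results = super().postprocess(model_outputs, **kwargs)
        for r in (results if isinstance(results, list) else [results]):
            r['confident'] = r['score'] > confidence_threshold
        return results
# Plug the custom class into the standard factory
clf_custom = pipeline(
    'sentiment-analysis',
    model='distilbert-base-uncased-finetuned-sst-2-english',
    pipeline_class=CustomSentimentPipeline,
)
print(clf_custom("GREAT product!!!", confidence_threshold=0.9))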
  • _sanitize_parameters method separates pipeline stage parameters
  • Custom parameters enable runtime configuration flexibility
  • preprocess method intercepts input before tokenization
  • Text normalization ensures consistent model behavior
  • postprocess method enhances output with business logic
  • Confidence flag enables downstream filtering
  • Inheritance preserves all pipeline functionality
from datasets import load_dataset
# Stream massive datasets without memory issues
dataset = load_dataset('csv', data_files='reviews.csv',
                       split='train', streaming=True)
batch_size = 32
batch = []
for example in dataset:
    batch.append(custom_preprocess(example['text']))
    if len(batch) == batch_size:
        results = clf(batch, batch_size=batch_size)
        # Process results (save, log, etc.)
        batch = []
# Don't drop the final partial batch
if batch:
    results = clf(batch, batch_size=batch_size)
    # Process results (save, log, etc.)
  • Streaming mode loads data on-demand, not all at once.

  • Batch accumulation balances memory use and processing speed.

  • Results processing happens immediately, preventing memory buildup.

  • Resetting the batch list keeps memory usage bounded.

  • Because memory use stays bounded, the pattern scales to very large (even terabyte-scale) datasets.

  • Pipelines provide fast starts but limit production flexibility.

  • Always specify model + device for reproducibility.

  • Custom workflows handle real business requirements.

  • Batch processing can deliver 10x throughput improvements.
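The streaming loop above builds batches by hand. You can move that logic into a small reusable generator that also yields the final partial batch. This is a plain-Python sketch. iter_batches is a hypothetical helper and not part of datasets or transformers:

```python
from itertools import islice
from typing import Iterable, Iterator, List, TypeVar

T = TypeVar("T")


def iter_batches(items: Iterable[T], batch_size: int) -> Iterator[List[T]]:
    """Yield lists of up to batch_size items, including a final partial batch."""
    if batch_size < 1:
        raise ValueError("batch_size must be >= 1")
    it = iter(items)
    while True:
        # Pull the next batch_size items lazily; works with streaming datasets
        batch = list(islice(it, batch_size))
        if not batch:
            return
        yield batch
```

For example, the loop body becomes: for batch in iter_batches((ex['text'] for ex in dataset), 32): results = clf(batch, batch_size=32).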

  • Pipeline orchestrates the complete inference workflow.

  • Tokenizer converts text to model-compatible token IDs.

  • Model performs actual neural network computation.

  • Processor handles multimodal inputs (images, audio).

  • Arrows show dependency relationships between components.

  • Tokenizer: The translator. Converts “Hello world” → [101, 7592, 2088, 102]

  • Model: The brain. Neural network processing tokens → predictions

  • Processor: The prep cook. Resizes images, extracts audio features (multimodal tasks)
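The next example makes the tokenizer's role concrete with a toy version of the mapping above. Its five-entry vocabulary is hypothetical, but the IDs match the BERT example. Real tokenizers use learned subword vocabularies (WordPiece for BERT) with tens of thousands of entries:

```python
# Hypothetical five-entry vocabulary; IDs match the bert-base-uncased example above
TOY_VOCAB = {"[UNK]": 100, "[CLS]": 101, "[SEP]": 102, "hello": 7592, "world": 2088}


def toy_encode(text: str) -> list:
    """Lowercase, split on whitespace, map words to IDs, and wrap in [CLS] ... [SEP]."""
    ids = [TOY_VOCAB.get(tok, TOY_VOCAB["[UNK]"]) for tok in text.lower().split()]
    return [TOY_VOCAB["[CLS]"]] + ids + [TOY_VOCAB["[SEP]"]]


print(toy_encode("Hello world"))
```

Unknown words fall back to [UNK]. Real subword tokenizers avoid that by splitting rare words into known pieces.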

from transformers import pipeline
clf = pipeline('text-classification')
print('Model:', clf.model)
print('Tokenizer:', clf.tokenizer)
print('Processor:', getattr(clf, 'processor', None))
print('Framework:', clf.framework)  # 'pt' (PyTorch) or 'tf' (TensorFlow)
  • Pipeline creation automatically selects compatible components
  • Model inspection reveals architecture details
  • Tokenizer check ensures vocabulary compatibility
  • Processor presence indicates multimodal capabilities
  • Framework detection helps with debugging
  1. Swap components — Use custom models/tokenizers
  2. Compose pipelines — Chain multiple tasks
  3. Register new types — Create reusable workflows
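from transformers import AutoModelForSequenceClassification, Pipeline, pipeline
from transformers.pipelines import PIPELINE_REGISTRY
class SentimentNERPipeline(Pipeline):
    """Runs sentiment analysis and named-entity recognition on the same text."""
    def __init__(self, *args, ner_model="dslim/bert-base-NER", **kwargs):
        super().__init__(*args, **kwargs)
        # Reuse this pipeline's model/tokenizer for sentiment; load a separate NER model
        # (dslim/bert-base-NER is one example checkpoint; swap in your own)
        self.sentiment_pipeline = pipeline(
            "sentiment-analysis", model=self.model, tokenizer=self.tokenizer
        )
        self.ner_pipeline = pipeline("ner", model=ner_model, aggregation_strategy="simple")
    def _sanitize_parameters(self, **kwargs):
        # No custom runtime parameters
        return {}, {}, {}
    def preprocess(self, inputs):
        # Pass raw text through; each sub-pipeline tokenizes for itself
        return inputs
    def _forward(self, inputs):
        sentiment = self.sentiment_pipeline(inputs)
        entities = self.ner_pipeline(inputs)
        return {"sentiment": sentiment, "entities": entities}
    def postprocess(self, outputs):
        return outputs
# Register for reuse
PIPELINE_REGISTRY.register_pipeline(
    "sentiment-ner",
    pipeline_class=SentimentNERPipeline,
    pt_model=AutoModelForSequenceClassification,
)
# Use it!
pipe = pipeline("sentiment-ner", model="distilbert-base-uncased-finetuned-sst-2-english")
result = pipe("Apple Inc. makes amazing products!")
# Illustrative output (abridged):
# {'sentiment': [{'label': 'POSITIVE', 'score': 0.99}],
#  'entities': [{'entity_group': 'ORG', 'word': 'Apple Inc.', ...}]}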
  • Custom pipeline class combines two existing pipelines.
  • _forward method orchestrates both models.
  • Registration makes pipeline reusable across projects.
  • Single call returns combined analysis.
  • Pattern extends to any pipeline combination.
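If you don't need registry integration, plain function composition produces the same combined result with less machinery. This is a minimal sketch. compose_analyses is a hypothetical helper, and any callable that takes a string can be plugged in, including a transformers pipeline:

```python
from typing import Any, Callable, Dict


def compose_analyses(**analyzers: Callable[[str], Any]) -> Callable[[str], Dict[str, Any]]:
    """Return a function that runs every analyzer on the same text and keys results by name."""
    def run(text: str) -> Dict[str, Any]:
        return {name: analyze(text) for name, analyze in analyzers.items()}
    return run


# Stand-in analyzers for illustration; replace with real pipelines
analyze = compose_analyses(length=len, upper=str.upper)
print(analyze("ab"))
```

With real models, compose_analyses(sentiment=pipeline('sentiment-analysis'), entities=pipeline('ner', aggregation_strategy='simple')) returns a function that produces a dict with 'sentiment' and 'entities' keys.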
from transformers import pipeline
from transformers.utils import logging
logging.set_verbosity_debug()
# Now see far more detail about what happens under the hood
clf = pipeline('text-classification')
result = clf('Debug me!')
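  • Debug logging exposes configuration and checkpoint-loading details.

  • Model download and loading progress becomes visible.

  • Warnings (for example, about unused or newly initialized weights) are easier to spot.

  • Pair logging with timing measurements to identify inference bottlenecks.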

  • Model/tokenizer mismatch → Check families match.

  • Wrong input format → Pipelines expect strings, lists, or dicts.

  • Memory errors → Reduce batch size or max_length.

  • Slow inference → Enable Flash Attention (GPU) or batch more.

  • Raw data flows through loading decision based on size.
  • Small datasets load entirely into memory for speed.
  • Large datasets stream to avoid memory limits.
  • Transformations apply via map operations.
  • Filtering removes unwanted examples.
  • Batch processing optimizes model inference.
  • Version control and annotation integrate seamlessly.
from datasets import load_dataset
# Load IMDB reviews
dataset = load_dataset('imdb', split='train')
print(f"Dataset size: {len(dataset)}")  # 25,000 examples
print(dataset[0])  # {'text': '...', 'label': 0}  (0 = negative, 1 = positive)
# Custom data? Easy!
custom = load_dataset('csv', data_files='reviews.csv')
  • load_dataset downloads and caches data automatically
  • Split specification loads only needed portions.
  • First access downloads; subsequent uses hit cache.
  • CSV loading works identically to hub datasets.
  • Memory mapping prevents RAM overflow.
def preprocess(batch):
    # Process entire batches at once
    batch['text'] = [text.lower() for text in batch['text']]
    batch['length'] = [len(text.split()) for text in batch['text']]
    return batch
# Transform with parallel processing
dataset = dataset.map(preprocess, batched=True, num_proc=4)
# Filter short reviews
dataset = dataset.filter(lambda x: x['length'] > 20)
  • Batch processing operates on multiple examples simultaneously
  • List comprehensions process entire columns efficiently
  • Parallel processing (num_proc=4) uses multiple CPU cores
  • Filtering creates view without copying data
  • Chaining operations maintains memory efficiency
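With batched=True, datasets passes your function a dict that maps column names to lists. That lets you exercise the same function on a hand-built batch before running it over the full dataset. This sketch repeats the preprocess logic shown above so the snippet stands alone:

```python
def preprocess(batch):
    """Batched transform: lowercase texts and add a word-count column."""
    batch["text"] = [text.lower() for text in batch["text"]]
    batch["length"] = [len(text.split()) for text in batch["text"]]
    return batch


# A hand-built batch in the same dict-of-lists shape that map(batched=True) uses
sample = {"text": ["A Great Movie", "Terrible"], "label": [1, 0]}
out = preprocess(sample)
print(out)
```

Testing this way is fast and needs no downloads. It also catches shape bugs, such as a new column whose list length differs from the batch size.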
from datasets import load_dataset
# Stream without loading everything
wiki = load_dataset('wikimedia/wikipedia', '20231101.en',
                    split='train', streaming=True)
def process_article(text):
    # Placeholder: replace with your own processing
    pass
# Process as you go
for i, article in enumerate(wiki):
    if i >= 1000:  # Process first 1000
        break
    process_article(article['text'])
  • streaming=True enables lazy loading
  • Data loads only when accessed
  • Enumeration provides progress tracking
  • Early break stops after a fixed number of examples instead of reading the whole stream
  • Memory stays constant regardless of dataset size
# Best practices for annotation
# 1. Start small - annotate 100 randomly chosen examples (seeded for reproducibility)
pilot_data = dataset.shuffle(seed=42).select(range(100))
# 2. Use Argilla for team annotation
# See Article 12 for Argilla + HF integration
# 3. Version your annotations
# dataset.push_to_hub("company/product-reviews-v2")
# 4. Track changes with lakeFS for compliance
  • Pilot annotation validates labeling guidelines
  • Small batches prevent wasted effort
  • Team tools ensure consistency
  • Version control enables reproducibility
  • Change tracking satisfies compliance requirements

  • Original model branches into three optimization paths
  • Quantization reduces precision for smaller models
  • Pruning removes unnecessary parameters
  • Compilation optimizes for specific hardware
  • Each technique targets different deployment scenarios
  • Performance gains stack when combined
# Slow: one by one
texts = ["Review 1", "Review 2", "Review 3"]
for text in texts:
    result = clf(text)  # 3 separate calls
# Fast: batch processing
results = clf(
    texts,
    batch_size=8,      # Run several inputs per forward pass
    padding=True,      # Align lengths
    truncation=True,   # Cap at max_length
    max_length=128     # Prevent memory spikes
)
# Often several times faster on GPU; measure on your own hardware
  • Individual calls waste GPU parallelism
  • Batching fills GPU compute units
  • Padding ensures uniform tensor shapes
  • Truncation prevents memory overflow
  • Max length balances speed and accuracy
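Padding is what makes batching possible, because sequences of different lengths must share one tensor shape. The sketch below shows the idea in plain Python: it right-pads token ID lists and builds the matching attention mask. Pad ID 0 matches BERT's [PAD] token. Real tokenizers do this for you when you pass padding=True:

```python
from typing import List, Tuple


def pad_batch(batch: List[List[int]], pad_id: int = 0) -> Tuple[List[List[int]], List[List[int]]]:
    """Right-pad token ID lists to the longest sequence; return (input_ids, attention_mask)."""
    max_len = max((len(seq) for seq in batch), default=0)
    input_ids = [seq + [pad_id] * (max_len - len(seq)) for seq in batch]
    # 1 marks real tokens the model should attend to, 0 marks padding
    attention_mask = [[1] * len(seq) + [0] * (max_len - len(seq)) for seq in batch]
    return input_ids, attention_mask


ids, mask = pad_batch([[101, 7592, 102], [101, 102]])
print(ids, mask)
```

Padding cost grows with the longest item in each batch. That is why truncation and max_length matter for memory.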
import torch
from transformers import (
    AutoModelForCausalLM,
    AutoModelForSequenceClassification,
    BitsAndBytesConfig,
)
# Standard FP32 model: roughly 440MB of weights
try:
    model = AutoModelForSequenceClassification.from_pretrained("bert-base-uncased")
except Exception as e:
    print(f"Error loading model: {e}")
    # Fallback to smaller model
    model = AutoModelForSequenceClassification.from_pretrained("distilbert-base-uncased")
# INT8 model: about a quarter of the FP32 weight memory (needs bitsandbytes and a CUDA GPU)
try:
    quantization_config = BitsAndBytesConfig(load_in_8bit=True)
    model_int8 = AutoModelForSequenceClassification.from_pretrained(
        "bert-base-uncased",
        quantization_config=quantization_config,
        device_map="auto"
    )
except ImportError:
    print("bitsandbytes not installed. Using standard model.")
    model_int8 = model
# For LLMs: INT4 quantization via quantization_config
# (passing load_in_4bit directly to from_pretrained is deprecated)
model_int4 = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Llama-3.2-3B",  # Smaller, more efficient model (gated on the Hub)
    quantization_config=BitsAndBytesConfig(
        load_in_4bit=True,
        bnb_4bit_compute_dtype=torch.float16
    ),
    device_map="auto"
)
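  • INT8 quantization cuts weight memory by about 75% relative to FP32
  • device_map="auto" distributes layers across the available devices
  • INT4 enables 7B parameter models on consumer GPUs
  • Compute dtype (float16) keeps forward-pass math at higher precision than the stored weights
  • Speedups depend on hardware; bitsandbytes INT8 saves memory but is not always faster than FP16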
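The memory figures above follow from simple arithmetic: weight memory is roughly the parameter count times the bytes per parameter. The rough estimator below covers weights only, so activations, KV cache, and framework overhead come on top. The parameter counts in the loop are approximate:

```python
def weight_memory_gb(num_params: float, bits_per_param: int) -> float:
    """Approximate memory for model weights alone (no activations, KV cache, or overhead)."""
    return num_params * bits_per_param / 8 / 1e9


for label, params in [("BERT-base", 110e6), ("7B LLM", 7e9)]:
    for bits in (32, 16, 8, 4):
        print(f"{label:10} {bits:2}-bit: {weight_memory_gb(params, bits):6.2f} GB")
```

At 4 bits, a 7B model needs about 3.5 GB for weights. That is why it fits on many consumer GPUs, while the 14 GB FP16 version often does not.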
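import torch
from transformers import AutoModelForSequenceClassification
# 1. Choose efficient model
model_name = "microsoft/MiniLM-L6-H256-uncased"  # 6x smaller than BERT
model = AutoModelForSequenceClassification.from_pretrained(model_name)
model.eval()
# 2. Quantize for edge: Linear weights to INT8, activations quantized on the fly
quantized = torch.ao.quantization.quantize_dynamic(
    model, {torch.nn.Linear}, dtype=torch.qint8
)
# 3. Save the quantized weights
# (ONNX/GGUF export needs separate tooling, such as Optimum or llama.cpp)
torch.save(quantized.state_dict(), "model_mobile_int8.pt")
# 4. Benchmark on target device, for example:
# iPhone 14: 15ms/inference
# Raspberry Pi: 100ms/inference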
  • Model selection prioritizes size over absolute accuracy.
  • Dynamic quantization adapts to input ranges.
  • Export formats enable cross-platform deployment.
  • Device-specific benchmarking ensures performance.
  • Edge inference eliminates network latency.
from peft import LoraConfig, TaskType, get_peft_model
from transformers import AutoModelForCausalLM
# Adapt Llama 3.2 with roughly 0.1% of its parameters
peft_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=16,  # LoRA rank
    lora_alpha=32,
    lora_dropout=0.1,
    target_modules=["q_proj", "v_proj"]
)
model = AutoModelForCausalLM.from_pretrained("meta-llama/Llama-3.2-3B")
peft_model = get_peft_model(model, peft_config)
# Only a few million trainable parameters instead of ~3.2 billion
peft_model.print_trainable_parameters()
# Expect roughly 4.6M trainable params, about 0.14% of the total
  • LoRA adds small trainable matrices to frozen model.
  • Rank (r=16) controls capacity vs efficiency tradeoff.
  • Target modules focus updates on attention layers.
  • Dropout prevents overfitting on small datasets.
  • Roughly 0.1% trainable parameters enable consumer GPU training.
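You can check the trainable-parameter count by hand. For each adapted weight matrix of shape d_out x d_in, LoRA adds A (r x d_in) and B (d_out x r), which comes to r * (d_in + d_out) parameters. The layer shapes below are assumptions based on the published Llama-3.2-3B configuration: hidden size 3072, 28 layers, and 8 key/value heads of dimension 128, so v_proj outputs 1024 values:

```python
def lora_params(d_in: int, d_out: int, r: int) -> int:
    """LoRA adds A (r x d_in) and B (d_out x r): r * (d_in + d_out) parameters per module."""
    return r * (d_in + d_out)


# Assumed Llama-3.2-3B shapes (see lead-in)
hidden, layers, kv_out, r = 3072, 28, 1024, 16
per_layer = lora_params(hidden, hidden, r) + lora_params(hidden, kv_out, r)  # q_proj + v_proj
total = per_layer * layers
print(f"{total:,} trainable LoRA parameters")  # 4,587,520
```

Doubling r doubles this count. Adding k_proj and o_proj, as in the QLoRA example below, roughly doubles it again.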
import torch
from peft import LoraConfig, TaskType, get_peft_model, prepare_model_for_kbit_training
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
# QLoRA configuration for 4-bit quantization
quantization_config = BitsAndBytesConfig(
    load_in_4bit=True,
    bnb_4bit_quant_type="nf4",
    bnb_4bit_compute_dtype=torch.float16,
    bnb_4bit_use_double_quant=True,
)
# Load Llama 3 8B with 4-bit quantization (gated: accept the license on the Hub first)
model = AutoModelForCausalLM.from_pretrained(
    "meta-llama/Meta-Llama-3-8B",
    quantization_config=quantization_config,
    device_map="auto",
)
# Prepare the quantized model for training (upcasts non-quantized layers,
# enables gradient checkpointing by default)
model = prepare_model_for_kbit_training(model)
# Configure LoRA for quantized model
peft_config = LoraConfig(
    task_type=TaskType.CAUSAL_LM,
    r=16,
    lora_alpha=32,
    lora_dropout=0.1,
    target_modules=["q_proj", "v_proj", "k_proj", "o_proj"],
    bias="none",
)
# Create PEFT model
peft_model = get_peft_model(model, peft_config)
peft_model.print_trainable_parameters()
  • 4-bit quantization reduces memory by 75% vs standard LoRA.

  • NF4 (NormalFloat4) maintains accuracy better than INT4.

  • Double quantization further compresses quantization constants.

  • Target all attention projections for comprehensive adaptation.

  • Compatible with Flash Attention 2 for speed.

  • LoRA High (13GB) 0.06% Moderate 24GB

  • QLoRA Low (4GB) 0.06% High (2x) 8GB

  • Dataset analysis identifies gaps and issues
  • Different problems require different generation approaches
  • LLMs handle text, diffusion models create images
  • Quality filters ensure synthetic data improves training
  • Validation confirms synthetic data matches real distribution
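from transformers import pipeline
# Open instruction-tuned LLM
gen = pipeline(
    'text-generation',
    model='mistralai/Mistral-7B-Instruct-v0.2',
    device_map='auto'
)
def is_realistic(text):
    # Placeholder quality check: replace with your own heuristics or a classifier
    return len(text.split()) >= 20
# Generate product reviews
prompt = """Generate a realistic negative product review for headphones.
Include specific details about sound quality and comfort."""
reviews = gen(
    prompt,
    max_new_tokens=100,
    num_return_sequences=5,
    do_sample=True,          # Sampling is required for temperature to take effect
    temperature=0.8,         # More variety
    return_full_text=False   # Return only the generated review, not the prompt
)
# Quality check; Dataset.add_item returns a new dataset, so collect samples first
synthetic_reviews = [
    {'text': r['generated_text'], 'label': 0}  # 0 = negative
    for r in reviews
    if is_realistic(r['generated_text'])
]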
  • Instruction-tuned models follow prompts precisely
  • Specific details in prompts improve generation quality
  • Temperature controls creativity vs consistency
  • Multiple sequences provide variety
  • Quality checking prevents model collapse
from diffusers import DiffusionPipeline
import torch
# Load Stable Diffusion 3.5 Large (gated: accept the license on the Hub first)
pipe = DiffusionPipeline.from_pretrained(
    "stabilityai/stable-diffusion-3.5-large",
    torch_dtype=torch.bfloat16
)
pipe = pipe.to("cuda")
# Generate training images
prompts = [
    "smartphone with cracked screen, product photo",
    "laptop with coffee spill damage, repair documentation",
    "pristine condition vintage watch, auction listing"
]
for i, prompt in enumerate(prompts):
    image = pipe(prompt, num_inference_steps=30).images[0]
    # Add to training set with appropriate labels
    image.save(f"synthetic_{i}.png")
  • SD 3.5 provides state-of-the-art photorealistic generation.
  • Half-precision (bfloat16) weights use half the memory of FP32.
  • Specific prompts create targeted training data.
  • Inference steps balance quality and speed.
  • Generated images augment rare classes effectively.
import random
def validate_synthetic_data(synthetic, real):
    """Ensure synthetic data improves dataset.
    calculate_statistics, similarity, filter_nsfw and filter_toxic are
    placeholders for your own implementations.
    """
    # 1. Statistical similarity
    real_stats = calculate_statistics(real)
    synth_stats = calculate_statistics(synthetic)
    assert similarity(real_stats, synth_stats) > 0.85
    # 2. Diversity check: more than 95% of samples must be unique
    assert len(set(synthetic)) / len(synthetic) > 0.95
    # 3. Quality filters
    synthetic = filter_nsfw(synthetic)
    synthetic = filter_toxic(synthetic)
    # 4. Human review sample (never more than what is available)
    sample = random.sample(synthetic, min(100, len(synthetic)))
    # Send sample for manual QA
    return synthetic
  • Statistical validation ensures distribution match.
  • Diversity check prevents repetitive generation.
  • Content filters remove inappropriate material.
  • Human review catches subtle quality issues.
  • Multi-stage validation prevents model degradation.
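Step 2 above checks only for exact duplicates. The sketch below implements that check alongside distinct-n, a common finer-grained diversity metric added here for illustration. Distinct-n counts repeated phrases, not just identical samples. Both are plain-Python helpers with hypothetical names:

```python
def unique_ratio(texts):
    """Fraction of samples that are unique (1.0 = no exact duplicates)."""
    return len(set(texts)) / len(texts) if texts else 0.0


def distinct_n(texts, n=2):
    """Distinct-n: unique word n-grams divided by total word n-grams across all texts."""
    ngrams = []
    for text in texts:
        words = text.lower().split()
        ngrams.extend(tuple(words[i:i + n]) for i in range(len(words) - n + 1))
    return len(set(ngrams)) / len(ngrams) if ngrams else 0.0


print(unique_ratio(["a", "a", "b", "c"]), distinct_n(["the cat sat", "the cat ran"]))
```

Generated samples can all be unique strings while reusing the same stock phrases. A falling distinct-2 score flags that kind of template-like output.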
from transformers import pipeline
class FairnessPipeline:
    """Wrapper that adds fairness monitoring to any pipeline.
    The _load_bias_keywords, _check_*, _get_flagged_samples and
    _log_bias_incident helpers are domain-specific and left to implement.
    """
    def __init__(self, base_pipeline):
        self.pipeline = base_pipeline
        self.bias_keywords = self._load_bias_keywords()
    def __call__(self, texts, **kwargs):
        # Get predictions
        results = self.pipeline(texts, **kwargs)
        # Analyze for bias
        bias_analysis = self._analyze_bias(texts, results)
        # Log if bias detected
        if bias_analysis['bias_score'] > 0.15:
            self._log_bias_incident(bias_analysis)
        return results, bias_analysis
    def _analyze_bias(self, texts, results):
        """Detect potential bias in predictions."""
        bias_metrics = {
            'demographic_parity': self._check_demographic_parity(results),
            'equalised_odds': self._check_equalised_odds(texts, results),
            'keyword_bias': self._check_keyword_bias(texts, results)
        }
        # Average the individual metric scores into a single number
        bias_score = sum(bias_metrics.values()) / len(bias_metrics)
        return {
            'bias_score': bias_score,
            'metrics': bias_metrics,
            'flagged_samples': self._get_flagged_samples(texts, results)
        }
# Usage (once the helper methods are implemented)
sentiment = pipeline('sentiment-analysis')
fair_sentiment = FairnessPipeline(sentiment)
results, bias_report = fair_sentiment([
    "The doctor was excellent at her job",
    "The nurse helped him with the medication",
    "They are a talented engineer"
])
print(f"Bias score: {bias_report['bias_score']:.3f}")
  • Wrapper pattern adds fairness monitoring to any pipeline
  • Bias keywords help detect problematic patterns
  • Multiple metrics capture different fairness dimensions
  • Automatic logging enables bias tracking over time
  • Non-intrusive design maintains pipeline compatibility
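The _check_demographic_parity helper is left abstract above. One common definition is the gap in positive-prediction rates between groups. The minimal sketch below assumes you have a group label for each input from your own metadata, since pipelines do not produce these:

```python
from collections import defaultdict


def demographic_parity_gap(predictions, groups, positive_label="POSITIVE"):
    """Largest difference in positive-prediction rate between any two groups (0 = parity)."""
    counts = defaultdict(lambda: [0, 0])  # group -> [positives, total]
    for pred, group in zip(predictions, groups):
        counts[group][0] += pred == positive_label
        counts[group][1] += 1
    rates = [pos / total for pos, total in counts.values()]
    return max(rates) - min(rates) if rates else 0.0


preds = ["POSITIVE", "POSITIVE", "NEGATIVE", "NEGATIVE", "POSITIVE", "POSITIVE", "POSITIVE", "NEGATIVE"]
print(demographic_parity_gap(preds, ["a"] * 4 + ["b"] * 4))
```

Here group "a" gets a positive label 50% of the time and group "b" 75% of the time, so the gap is 0.25. Parity alone does not tell you which rate is correct, which is why the wrapper combines several metrics.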
def generate_fairness_augmented_data(dataset, protected_attributes):
    """Generate synthetic data to improve fairness.
    analyze_representation, create_diverse_prompts, generate_synthetic_text,
    filter_biased_samples and evaluate_fairness are placeholders for your own helpers.
    """
    # Analyze representation gaps
    representation = analyze_representation(dataset, protected_attributes)
    # Generate diverse examples
    augmented_samples = []
    for underrep_group in representation['underrepresented']:
        prompts = create_diverse_prompts(underrep_group)
        for prompt in prompts:
            synthetic = generate_synthetic_text(
                prompt,
                num_samples=100,
                # Encourage variety (note: in transformers' generate(),
                # diversity_penalty only applies to group beam search)
                diversity_penalty=0.8
            )
            # Quality and bias check
            filtered = filter_biased_samples(synthetic)
            augmented_samples.extend(filtered)
    # Validate fairness improvement (treats dataset as a sequence of samples)
    combined_dataset = list(dataset) + augmented_samples
    fairness_metrics = evaluate_fairness(combined_dataset)
    return augmented_samples, fairness_metrics
  • Representation analysis identifies data gaps.
  • Targeted generation fills underrepresented groups.
  • Diversity penalty encourages varied outputs, reducing repetitive or stereotypical patterns.
  • Bias filtering removes problematic generations.
  • Fairness validation ensures actual improvement.
from datetime import datetime
class FairnessMonitor:
    """Production fairness monitoring system.
    _calculate_overall_bias, _check_demographic_parity, _check_fp_equity and
    _send_fairness_alert are placeholders for your own implementations.
    """
    def __init__(self, model_name):
        self.model_name = model_name
        self.metrics_history = []
    def log_prediction(self, input_text, prediction, metadata=None):
        """Log individual predictions for fairness analysis."""
        self.metrics_history.append({
            'timestamp': datetime.now(),
            'input': input_text,
            'prediction': prediction,
            'metadata': metadata or {}
        })
        # Periodic fairness check every 1,000 predictions
        if len(self.metrics_history) % 1000 == 0:
            self._run_fairness_audit()
    def _run_fairness_audit(self):
        """Analyze recent predictions for bias patterns."""
        recent = self.metrics_history[-1000:]
        # Group by protected attributes
        fairness_report = {
            'overall_bias': self._calculate_overall_bias(recent),
            'demographic_parity': self._check_demographic_parity(recent),
            'false_positive_equity': self._check_fp_equity(recent)
        }
        # Alert if thresholds exceeded
        if fairness_report['overall_bias'] > 0.2:
            self._send_fairness_alert(fairness_report)
        return fairness_report
  • Continuous logging captures all predictions
  • Periodic audits detect bias drift
  • Multiple fairness metrics provide comprehensive view
  • Automated alerts enable rapid response
  • Historical tracking shows fairness trends
  1. Measure continuously — Bias can emerge over time.
  2. Use multiple metrics — No single metric captures all fairness aspects.
  3. Act on findings — Detection without action is insufficient.
  4. Document decisions — Fairness tradeoffs should be explicit.
  5. Involve stakeholders — Technical metrics need human context.
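FairnessMonitor above leaves its audit helpers to you. Below is a self-contained sketch of the same idea. It uses a fixed-size window (collections.deque with maxlen) and the demographic-parity gap as its single metric. RollingParityMonitor, the 0.2 threshold, and the per-prediction group labels are assumptions for illustration:

```python
from collections import deque


class RollingParityMonitor:
    """Track recent (prediction, group) pairs and alert when the parity gap exceeds a threshold."""

    def __init__(self, window=1000, threshold=0.2, positive_label="POSITIVE", on_alert=print):
        self.records = deque(maxlen=window)  # Oldest entries drop off automatically
        self.threshold = threshold
        self.positive_label = positive_label
        self.on_alert = on_alert

    def log(self, prediction, group):
        self.records.append((prediction, group))

    def parity_gap(self):
        totals, positives = {}, {}
        for pred, group in self.records:
            totals[group] = totals.get(group, 0) + 1
            positives[group] = positives.get(group, 0) + (pred == self.positive_label)
        rates = [positives[g] / totals[g] for g in totals]
        return max(rates) - min(rates) if rates else 0.0

    def audit(self):
        gap = self.parity_gap()
        if gap > self.threshold:
            self.on_alert(f"Fairness alert: parity gap {gap:.2f} exceeds {self.threshold}")
        return gap
```

The bounded deque keeps memory constant no matter how long the service runs. Swap on_alert for your logging or paging hook.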
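from typing import Any, Dict, List
from transformers import pipeline
class RetailReviewWorkflow:
    """End-to-end production workflow for retail review analysis."""
    CATEGORIES = ['product_quality', 'shipping', 'customer_service', 'pricing']
    # Example keywords that mark a review as urgent (tune for your business)
    URGENT_KEYWORDS = ('refund', 'broken', 'dangerous', 'lawsuit', 'never arrived')
    def __init__(self):
        # Multi-pipeline architecture
        self.sentiment_pipeline = pipeline('sentiment-analysis')
        self.category_pipeline = pipeline('zero-shot-classification')
    def process_review_batch(self, reviews: List[Dict]) -> Dict[str, Any]:
        processed_reviews = []
        # Business logic integration
        for review in reviews:
            # Sentiment analysis
            sentiment = self.sentiment_pipeline(review['text'])[0]
            # Category classification
            categories = self.category_pipeline(review['text'], candidate_labels=self.CATEGORIES)
            # Priority scoring based on keywords
            priority_score = self._calculate_priority(review['text'])
            # Generate insights
            if priority_score > 0.8:
                self._trigger_alert(review)
            processed_reviews.append({
                **review,
                'sentiment': sentiment['label'],
                'category': categories['labels'][0],  # Highest-scoring label
                'priority': priority_score,
            })
        return self._generate_business_insights(processed_reviews)
    def _calculate_priority(self, text: str) -> float:
        # Simple placeholder rule: each urgent keyword adds 0.5, capped at 1.0
        hits = sum(kw in text.lower() for kw in self.URGENT_KEYWORDS)
        return min(1.0, 0.5 * hits)
    def _trigger_alert(self, review: Dict) -> None:
        print(f"ALERT: urgent review {review.get('id', '?')}")
    def _generate_business_insights(self, processed: List[Dict]) -> Dict[str, Any]:
        # Count reviews per category and how many are negative
        insights: Dict[str, Any] = {'total': len(processed), 'by_category': {}, 'negative': 0}
        for r in processed:
            insights['by_category'][r['category']] = insights['by_category'].get(r['category'], 0) + 1
            insights['negative'] += r['sentiment'] == 'NEGATIVE'
        return insights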
  • Multi-pipeline architecture handles complex analysis

  • Business logic integrates with ML predictions

  • Priority scoring identifies urgent issues

  • Alert system enables real-time response

  • Insight generation provides actionable intelligence

  • Multi-pipeline orchestration (sentiment + classification)

  • Business rule integration (priority scoring)

  • Real-time alert system for urgent issues

  • Automated insight generation for decision-making

from contextlib import contextmanager
import torch
@contextmanager
def track_memory(device: str = "cuda"):
    """Context manager for GPU memory profiling."""
    if device == "cuda" and torch.cuda.is_available():
        torch.cuda.synchronize()
        start_memory = torch.cuda.memory_allocated()
        try:
            yield
        finally:
            torch.cuda.synchronize()
            end_memory = torch.cuda.memory_allocated()
            print(f"Memory used: {(end_memory - start_memory) / 1024**2:.1f} MB")
    else:
        # No CUDA available: run the block without measuring
        yield
  • Context manager ensures proper cleanup
  • CUDA synchronization captures accurate measurements
  • Memory difference shows actual usage
  • Automatic formatting improves readability
import torch
def get_optimal_device() -> torch.device:
    """Automatically detect best available device."""
    if torch.cuda.is_available():
        return torch.device("cuda")
    elif torch.backends.mps.is_available():  # Apple Silicon
        return torch.device("mps")
    else:
        return torch.device("cpu")
  • CUDA check identifies NVIDIA GPUs
  • MPS check finds Apple Silicon acceleration
  • CPU fallback ensures universal compatibility
  • Single function handles all platforms
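def count_parameters(model):
    """Total number of parameters in a PyTorch model."""
    return sum(p.numel() for p in model.parameters())
def generate_model_card(model, dataset_info, performance_metrics):
    """Generate comprehensive model documentation.
    analyze_model_limitations and generate_bias_report are placeholders
    for your own analysis code.
    """
    return {
        "model_details": {
            "architecture": model.config.model_type,
            "parameters": count_parameters(model),
            "training_data": dataset_info
        },
        "performance": performance_metrics,
        "limitations": analyze_model_limitations(model),
        "ethical_considerations": generate_bias_report(model, dataset_info)
    }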
  • Architecture details enable reproduction
  • Parameter count indicates computational requirements
  • Performance metrics set expectations
  • Limitations section prevents misuse
  • Ethical considerations ensure responsible deployment
import os
from pathlib import Path
class Config:
    """Centralized configuration with environment fallbacks."""
    # Device configuration with automatic detection (uses get_optimal_device from above)
    DEVICE = get_optimal_device()
    # Model configurations with env overrides
    DEFAULT_SENTIMENT_MODEL = os.getenv(
        "SENTIMENT_MODEL",
        "distilbert-base-uncased-finetuned-sst-2-english"
    )
    # Performance settings
    BATCH_SIZE = int(os.getenv("BATCH_SIZE", "32"))
    ENABLE_FLASH_ATTENTION = os.getenv("ENABLE_FLASH_ATTENTION", "true").lower() == "true"
    # Directory management with auto-creation
    DATA_PATH = Path(os.getenv("DATA_PATH", "./data"))
    DATA_PATH.mkdir(parents=True, exist_ok=True)
  • Environment variables enable deployment flexibility
  • Fallback values ensure sensible defaults
  • Type conversion prevents configuration errors
  • Automatic directory creation prevents runtime failures
  • Single source of truth simplifies maintenance
MIN_LENGTH = 10  # Minimum words per sample (example threshold)
def validate_synthetic_data(synthetic_samples, real_samples):
    """Comprehensive quality validation pipeline.
    The calculate_*, check_quality_issues, has_repetition, is_truncated and
    passes_profanity_check helpers are placeholders for your own implementations.
    """
    metrics = {
        "length_similarity": calculate_length_distribution_similarity(
            synthetic_samples, real_samples
        ),
        "vocabulary_overlap": calculate_vocabulary_overlap(
            synthetic_samples, real_samples
        ),
        "diversity_score": calculate_diversity(synthetic_samples),
        "quality_flags": check_quality_issues(synthetic_samples)
    }
    # Filter based on thresholds
    filtered_samples = []
    for sample in synthetic_samples:
        if all([
            not has_repetition(sample),
            len(sample.split()) > MIN_LENGTH,
            not is_truncated(sample),
            passes_profanity_check(sample)
        ]):
            filtered_samples.append(sample)
    return filtered_samples, metrics
  • Length similarity ensures realistic text generation
  • Vocabulary overlap confirms domain relevance
  • Diversity score prevents repetitive patterns
  • Quality checks filter problematic samples
  • Multi-stage filtering maintains high standards
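The article does not show the helpers that validate_synthetic_data calls. Below is one plausible set of implementations built on simple, well-known measures: a mean-length ratio, Jaccard overlap of vocabularies, distinct-n for diversity, and a repeated-n-gram check. The function names match the calls above, but the formulas are assumptions:

```python
from typing import Iterable, List, Set


def _vocab(texts: Iterable[str]) -> Set[str]:
    return {token for text in texts for token in text.lower().split()}


def calculate_length_distribution_similarity(synthetic: List[str], real: List[str]) -> float:
    """Simplified: 1.0 when mean word counts match, falling toward 0 as they diverge."""
    def mean_len(texts: List[str]) -> float:
        return sum(len(t.split()) for t in texts) / len(texts) if texts else 0.0

    a, b = mean_len(synthetic), mean_len(real)
    return 1.0 - abs(a - b) / max(a, b) if max(a, b) > 0 else 1.0


def calculate_vocabulary_overlap(synthetic: List[str], real: List[str]) -> float:
    """Jaccard similarity between the synthetic and real vocabularies."""
    syn, ref = _vocab(synthetic), _vocab(real)
    union = syn | ref
    return len(syn & ref) / len(union) if union else 0.0


def calculate_diversity(samples: List[str], n: int = 2) -> float:
    """Distinct-n: unique n-grams divided by total n-grams (1.0 = no repeats)."""
    ngrams = []
    for text in samples:
        tokens = text.lower().split()
        ngrams.extend(zip(*(tokens[i:] for i in range(n))))
    return len(set(ngrams)) / len(ngrams) if ngrams else 0.0


def has_repetition(sample: str, n: int = 3) -> bool:
    """True if any n-gram occurs twice, a common sign of degenerate generation."""
    tokens = sample.lower().split()
    seen = set()
    for gram in zip(*(tokens[i:] for i in range(n))):
        if gram in seen:
            return True
        seen.add(gram)
    return False
```

Comparing only means is deliberately crude. A stricter check could compare whole length histograms instead.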
def benchmark_optimization_techniques(model_name: str):
    """Compare all optimization techniques systematically."""
    # load_model, benchmark_model, apply_quantization, benchmark_batching and
    # generate_benchmark_report are helpers that are not shown here.
    results = {}

    # Baseline
    baseline_model = load_model(model_name)
    results["baseline"] = benchmark_model(baseline_model)

    # Quantization techniques (apply_quantization should return a new model
    # rather than modify the baseline in place, or later runs are skewed)
    for technique in ["dynamic_int8", "static_int8", "int4_nf4"]:
        quantized = apply_quantization(baseline_model, technique)
        results[technique] = benchmark_model(quantized)

    # Batching optimization
    for batch_size in [1, 8, 32, 64]:
        results[f"batch_{batch_size}"] = benchmark_batching(baseline_model, batch_size)

    # Generate comparison report
    return generate_benchmark_report(results)
  • Baseline measurement provides comparison point
  • Multiple quantization techniques reveal tradeoffs
  • Batch size sweep finds optimal throughput
  • Automated reporting simplifies decision-making
  • Systematic approach ensures fair comparison

Metrics to record for each configuration:

  • Inference latency (ms)
  • Throughput (samples/sec)
  • Memory usage (GPU/CPU)
  • Model size on disk
  • Accuracy preservation
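The code above leaves benchmark_model and benchmark_batching undefined. Below is a minimal, framework-agnostic timing harness for the first two metrics, latency and throughput. It assumes a hypothetical predict_batch callable that accepts a list of inputs, and it substitutes a stand-in "model" so it runs anywhere. Memory, disk size and accuracy need framework-specific tooling:

```python
import statistics
import time
from typing import Callable, Dict, List, Sequence


def benchmark_batching(
    predict_batch: Callable[[List[str]], list],
    inputs: Sequence[str],
    batch_size: int,
    warmup: int = 1,
) -> Dict[str, float]:
    """Measure per-batch latency and overall throughput for one batch size."""
    if not inputs:
        raise ValueError("inputs must not be empty")
    batches = [list(inputs[i:i + batch_size]) for i in range(0, len(inputs), batch_size)]
    # Warm-up runs absorb one-time costs (lazy init, caches, kernel compilation)
    for batch in batches[:warmup]:
        predict_batch(batch)

    latencies_ms = []
    start = time.perf_counter()
    for batch in batches:
        t0 = time.perf_counter()
        predict_batch(batch)
        latencies_ms.append((time.perf_counter() - t0) * 1000)
    total_s = time.perf_counter() - start

    return {
        "batch_size": batch_size,
        "median_batch_latency_ms": statistics.median(latencies_ms),
        "throughput_samples_per_s": len(inputs) / total_s if total_s > 0 else float("inf"),
    }


# Stand-in model: fixed overhead per call plus a small per-item cost
def fake_predict(batch: List[str]) -> list:
    time.sleep(0.002 + 0.0001 * len(batch))
    return ["POSITIVE"] * len(batch)


texts = [f"review {i}" for i in range(128)]
for size in (1, 8, 32, 64):
    print(benchmark_batching(fake_predict, texts, size))
```

The stand-in charges a fixed cost for every call, so larger batches amortize that overhead. A batch-size sweep is meant to expose the same effect on a real model.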

  • Root celebrates your transformation to workflow architect
  • Custom pipelines branch shows flexibility gained
  • Data mastery indicates scale capabilities
  • Optimization arsenal lists cost-saving techniques
  • Synthetic superpowers expand data possibilities
  • Production ready confirms real-world applicability
# You can now build THIS
# (Illustrative only: compose_pipelines, quantize_and_compile, generate_synthetic
# and the component/transform names stand in for techniques from this article.)
from datasets import load_dataset

custom_pipeline = compose_pipelines(
    preprocessing=custom_cleaner,
    main_model=sentiment_analyzer,
    post_processing=business_filter,
    output_format=company_standard,
)

# Handle millions of records without breaking a sweat
massive_dataset = load_dataset("your_data", streaming=True)
processed = massive_dataset.map(transform, batched=True)

# Up to 75% cost reduction with minimal accuracy loss
optimized_model = quantize_and_compile(model, target="int4", hardware="mobile")

# Fill gaps, boost fairness
augmented_data = generate_synthetic(
    minority_class="rare_defects",
    count=10000,
    validate=True,
)
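Streaming with batched=True never materializes the whole dataset. Examples are read lazily and passed to the transform in column-oriented chunks. The pure-Python sketch below illustrates that idea. It is not how the datasets library is implemented, and stream_records and batched_map are hypothetical names:

```python
from itertools import islice
from typing import Callable, Dict, Iterable, Iterator, List


def stream_records(n: int) -> Iterator[Dict[str, str]]:
    """Stand-in for a streaming dataset: yields one record at a time."""
    for i in range(n):
        yield {"text": f"  Review #{i}: GREAT product  "}


def batched_map(
    records: Iterable[Dict[str, str]],
    transform: Callable[[Dict[str, List[str]]], Dict[str, List[str]]],
    batch_size: int = 1000,
) -> Iterator[Dict[str, str]]:
    """Apply a columnar batch transform lazily, in the spirit of map(batched=True)."""
    iterator = iter(records)
    while True:
        chunk = list(islice(iterator, batch_size))
        if not chunk:
            return
        # Rows -> columns: {"text": [...]}, the format batched transforms receive
        columns = {key: [row[key] for row in chunk] for key in chunk[0]}
        out = transform(columns)
        # Columns -> rows again for downstream consumers
        for values in zip(*out.values()):
            yield dict(zip(out.keys(), values))


def transform(batch: Dict[str, List[str]]) -> Dict[str, List[str]]:
    return {"text": [t.strip().lower() for t in batch["text"]]}


processed = batched_map(stream_records(10_000_000), transform, batch_size=1000)
print(list(islice(processed, 2)))  # only the first batch of 1,000 is ever computed
```

Memory use depends on batch_size rather than dataset size, which is why streaming scales to corpora that do not fit in RAM.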
Before → after (impact):

  • Pipeline usage: pipeline() only → custom components and composition (10x flexibility)
  • Data handling: memory limits → streaming and parallel processing (1000x scale)
  • Inference cost: $1000/month → $250/month with INT8 + batching (75% savings)
  • Model size: 400MB BERT → 50MB MiniLM INT4 (deploy anywhere)
  • Training data: real only → real + validated synthetic (2x performance)
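Part of the model-size gain is simple arithmetic. Weight storage is roughly the parameter count times the bytes per parameter: 4 for FP32, 2 for FP16, 1 for INT8 and 0.5 for INT4. Below is a back-of-the-envelope sketch. It ignores activations, quantization scale factors and runtime overhead:

```python
BYTES_PER_PARAM = {"fp32": 4.0, "fp16": 2.0, "bf16": 2.0, "int8": 1.0, "int4": 0.5}


def weight_size_mb(num_params: int, dtype: str) -> float:
    """Approximate weight storage in megabytes (1 MB = 1e6 bytes)."""
    return num_params * BYTES_PER_PARAM[dtype] / 1e6


for dtype in ("fp32", "fp16", "int8", "int4"):
    print(dtype, weight_size_mb(100_000_000, dtype), "MB")
```

For a 100M-parameter model this gives about 400 MB in FP32 and 50 MB in INT4. Real checkpoints come out somewhat larger, because quantized formats also store scale factors and often leave some layers in higher precision.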

  • Article 11: Advanced dataset curation techniques
  • Article 12: LoRA/QLoRA for efficient large model adaptation
  • Article 14: Comprehensive evaluation strategies
  • Article 16: Responsible AI and fairness
import os

from huggingface_hub import create_inference_endpoint

# Deploy custom model
endpoint = create_inference_endpoint(
    name="retail-sentiment-analyzer",
    repository="your-username/custom-sentiment-model",
    framework="pytorch",
    task="text-classification",
    accelerator="gpu",
    vendor="aws",
    region="us-east-1",
    instance_size="x1",
    instance_type="nvidia-a10g",  # 1 NVIDIA A10G
    min_replica=0,  # Must be 0 for scale-to-zero to take effect
    max_replica=5,
    scale_to_zero_timeout=15,  # Minutes of inactivity before scaling to 0
    env={
        "MAX_BATCH_SIZE": "32",
        "MODEL_REVISION": "main",
    },
    token=os.getenv("HF_TOKEN"),
)

# Block until the endpoint is deployed; url is None before that
endpoint.wait()
print(f"Endpoint URL: {endpoint.url}")
print(f"Status: {endpoint.status}")
  • Repository specifies your model on HF Hub.
  • Accelerator choice impacts cost and performance.
  • Auto-scaling handles traffic spikes efficiently.
  • Scale-to-zero saves costs during idle periods.
  • Environment variables configure runtime behavior.
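Provisioning takes several minutes, so url can still be None right after create_inference_endpoint returns. huggingface_hub provides InferenceEndpoint.wait() for exactly this situation. The generic sketch below shows the basic poll-with-timeout idea. A fake status sequence stands in for the real API, and wait_until is a hypothetical helper:

```python
import time
from typing import Callable


def wait_until(
    check: Callable[[], bool],
    timeout_s: float = 600.0,
    interval_s: float = 5.0,
    max_interval_s: float = 60.0,
) -> None:
    """Poll check() until it returns True, backing off between attempts."""
    deadline = time.monotonic() + timeout_s
    while not check():
        remaining = deadline - time.monotonic()
        if remaining <= 0:
            raise TimeoutError(f"Condition not met within {timeout_s} seconds")
        time.sleep(min(interval_s, remaining))
        interval_s = min(interval_s * 2, max_interval_s)  # Exponential backoff


# Fake status source: reports "running" on the third poll
statuses = iter(["pending", "initializing", "running"])
wait_until(lambda: next(statuses) == "running", timeout_s=5, interval_s=0.01)
print("Endpoint is running")
```

With a real endpoint, the check could be `lambda: endpoint.fetch().status == "running"`.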
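# handler.py in your model repository
import time
from typing import Any, Dict, List

import torch
from transformers import (
    AutoModelForSequenceClassification,
    AutoModelForTokenClassification,
    AutoTokenizer,
)


class EndpointHandler:
    def __init__(self, path=""):
        # Load multiple models for a complex workflow. Assumes the repository
        # holds "sentiment/" and "ner/" subfolders plus a tokenizer at its root
        # that is compatible with both models.
        self.sentiment_model = AutoModelForSequenceClassification.from_pretrained(
            path + "/sentiment"
        )
        self.ner_model = AutoModelForTokenClassification.from_pretrained(
            path + "/ner"
        )
        self.tokenizer = AutoTokenizer.from_pretrained(path)
        self.sentiment_model.eval()
        self.ner_model.eval()

    def __call__(self, data: Dict[str, Any]) -> Any:
        """Process incoming requests with error handling."""
        try:
            inputs = data.pop("inputs", data)
            parameters = data.pop("parameters", {}) or {}

            # Batch processing: a list of texts returns a list of results
            if isinstance(inputs, list):
                return [self._process_single(text, parameters) for text in inputs]
            return self._process_single(inputs, parameters)
        except Exception as e:
            # Return a structured error instead of crashing the endpoint
            return {"error": str(e), "error_type": type(e).__name__}

    def _process_single(self, text: str, params: Dict) -> Dict:
        start = time.perf_counter()

        # Tokenize
        encoded = self.tokenizer(
            text,
            truncation=True,
            padding=True,
            return_tensors="pt",
        )

        # Run both models without tracking gradients
        with torch.no_grad():
            sentiment_output = self.sentiment_model(**encoded)
            ner_output = self.ner_model(**encoded)

        # Convert logits to probabilities and extract entities
        sentiment = torch.nn.functional.softmax(sentiment_output.logits, dim=-1)
        entities = self._extract_entities(ner_output, encoded)

        result = {
            "sentiment": {
                "label": self.sentiment_model.config.id2label[sentiment.argmax().item()],
                "score": sentiment.max().item(),
            },
            "entities": entities,
        }
        # Report elapsed time only when the caller asks for it
        if params.get("return_timing", False):
            result["processing_time_ms"] = (time.perf_counter() - start) * 1000
        return result

    def _extract_entities(self, ner_output, encoded) -> List[Dict[str, Any]]:
        # Simple token-level extraction: keep every non-"O" prediction.
        # A production version would merge word pieces into whole entities.
        predictions = ner_output.logits.argmax(dim=-1)[0].tolist()
        tokens = self.tokenizer.convert_ids_to_tokens(encoded["input_ids"][0].tolist())
        id2label = self.ner_model.config.id2label
        special = set(self.tokenizer.all_special_tokens)
        return [
            {"token": token, "label": id2label[pred]}
            for token, pred in zip(tokens, predictions)
            if token not in special and id2label[pred] != "O"
        ]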
  • Custom handler enables multi-model workflows
  • Error handling ensures robust production behavior
  • Batch support improves throughput efficiency
  • Parameter handling allows runtime configuration
  • Structured outputs facilitate downstream integration
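The handler expects a JSON body with an "inputs" field and an optional "parameters" object. The standard-library sketch below builds, but does not send, a client request that follows this contract. The URL and token are placeholders, and build_request is a hypothetical helper:

```python
import json
import urllib.request
from typing import Any, Dict, List, Optional, Union


def build_request(
    url: str,
    token: str,
    inputs: Union[str, List[str]],
    parameters: Optional[Dict[str, Any]] = None,
) -> urllib.request.Request:
    """Build a POST request matching the handler's inputs/parameters contract."""
    body: Dict[str, Any] = {"inputs": inputs}
    if parameters:
        body["parameters"] = parameters
    return urllib.request.Request(
        url,
        data=json.dumps(body).encode("utf-8"),
        headers={
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


req = build_request(
    "https://example.endpoints.huggingface.cloud",  # Placeholder URL
    "hf_xxx",  # Placeholder token
    ["Love this store!", "Checkout was slow."],
    {"return_timing": True},
)
print(req.get_method(), json.loads(req.data))
```

Calling urllib.request.urlopen(req) against a real endpoint would send the request and return the handler's JSON response.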
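from huggingface_hub import create_inference_endpoint

# 1. Serverless-style for Variable Traffic
# Inference Endpoints have no "serverless" type; scale-to-zero gives similar
# pay-for-use behavior, at the cost of cold starts.
serverless_endpoint = create_inference_endpoint(
    name="sentiment-serverless",
    repository="your-username/sentiment-model",
    framework="pytorch",
    task="text-classification",
    accelerator="cpu",
    vendor="aws",
    region="us-east-1",
    instance_size="x2",
    instance_type="intel-icl",
    type="protected",
    min_replica=0,
    scale_to_zero_timeout=15,  # Minutes idle before scaling to 0
)

# 2. Dedicated for Consistent Load
dedicated_endpoint = create_inference_endpoint(
    name="sentiment-dedicated",
    repository="your-username/sentiment-model",
    framework="pytorch",
    task="text-classification",
    accelerator="cpu",  # Cheaper for small models
    vendor="aws",
    region="us-east-1",
    instance_size="x2",
    instance_type="intel-icl",
    type="protected",  # Always running, token required
    min_replica=2,  # High availability
    max_replica=2,  # Must be >= min_replica
)

# 3. Private for Sensitive Data
private_endpoint = create_inference_endpoint(
    name="sentiment-private",
    repository="your-username/sentiment-model",
    framework="pytorch",
    task="text-classification",
    accelerator="gpu",
    vendor="aws",
    region="us-east-1",
    instance_size="x1",
    instance_type="nvidia-a10g",
    type="private",  # Reachable only through AWS PrivateLink (VPC)
    # Private endpoints also need account_id=<your AWS account ID> for PrivateLink
    custom_image={
        "url": "your-ecr-repo/custom-image:latest",
        "health_route": "/health",
        "env": {"ENABLE_ENCRYPTION": "true"},
    },
)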
  • Serverless eliminates idle costs for sporadic traffic
  • CPU instances suffice for smaller models
  • Dedicated endpoints ensure consistent latency
  • Private endpoints meet compliance requirements
  • Custom images enable specialized configurations
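Whether scale-to-zero pays off depends on how many hours a day the endpoint actually serves traffic. Below is a rough monthly cost model. The hourly rate is a placeholder parameter, not a Hugging Face price, and the model does not capture cold-start latency:

```python
def monthly_cost(
    hourly_rate: float,
    replicas: int,
    active_hours_per_day: float,
    scale_to_zero: bool,
    days: int = 30,
) -> float:
    """Estimate monthly compute cost for an endpoint.

    Always-on endpoints bill 24 h/day; scale-to-zero endpoints bill roughly
    for the hours they are awake (count the idle timeout window as awake).
    """
    hours_per_day = active_hours_per_day if scale_to_zero else 24.0
    return hourly_rate * replicas * hours_per_day * days


rate = 1.00  # Placeholder $/hour per replica
always_on = monthly_cost(rate, replicas=1, active_hours_per_day=6, scale_to_zero=False)
on_demand = monthly_cost(rate, replicas=1, active_hours_per_day=6, scale_to_zero=True)
print(f"always-on: ${always_on:.2f}, scale-to-zero: ${on_demand:.2f}")
```

At 6 active hours a day, scale-to-zero bills a quarter of the always-on hours. Steady around-the-clock traffic removes the advantage.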
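import time
from statistics import mean

from huggingface_hub import get_inference_endpoint

# Look up an existing endpoint by name
endpoint = get_inference_endpoint("retail-sentiment-analyzer")


# InferenceEndpoint has no metrics method (request analytics are shown in the
# Inference Endpoints dashboard), so this measures latency from the client.
def measure(ep, samples):
    latencies, errors = [], 0
    for text in samples:
        start = time.perf_counter()
        try:
            ep.client.text_classification(text)
        except Exception:
            errors += 1
            continue
        latencies.append((time.perf_counter() - start) * 1000)
    return {
        "avg_latency_ms": mean(latencies) if latencies else float("inf"),
        "request_count": len(samples),
        "error_rate": errors / len(samples),
    }


metrics = measure(endpoint, ["Great service!", "The checkout kept failing."])
print(f"Average latency: {metrics['avg_latency_ms']:.1f}ms")
print(f"Request count: {metrics['request_count']}")
print(f"Error rate: {metrics['error_rate']:.2%}")

# Scale up based on metrics (min_replica must stay <= max_replica)
if metrics["avg_latency_ms"] > 200:
    current_min = endpoint.raw["compute"]["scaling"]["minReplica"]
    endpoint.update(min_replica=current_min + 1)
    print("Scaled up due to high latency")

# A/B testing with multiple endpoints
endpoints = {
    "v1": get_inference_endpoint("sentiment-v1"),
    "v2": get_inference_endpoint("sentiment-v2-optimized"),
}


# Route traffic based on performance
def smart_predict(text):
    # Probe each endpoint and choose the lowest latency; in production, cache
    # these measurements instead of probing on every request.
    latencies = {
        name: measure(ep, ["latency probe"])["avg_latency_ms"]
        for name, ep in endpoints.items()
    }
    best_endpoint = min(latencies, key=latencies.get)
    return endpoints[best_endpoint].client.text_classification(text)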
  • Real-time metrics enable data-driven decisions
  • Automatic scaling maintains performance SLAs
  • A/B testing validates optimization impact
  • Smart routing maximizes efficiency
  • Continuous monitoring prevents degradation
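Averages hide tail latency, and SLAs are usually stated as percentiles. Below is a small rolling-window tracker that reports percentiles using the nearest-rank method. The window size is arbitrary, and the 200 ms threshold simply mirrors the scaling rule above:

```python
import math
from collections import deque
from typing import Deque


class LatencyTracker:
    """Keep the most recent N latencies and report percentiles."""

    def __init__(self, window: int = 1000) -> None:
        self.samples: Deque[float] = deque(maxlen=window)

    def record(self, latency_ms: float) -> None:
        self.samples.append(latency_ms)

    def percentile(self, p: float) -> float:
        """Nearest-rank percentile, p in (0, 100]."""
        if not self.samples:
            raise ValueError("no samples recorded")
        ordered = sorted(self.samples)
        rank = math.ceil(p * len(ordered) / 100)
        return ordered[max(rank, 1) - 1]

    def breaches_slo(self, p: float = 95, threshold_ms: float = 200) -> bool:
        return self.percentile(p) > threshold_ms


tracker = LatencyTracker(window=100)
for ms in range(1, 101):  # Latencies of 1..100 ms
    tracker.record(float(ms))
print(tracker.percentile(50), tracker.percentile(95))  # 50.0 95.0
```

Because the deque has a fixed maximum length, old samples fall out automatically. Each decision then reflects recent traffic, not the endpoint's whole history.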
  1. Model Versioning: Tag models with semantic versions
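# Assumes `model` and `endpoint` from the earlier examples
model.push_to_hub("sentiment-model", revision="v2.1.0")

# Point the endpoint at the new revision. update() has no traffic-split option,
# so a 10% canary means running a second endpoint on v2.1.0 and routing
# about 10% of requests to it on the client or gateway side.
endpoint.update(revision="v2.1.0")


# Input validation: reject empty or oversized inputs before they reach the model
def validate_input(text):
    if not text or len(text) > 10000:
        raise ValueError("Invalid input length")
    return text


# Caching: reuse results for repeated inputs
from functools import lru_cache


@lru_cache(maxsize=10000)
def cached_predict(text):
    # Strings are hashable, so the text itself serves as the cache key
    return endpoint.client.text_classification(text)


# Health check: verify the endpoint answers a trivial request
def health_check():
    try:
        result = endpoint.client.text_classification("test")
        return result is not None
    except Exception:
        return False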
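InferenceEndpoint.update() in huggingface_hub takes no traffic-split parameter. One way to run a canary is therefore to keep two endpoints and split traffic on the client side. Below is a minimal sketch that sends a stable share of about 10% of users to the new version by hashing a request or user ID. The endpoint names are hypothetical:

```python
import hashlib


def route_version(request_id: str, canary_fraction: float = 0.1) -> str:
    """Deterministically assign a request/user ID to "canary" or "stable".

    Hashing (instead of random.random()) keeps each user on the same version,
    which keeps their experience consistent and metrics comparable.
    """
    digest = hashlib.sha256(request_id.encode("utf-8")).digest()
    bucket = int.from_bytes(digest[:6], "big") / 2**48  # Uniform in [0, 1)
    return "canary" if bucket < canary_fraction else "stable"


# Hypothetical endpoint names for the two versions
ENDPOINTS = {"stable": "sentiment-v2-0-0", "canary": "sentiment-v2-1-0"}

share = sum(route_version(f"user-{i}") == "canary" for i in range(10_000)) / 10_000
print(f"canary share: {share:.1%}")  # Close to 10%
print(ENDPOINTS[route_version("user-42")])
```

To promote the canary, raise canary_fraction step by step. A user who is already in the canary stays there, because their bucket value never changes.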
  • 99.9% uptime with auto-scaling

  • 75% cost reduction vs self-managed infrastructure

  • 50ms average latency globally

  • Automatic model updates via CI/CD

  • Python 3.12.10 (managed via pyenv)

  • Poetry for dependency management

  • Go Task for build automation

  • macOS (Apple Silicon), Linux, or Windows

  • CUDA GPU (optional for NVIDIA users, required for Flash Attention)

  • MPS support for Apple Silicon

  • (Optional) Hugging Face account for accessing gated models

  • (Optional) bitsandbytes for INT4/INT8 quantization

  1. Clone the repository:
git clone git@github.com:RichardHightower/art_hug_08.git
cd art_hug_08
task setup
  • Install Python 3.12.10 if needed
  • Set up Poetry environment
  • Install all dependencies with 2025 versions:
      • transformers ^4.53.0
      • datasets ^3.0.0
      • diffusers ^0.31.0
      • peft ^1.0.0
      • bitsandbytes ^0.46.0
      • evaluate ^0.4.0
cp .env.example .env
# Edit .env with your configuration (API keys, etc.)
poetry run python test_environment.py
task run
# Custom pipeline examples with modern models
poetry run python -m src.custom_pipelines
# Efficient data handling demonstrations
poetry run python -m src.data_workflows
# Optimization benchmarks with quantization
poetry run python -m src.optimization
# QLoRA demonstration
poetry run python -m src.peft_lora --qlora
# Production workflow example
poetry run python -m src.production_workflows
# Run all demonstrations
poetry run python -m src.main --demo all
poetry run jupyter notebook notebooks/tutorial.ipynb
# or
task notebook
  • Modern model usage (Phi-2, RoBERTa variants)
  • BitsAndBytesConfig quantization examples
  • QLoRA configuration demonstrations
  • Flash Attention 2 benchmarks
  • Ethical AI and bias detection
art_hug_08/
├── src/
│   ├── custom_pipelines.py      # Pipeline customization with _sanitize_parameters
│   ├── data_workflows.py        # Efficient data handling demonstrations
│   ├── optimization.py          # Model optimization techniques
│   ├── synthetic_data.py        # Data generation with toxicity filtering
│   ├── production_workflows.py  # End-to-end retail example
│   ├── edge_deployment.py       # ONNX export and edge deployment
│   ├── peft_lora.py             # PEFT/LoRA/QLoRA fine-tuning examples
│   ├── flash_attention.py       # Flash Attention 2 demonstrations
│   ├── advanced_quantization.py # INT4/INT8 with BitsAndBytesConfig
│   ├── diffusion_generation.py  # Stable Diffusion 3.5 for images
│   ├── config.py                # Configuration with modern defaults
│   ├── utils.py                 # Helpers with toxicity checking
│   └── main.py                  # Main demo runner
├── notebooks/
│   ├── tutorial.ipynb           # Complete Chapter 8 tutorial (2025 updated)
│   ├── pipeline_exploration.ipynb
│   └── optimization_benchmarks.ipynb
├── docs/
│   ├── art_08.md                # Original chapter
│   └── art_08i.md               # Improved chapter with 2025 updates
├── tests/
│   └── test_basic.py            # Unit tests
└── examples/
    └── retail_workflow.py       # Real-world retail example
  • Subclass and extend standard pipelines with _sanitize_parameters

  • Chain multiple models together (sentiment + NER)

  • Add business logic, preprocessing, and error handling

  • Modern models: cardiffnlp/twitter-roberta-base-sentiment-latest

  • Stream datasets without memory limits

  • Parallel transformations with map()

  • Smart batching for 10x speedup

  • Datasets v3.0+ features

  • INT8/INT4 quantization with BitsAndBytesConfig

  • QLoRA for memory-efficient fine-tuning (75% reduction)

  • Flash Attention 2 for 2–4x GPU speedup

  • Edge deployment with ONNX

  • PEFT/LoRA with modern target modules

  • LLM-based text generation with microsoft/phi-2

  • Stable Diffusion 3.5 Turbo for images

  • Quality validation with toxicity filtering

  • Bias detection in generated content

  • Toxicity detection using evaluate library

  • Bias checking across demographic groups

  • Fairness monitoring in production pipelines

  • Content filtering for safe deployments
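Counterfactual evaluation is one common way to check for bias across demographic groups. You fill the same template with different group terms and compare the model's scores. In the sketch below, a deliberately biased stub scorer stands in for a real classifier. The templates, group terms and the 0.1 gap threshold are illustrative assumptions:

```python
from typing import Callable, Dict, List


def counterfactual_gap(
    score: Callable[[str], float],
    templates: List[str],
    groups: List[str],
) -> Dict[str, object]:
    """Average score per group across templates, plus the max-min gap."""
    per_group = {}
    for group in groups:
        scores = [score(t.format(group=group)) for t in templates]
        per_group[group] = sum(scores) / len(scores)
    gap = max(per_group.values()) - min(per_group.values())
    return {"per_group": per_group, "max_gap": gap}


def stub_score(text: str) -> float:
    # Deliberately biased stub so the check has something to find
    return 0.4 if "elderly" in text else 0.9


templates = ["The {group} customer left a review.", "A {group} shopper asked for help."]
report = counterfactual_gap(stub_score, templates, ["young", "elderly"])
print(report)
if report["max_gap"] > 0.1:
    print("Potential bias: scores differ across groups")
```

In a real pipeline, score would wrap a model call, for example the positive-class probability from a sentiment pipeline.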

task --list        # Show all available tasks
task setup         # Set up the development environment
task run           # Run the main demonstration
task test          # Run tests
task format        # Format code with black and isort
task lint          # Run linting checks
task clean         # Clean cache and temporary files
task qlora         # Run QLoRA demonstration (NEW)
task bias-check    # Run bias validation on synthetic data (NEW)
task flash         # Run Flash Attention demo
task quantization  # Run advanced quantization demo
  1. Hugging Face Transformers and the AI Revolution (Article 1)
  2. Hugging Face: Why Language is Hard for AI? How Transformers Changed that (Article 2)
  3. Hands-On with Hugging Face: Building Your AI Workspace (Article 3)
  4. Inside the Transformer: Architecture and Attention Demystified (Article 4)
  5. Tokenization: The Gateway to Transformer Understanding (Article 5)
  6. Prompt Engineering (Article 6)
  7. Extending Transformers Beyond Language (Article 7)