fastify-ximagepipeline
Fastify plugin for async image upload processing — EXIF stripping, optional content moderation, WebP variant generation, blurhash placeholders, and Cloudflare R2/S3 storage. Uploads land in a staging prefix; a background worker polls a Prisma job queue and processes each image asynchronously.
Installation
npm install @xenterprises/fastify-ximagepipeline
# Peer deps
npm install @fastify/multipart fastify
Register @fastify/multipart before this plugin.
Quick Start
import Fastify from "fastify";
import multipart from "@fastify/multipart";
import { PrismaClient } from "@prisma/client";
import xImagePipeline from "@xenterprises/fastify-ximagepipeline";

// Prisma client whose schema includes the MediaQueue and Media models
const prisma = new PrismaClient();
const fastify = Fastify({ logger: true });

// @fastify/multipart must be registered before the image pipeline
await fastify.register(multipart);
await fastify.register(xImagePipeline, {
  r2: {
    endpoint: process.env.R2_ENDPOINT,
    accessKeyId: process.env.R2_ACCESS_KEY_ID,
    secretAccessKey: process.env.R2_SECRET_ACCESS_KEY,
    bucket: process.env.R2_BUCKET,
  },
  db: prisma,
});

await fastify.listen({ port: 3000 });
# Upload an image
curl -F "file=@photo.jpg" -F "sourceType=avatar" -F "sourceId=user-123" \
http://localhost:3000/image-pipeline/upload
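The same upload can be issued from JavaScript. The sketch below assumes Node 18 or later (global `fetch`, `FormData`, and `Blob`) and assumes the response body is JSON that contains the `jobId`. The helper names `buildUploadForm` and `uploadImage` are illustrative and are not part of the package.

```javascript
// Build a multipart body with the same three fields as the curl example.
function buildUploadForm({ bytes, filename, mimeType, sourceType, sourceId }) {
  const form = new FormData();
  form.append("file", new Blob([bytes], { type: mimeType }), filename);
  form.append("sourceType", sourceType);
  form.append("sourceId", sourceId);
  return form;
}

// POST the form to the upload route and return the parsed JSON body.
// fetchImpl is injectable so the function can be exercised without a server.
async function uploadImage(baseUrl, form, fetchImpl = fetch) {
  const res = await fetchImpl(`${baseUrl}/image-pipeline/upload`, {
    method: "POST",
    body: form, // fetch derives the multipart boundary header from the form
  });
  if (!res.ok) {
    throw new Error(`Upload failed with HTTP ${res.status}`);
  }
  return res.json(); // assumed to include { jobId }
}
```

A caller would typically read the file with `fs.promises.readFile`, pass the resulting buffer as `bytes`, and keep the returned `jobId` for later status checks.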
Options
Plugin Options
| Name | Type | Default | Required | Description |
|---|---|---|---|---|
| r2 | object | — | Yes | R2/S3 connection config |
| db | PrismaClient | — | Yes | Prisma client with mediaQueue and media models |
| moderation | object | null | No | Content moderation config |
| variants | object | 6 sizes | No | Variant dimension specs (xs/sm/md/lg/xl/2xl) |
| sourceTypes | object | 5 types | No | Per-source-type processing config |
| worker | object | enabled | No | Background worker settings |
| stagingPath | string | "staging" | No | R2 prefix for staging uploads |
| mediaPath | string | "media" | No | R2 prefix for processed media |
| originalsPath | string | "originals" | No | R2 prefix for original files |
| maxFileSize | number | 52428800 | No | Max upload size in bytes (50 MB) |
| allowedMimeTypes | string[] | jpeg/png/webp/gif | No | Accepted MIME types |
r2 Config
| Field | Type | Required | Description |
|---|---|---|---|
| endpoint | string | Yes | R2 or S3-compatible endpoint URL |
| accessKeyId | string | Yes | Access key |
| secretAccessKey | string | Yes | Secret key |
| bucket | string | Yes | Bucket name |
| region | string | No | Region (default "auto" for R2) |
worker Config
| Field | Type | Default | Description |
|---|---|---|---|
| enabled | boolean | true | Enable/disable background processing |
| pollInterval | number | 5000 | Poll interval in ms |
| maxAttempts | number | 3 | Max retry attempts per job |
| lockTimeout | number | 300000 | Stale lock timeout in ms (5 min) |
| failOnError | boolean | true | Throw if worker fails to start |
moderation Config
| Field | Type | Description |
|---|---|---|
| handler | async (buffer, config) => { passed, flags, confidence } | Custom moderation function — call AWS Rekognition, Google Vision, etc. If omitted, all images pass. |
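A handler with the signature above could look like the following minimal sketch. The `classifyImage` function is a hypothetical stand-in for a real moderation service call, and the meaning given to `flags` (labels at or above a threshold) and `confidence` (highest score seen) is an assumption, since the table only names the fields.

```javascript
// Hypothetical classifier: stands in for a real call to a moderation service.
// It receives the image bytes and resolves to [{ label, score }] entries.
async function classifyImage(buffer) {
  return []; // replace with a real API call
}

// Returns a handler with the (buffer, config) => { passed, flags, confidence }
// shape. The config argument is accepted but unused in this sketch.
function createModerationHandler(classify, threshold = 0.8) {
  return async function handler(buffer, config) {
    const labels = await classify(buffer);
    const flagged = labels.filter((entry) => entry.score >= threshold);
    // Assumed meaning: the highest score seen, or 0 when nothing was returned.
    const confidence = labels.reduce((max, entry) => Math.max(max, entry.score), 0);
    return {
      passed: flagged.length === 0,
      flags: flagged.map((entry) => entry.label),
      confidence,
    };
  };
}

// Value that would be passed as the plugin's moderation option.
const moderation = { handler: createModerationHandler(classifyImage) };
```

Keeping the classifier injectable makes it straightforward to swap providers or to stub the service in tests.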
Default Variants
| Name | Width | Height | Fit | Use Case |
|---|---|---|---|---|
| xs | 80 | 80 | cover | Tiny thumbnails, avatars |
| sm | 200 | 200 | cover | Thumbnails, lists |
| md | 600 | auto | inside | Content images, cards |
| lg | 1200 | auto | inside | Detail views |
| xl | 1920 | auto | inside | Full-width banners |
| 2xl | 2560 | auto | inside | Retina/4K displays |
Default Source Types
| Type | Variants | Quality | Store Original |
|---|---|---|---|
| avatar | xs, sm | 85 | Yes |
| member_photo | xs, sm, md | 85 | Yes |
| gallery | md, lg, xl | 85 | No |
| hero | lg, xl, 2xl | 80 | No |
| content | md, lg | 85 | Yes |
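Overriding the defaults means passing `variants` and `sourceTypes` objects at registration. The README does not spell out their exact shape, so the property names below (`width`, `height`, `fit`, `variants`, `quality`, `storeOriginal`) are assumptions derived from the table columns, and the `banner` variant and `product` source type are hypothetical. The small check is only a sketch for catching a source type that references a variant that was never defined.

```javascript
// Assumed shape: property names mirror the table columns above.
const imagePipelineOverrides = {
  variants: {
    xs: { width: 80, height: 80, fit: "cover" },
    sm: { width: 200, height: 200, fit: "cover" },
    md: { width: 600, height: null, fit: "inside" }, // null stands in for "auto"
    banner: { width: 1600, height: 400, fit: "cover" }, // hypothetical variant
  },
  sourceTypes: {
    avatar: { variants: ["xs", "sm"], quality: 85, storeOriginal: true },
    product: { variants: ["md", "banner"], quality: 80, storeOriginal: false }, // hypothetical type
  },
};

// List every { sourceType, variant } pair that names an undefined variant.
function findUnknownVariants(config) {
  const known = new Set(Object.keys(config.variants));
  const problems = [];
  for (const [sourceType, spec] of Object.entries(config.sourceTypes)) {
    for (const variant of spec.variants) {
      if (!known.has(variant)) problems.push({ sourceType, variant });
    }
  }
  return problems;
}
```

Running such a check before registering the plugin surfaces typos in variant names at startup rather than during background processing.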
Routes & Methods
Routes registered automatically at /image-pipeline/:
- POST /image-pipeline/upload — Upload a file for async processing; returns a jobId immediately.
- GET /image-pipeline/status/:jobId — Poll job status; returns variant URLs and blurhash on COMPLETE.
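Because the upload route returns before processing finishes, a client has to poll the status route. The following is a minimal polling sketch, assuming the status response is JSON with a `status` field holding one of the job states (PENDING, PROCESSING, COMPLETE, REJECTED, FAILED). The field name, the helper name `waitForJob`, and its options are assumptions for illustration.

```javascript
// States after which the job will not change again.
const TERMINAL_STATES = new Set(["COMPLETE", "REJECTED", "FAILED"]);

// Poll the status route until the job reaches a terminal state or time runs out.
async function waitForJob(baseUrl, jobId, options = {}) {
  const { intervalMs = 2000, timeoutMs = 60000, fetchImpl = fetch } = options;
  const deadline = Date.now() + timeoutMs;
  const url = `${baseUrl}/image-pipeline/status/${encodeURIComponent(jobId)}`;

  for (;;) {
    const res = await fetchImpl(url);
    if (!res.ok) {
      throw new Error(`Status check failed with HTTP ${res.status}`);
    }
    const job = await res.json();
    if (TERMINAL_STATES.has(job.status)) {
      return job; // on COMPLETE this carries the variant URLs and blurhash
    }
    if (Date.now() + intervalMs > deadline) {
      throw new Error(`Timed out waiting for job ${jobId}`);
    }
    await new Promise((resolve) => setTimeout(resolve, intervalMs));
  }
}
```

The caller should still inspect the returned status, since REJECTED and FAILED also end the loop.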
Decorator methods on fastify.xImagePipeline:
- getStatus(jobId) — Fetch a job record with its media relation from the database.
- deleteMedia(mediaId) — Delete a media record and all associated R2 objects.
- listMedia(sourceType, sourceId) — List all media records for a given source.
- getVariantPresets / getSourceTypes / getVariants — Inspect active variant and source type configuration.
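The decorator methods can be called from application routes. The sketch below wires `listMedia` and `deleteMedia` into two routes; the route paths and the function name `registerMediaRoutes` are hypothetical, and the code assumes both methods return promises (using `await` is harmless if they do not).

```javascript
// Register two application routes that delegate to the plugin's decorator.
// Expects a Fastify instance on which the image pipeline is already registered.
function registerMediaRoutes(fastify) {
  // List every avatar stored for one user.
  fastify.get("/users/:userId/avatars", async (request) => {
    return fastify.xImagePipeline.listMedia("avatar", request.params.userId);
  });

  // Delete a media record together with its R2 objects.
  fastify.delete("/media/:mediaId", async (request, reply) => {
    await fastify.xImagePipeline.deleteMedia(request.params.mediaId);
    return reply.code(204).send();
  });
}
```

A production route would also need authorization checks and handling for the "Media not found" error.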
Exported Utilities
@xenterprises/fastify-ximagepipeline/image
import { stripExif, generateVariants, generateBlurhash } from "@xenterprises/fastify-ximagepipeline/image";
| Function | Description |
|---|---|
| stripExif(buffer) | Remove EXIF metadata, preserve orientation |
| getImageMetadata(buffer) | Extract width, height, format, colorspace, hasAlpha, density |
| compressToJpeg(buffer, quality?) | Compress to JPEG (mozjpeg, default quality 85) |
| generateVariants(buffer, specs, sourceType, quality?) | Generate WebP variants per spec |
| generateBlurhash(buffer) | Create 4×3 component blurhash string |
| calculateFitDimensions(srcW, srcH, maxW, maxH) | Aspect-ratio-preserving resize dimensions |
| getAspectRatio(width, height) | Ratio string e.g. "16:9" |
| validateImage(buffer, options?) | Validate dimensions and format |
| processImage(buffer, sourceType, config) | Full pipeline: strip → metadata → variants → blurhash |
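To illustrate the arithmetic behind aspect-ratio-preserving resizing and ratio strings, here is a standalone sketch. It is not the package's implementation: the names `fitInside` and `ratioString` are hypothetical, and the choice never to upscale a smaller source is an assumption.

```javascript
// Scale (srcW x srcH) to fit inside (maxW x maxH) while keeping the aspect ratio.
// Assumption: images smaller than the box are left at their original size.
function fitInside(srcW, srcH, maxW, maxH) {
  const scale = Math.min(maxW / srcW, maxH / srcH, 1);
  return {
    width: Math.max(1, Math.round(srcW * scale)),
    height: Math.max(1, Math.round(srcH * scale)),
  };
}

// Reduce width:height by their greatest common divisor, e.g. 1920x1080 -> "16:9".
function ratioString(width, height) {
  const gcd = (a, b) => (b === 0 ? a : gcd(b, a % b));
  const divisor = gcd(width, height);
  return `${width / divisor}:${height / divisor}`;
}

console.log(fitInside(4000, 3000, 1200, 1200)); // { width: 1200, height: 900 }
console.log(ratioString(1920, 1080)); // 16:9
```

The limiting dimension is whichever ratio, `maxW / srcW` or `maxH / srcH`, is smaller; applying the same factor to both sides keeps the aspect ratio.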
@xenterprises/fastify-ximagepipeline/s3
import { uploadToS3, deleteFromS3 } from "@xenterprises/fastify-ximagepipeline/s3";
| Function | Description |
|---|---|
| initializeS3Client(config) | Create S3Client for R2/AWS |
| uploadToS3(client, bucket, key, buffer, options?) | Upload with metadata and cache headers |
| downloadFromS3(client, bucket, key) | Download buffer |
| deleteFromS3(client, bucket, key) | Delete single object |
| listFromS3(client, bucket, prefix) | List objects by prefix |
| getSignedUrlForS3(client, bucket, key, expiresIn?) | Generate signed URL (default 1 h) |
| getPublicUrl(r2Config, key) | Generate public URL |
| batchDeleteFromS3(client, bucket, prefix) | Delete up to 1000 objects by prefix |
Error Reference
All errors use the [xImagePipeline] prefix.
| Error | Cause |
|---|---|
| R2 configuration is required | Missing r2 option at registration |
| Database instance (Prisma client) is required | Missing db option at registration |
| R2 configuration must include: endpoint, accessKeyId, secretAccessKey, bucket | Incomplete R2 config |
| No file provided | Upload request has no file |
| File type {mime} not allowed | MIME type not in allowedMimeTypes |
| sourceType and sourceId are required | Missing form fields on upload |
| Unknown sourceType: {type} | sourceType not in variant presets |
| File too large. Maximum size: {n}MB | File exceeds maxFileSize |
| Failed to upload file to storage | R2/S3 upload error |
| Failed to create processing job | Database error during job creation |
| Media not found: {id} | deleteMedia called with invalid media ID |
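Since every error carries the `[xImagePipeline]` prefix, callers can tell plugin errors apart from unrelated failures. The sketch below assumes the prefix sits at the very start of `error.message`; the helper names and the hint texts are illustrative only.

```javascript
const PIPELINE_PREFIX = "[xImagePipeline]";

// True when the error message starts with the plugin's prefix.
function isPipelineError(err) {
  return err instanceof Error && err.message.startsWith(PIPELINE_PREFIX);
}

// Map the registration errors from the table to a short hint.
// Returns null for errors that did not come from the plugin.
function explainRegistrationError(err) {
  if (!isPipelineError(err)) return null;
  const message = err.message.slice(PIPELINE_PREFIX.length).trim();
  if (message.startsWith("R2 configuration")) {
    return "Check the r2 option and the R2_* environment variables.";
  }
  if (message.startsWith("Database instance")) {
    return "Pass a Prisma client as the db option.";
  }
  return message; // fall back to the plugin's own wording
}
```

A typical use is inside a `try`/`catch` around `fastify.register`, logging the hint before exiting.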
Environment Variables
| Variable | Required | Description |
|---|---|---|
| R2_ENDPOINT | Yes | R2 or S3-compatible endpoint URL |
| R2_ACCESS_KEY_ID | Yes | Storage access key |
| R2_SECRET_ACCESS_KEY | Yes | Storage secret key |
| R2_BUCKET | Yes | Storage bucket name |
| DATABASE_URL | Yes | Prisma database connection string |
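Checking these variables at startup gives a clearer failure than a rejected registration later on. A minimal sketch, with the hypothetical helper `loadR2Config` that validates all five variables and builds the `r2` option from the four storage ones:

```javascript
const REQUIRED_ENV = [
  "R2_ENDPOINT",
  "R2_ACCESS_KEY_ID",
  "R2_SECRET_ACCESS_KEY",
  "R2_BUCKET",
  "DATABASE_URL", // read by Prisma, not by the r2 option
];

// Throw one error naming every missing variable, then return the r2 option.
function loadR2Config(env = process.env) {
  const missing = REQUIRED_ENV.filter((name) => !env[name]);
  if (missing.length > 0) {
    throw new Error(`Missing environment variables: ${missing.join(", ")}`);
  }
  return {
    endpoint: env.R2_ENDPOINT,
    accessKeyId: env.R2_ACCESS_KEY_ID,
    secretAccessKey: env.R2_SECRET_ACCESS_KEY,
    bucket: env.R2_BUCKET,
  };
}
```

The returned object can be passed directly as `r2` when registering the plugin.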
Database Schema
Requires two Prisma models (see SCHEMA.prisma in the package):
- MediaQueue — job queue: PENDING → PROCESSING → COMPLETE / REJECTED / FAILED
- Media — processed records with variant URLs, dimensions, blurhash, and source metadata
How It Works
Upload: Client sends multipart POST. The file is validated (type, size, sourceType) and uploaded to the R2 staging prefix. A MediaQueue row is created with PENDING status; the route returns a jobId immediately.
Processing: A background worker polls MediaQueue for PENDING jobs using pessimistic locking. For each job it downloads from staging, strips EXIF, runs content moderation (if a handler is configured), generates WebP variants per sourceType config, produces a blurhash placeholder, uploads variants and optional originals to R2, creates a Media record, marks the job COMPLETE, and deletes the staging file.
Retrieval: The client polls GET /image-pipeline/status/:jobId. On COMPLETE the response includes all variant URLs, blurhash, and dimensions.
Retry: Failed jobs are retried until they have been attempted maxAttempts times. A lock left behind by a crashed worker is treated as stale and is recovered automatically once lockTimeout has elapsed.
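The retry and stale-lock rules can be pictured as a predicate that decides whether a worker may pick up a job. This is only a sketch of the idea, not the worker's actual query: the field names `status`, `attempts`, and `lockedAt` are assumptions about the MediaQueue model, while the defaults come from the worker Config table.

```javascript
// Decide whether a job may be claimed at time `now` (milliseconds since epoch).
function isClaimable(job, now, { maxAttempts = 3, lockTimeout = 300000 } = {}) {
  // Jobs that have used up their attempts are never picked up again.
  if (job.attempts >= maxAttempts) return false;

  // Fresh or re-queued work.
  if (job.status === "PENDING") return true;

  // A PROCESSING job is reclaimable only when its lock is older than lockTimeout,
  // which suggests the worker holding it has crashed.
  if (job.status === "PROCESSING" && job.lockedAt != null) {
    return now - job.lockedAt.getTime() > lockTimeout;
  }

  return false; // COMPLETE, REJECTED, and FAILED are terminal
}
```

In a real worker the check and the lock update must happen atomically in the database, otherwise two workers could claim the same job.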
AI Context
package: "@xenterprises/fastify-ximagepipeline"
type: fastify-plugin
use-when: Async image upload pipeline — EXIF stripping, WebP variant generation, blurhash, content moderation, R2/S3 storage with background job queue
decorator: fastify.xImagePipeline (getStatus, deleteMedia, listMedia, getVariantPresets, getSourceTypes, getVariants)
routes: POST /image-pipeline/upload, GET /image-pipeline/status/:jobId
env: R2_ENDPOINT, R2_ACCESS_KEY_ID, R2_SECRET_ACCESS_KEY, R2_BUCKET, DATABASE_URL
requires: Prisma client with MediaQueue and Media models; @fastify/multipart registered before this plugin
utility-exports: "@xenterprises/fastify-ximagepipeline/image" (image processing), "@xenterprises/fastify-ximagepipeline/s3" (S3 helpers)
