FootyCollect uses Celery for asynchronous task processing, including photo optimization, external image downloads, and periodic cleanup tasks.
Overview
Celery handles resource-intensive operations asynchronously:
- Photo optimization (AVIF conversion)
- External image downloads from Football Kit Archive
- Orphaned photo cleanup
- Periodic maintenance tasks
Celery requires Redis as the message broker. Make sure Redis is running before starting Celery workers.
Celery Configuration
config/celery_app.py
The Celery application is configured in config/celery_app.py:
```python
import os

from celery import Celery

# Set default Django settings module
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "config.settings.local")

app = Celery("footycollect")

# Load configuration from Django settings with CELERY_ prefix
app.config_from_object("django.conf:settings", namespace="CELERY")

# Autodiscover tasks from all registered Django apps
app.autodiscover_tasks()
```
Celery automatically discovers tasks from tasks.py files in all installed Django apps.
Django Settings
Celery settings in config/settings/base.py:
```python
# Celery Configuration
CELERY_BROKER_URL = env("CELERY_BROKER_URL", default="redis://localhost:6379/0")
CELERY_RESULT_BACKEND = CELERY_BROKER_URL
CELERY_ACCEPT_CONTENT = ["json"]
CELERY_TASK_SERIALIZER = "json"
CELERY_RESULT_SERIALIZER = "json"
CELERY_TIMEZONE = TIME_ZONE
```
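In local development these values usually come from environment variables. A minimal .env fragment (assuming Redis on its default port, matching the default shown above) might look like:

```bash
# Celery broker URL; results are stored in the same Redis database
CELERY_BROKER_URL=redis://localhost:6379/0
```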
Background Tasks
Photo Processing Tasks
Defined in footycollect/collection/tasks.py:
process_photo_to_avif
Converts uploaded photos to AVIF format for optimal compression:

```python
@shared_task
def process_photo_to_avif(photo_id):
    """Convert photo to AVIF format."""
    try:
        photo = Photo.objects.get(pk=photo_id)
    except Photo.DoesNotExist:
        logger.warning("Photo %s does not exist", photo_id)
        return

    if not photo.image:
        logger.warning("Photo %s has no image to process", photo_id)
        return

    optimized = optimize_image(photo.image)
    if not optimized:
        logger.warning("Optimization returned no data for photo %s", photo_id)
        return

    photo.image_avif.save(optimized.name, optimized, save=False)
    photo.save(update_fields=["image_avif"])
    logger.info("Photo %s AVIF processing completed", photo_id)
```
Usage:
- Triggered automatically when photos are uploaded
- Optimizes image file size while maintaining quality
- Updates the image_avif field on the Photo model
check_item_photo_processing
Checks if all photos for an item have been processed:

```python
@shared_task
def check_item_photo_processing(item_id):
    """Check if all photos for an item are processed."""
    try:
        base_item = BaseItem.objects.get(pk=item_id)
        photos = base_item.photos.all()

        if not photos.exists():
            base_item.is_processing_photos = False
            base_item.save(update_fields=["is_processing_photos"])
            return

        # Check if all photos have AVIF versions
        all_processed = all(
            photo.image_avif and photo.image_avif.name
            for photo in photos
            if photo.image
        )

        if all_processed:
            base_item.is_processing_photos = False
            base_item.save(update_fields=["is_processing_photos"])
            logger.info("All photos processed for item %s", item_id)
    except BaseItem.DoesNotExist:
        logger.warning("Item %s does not exist", item_id)
```
Usage:
- Monitors photo processing status
- Updates the is_processing_photos flag when complete
- Used to show processing indicators in the UI
External Image Download
download_external_image_and_attach
Downloads images from Football Kit Archive and attaches them to items:

```python
@shared_task
def download_external_image_and_attach(app_label, model_name, object_id, image_url, order=0):
    """Download external image and attach to item."""
    try:
        # Validate URL is from allowed hosts
        if not _is_allowed_image_url(image_url):
            logger.warning("Blocked download from untrusted host: %s", image_url)
            raise ValueError("URL from untrusted source")

        # Download image
        response = requests.get(
            image_url,
            timeout=30,
            stream=True,
            headers={"Referer": "https://www.footballkitarchive.com/"},
        )
        response.raise_for_status()

        # Save to temporary file
        img_temp = tempfile.NamedTemporaryFile(delete=False)
        for chunk in response.iter_content(chunk_size=1024):
            if chunk:
                img_temp.write(chunk)
        img_temp.flush()

        # Create Photo object
        model = ContentType.objects.get_by_natural_key(app_label, model_name).model_class()
        instance = model.objects.get(pk=object_id)
        image_name = image_url.split("/")[-1]  # filename derived from the URL
        photo = Photo(content_object=instance, user=instance.user)
        photo.image.save(image_name, File(img_temp), save=False)
        photo.order = order
        photo.save()

        # Trigger photo processing
        check_item_photo_processing.apply_async(args=[object_id], countdown=3)

        return photo.id
    except Exception:
        logger.exception("Failed to download image: %s", image_url)
        raise
```
Features:
- Downloads images from Football Kit Archive
- Validates allowed image hosts (security)
- Supports rotating proxies (if configured)
- Automatically triggers photo processing
Security:
- Only downloads from whitelisted hosts
- Configurable via the ALLOWED_EXTERNAL_IMAGE_HOSTS setting
Cleanup Tasks
cleanup_orphaned_photos
Removes incomplete photo uploads (older than 24 hours):

```python
@shared_task
def cleanup_orphaned_photos():
    """Clean up orphaned photos (incomplete uploads)."""
    try:
        logger.info("Starting orphaned photos cleanup task")
        call_command(
            "cleanup_orphaned_photos",
            "--incomplete-only",
            "--older-than-hours=24",
            verbosity=1,
        )
        logger.info("Orphaned photos cleanup completed")
        return "Orphaned photos cleanup completed"
    except Exception:
        logger.exception("Error in orphaned photos cleanup task")
        raise
```
Schedule: Runs daily (configured in django-celery-beat)
cleanup_old_incomplete_photos
Removes old incomplete photos (older than 7 days):

```python
@shared_task
def cleanup_old_incomplete_photos():
    """Clean up old incomplete photos (7+ days)."""
    try:
        call_command(
            "cleanup_orphaned_photos",
            "--incomplete-only",
            "--older-than-hours=168",
            verbosity=1,
        )
        return "Old incomplete photos cleanup completed"
    except Exception:
        logger.exception("Error in old incomplete photos cleanup")
        raise
```
Schedule: Runs weekly
Running Celery
Development
Start Redis
```bash
# Using Docker
docker run -d -p 6379:6379 redis:7-alpine

# Or using system Redis
redis-server
```
Start Celery Worker
```bash
celery -A config.celery_app worker -l info
```
This starts a worker that processes tasks from the queue.
Start Celery Beat (Optional)
```bash
celery -A config.celery_app beat -l info
```
This starts the scheduler for periodic tasks.
Docker Compose
Using Docker Compose (recommended):
```yaml
services:
  celeryworker:
    build:
      context: .
      dockerfile: ./compose/local/django/Dockerfile
    command: /start-celeryworker
    depends_on:
      - redis
      - postgres
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0

  celerybeat:
    build:
      context: .
      dockerfile: ./compose/local/django/Dockerfile
    command: /start-celerybeat
    depends_on:
      - redis
      - postgres
    environment:
      - CELERY_BROKER_URL=redis://redis:6379/0
```
```bash
docker compose -f docker-compose.local.yml up celeryworker celerybeat
```
Periodic Tasks (django-celery-beat)
FootyCollect uses django-celery-beat for scheduling periodic tasks.
Setup Schedule
Run migrations
```bash
python manage.py migrate django_celery_beat
```
Setup default schedule
```bash
python manage.py setup_beat_schedule
```
This creates default periodic tasks:
- Orphaned photos cleanup (daily)
- Old incomplete photos cleanup (weekly)
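django-celery-beat stores these schedules in the database, but for reference the same two entries can also be expressed as a static CELERY_BEAT_SCHEDULE in settings. The dotted task paths follow the tasks.py location given earlier; the exact run times below are illustrative assumptions, not what setup_beat_schedule creates:

```python
from celery.schedules import crontab

# Hypothetical static equivalent of the database-backed beat entries
CELERY_BEAT_SCHEDULE = {
    "cleanup-orphaned-photos": {
        "task": "footycollect.collection.tasks.cleanup_orphaned_photos",
        "schedule": crontab(hour=3, minute=0),  # daily at 03:00 (assumed time)
    },
    "cleanup-old-incomplete-photos": {
        "task": "footycollect.collection.tasks.cleanup_old_incomplete_photos",
        "schedule": crontab(hour=4, minute=0, day_of_week=1),  # weekly on Mondays (assumed)
    },
}
```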
Customize in Django Admin
Navigate to Django Admin → django_celery_beat → Periodic tasks and adjust task intervals as needed.
Managing Periodic Tasks
In Django Admin, you can:
- Create new periodic tasks
- Edit task schedules (cron/interval)
- Enable/disable tasks
- View task execution history
Use the Django Admin interface to manage periodic tasks without code changes.
Monitoring
Flower (Task Monitor)
Flower provides a web-based UI for monitoring Celery:
```bash
# Install Flower
pip install flower

# Start Flower
celery -A config.celery_app flower
```
Access at http://localhost:5555
Features:
- Real-time task monitoring
- Worker status
- Task history
- Task statistics
Logs
Celery logs task execution:
```python
import logging

logger = logging.getLogger(__name__)


@shared_task
def my_task():
    logger.info("Task started")
    # ... task logic
    logger.info("Task completed")
```
Task Best Practices
Tasks should be idempotent (safe to run multiple times):

```python
@shared_task
def process_photo_to_avif(photo_id):
    # Check if already processed
    photo = Photo.objects.get(pk=photo_id)
    if photo.image_avif:
        logger.info("Photo already processed")
        return
    # Process photo...
```
Handle errors gracefully and retry transient failures:

```python
@shared_task(bind=True, max_retries=3)
def download_image(self, url):
    try:
        response = requests.get(url, timeout=30)
        response.raise_for_status()
    except requests.RequestException as exc:
        # Retry with exponential backoff
        raise self.retry(exc=exc, countdown=60 * (2**self.request.retries))
```
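The countdown expression above doubles the wait on each attempt. A quick standalone check of the schedule it produces (retry_countdown is a hypothetical helper mirroring the expression, not part of the codebase):

```python
def retry_countdown(retries):
    """Seconds to wait before the next attempt, mirroring 60 * (2 ** retries)."""
    return 60 * (2**retries)


# With max_retries=3, the waits before the three retries are:
print([retry_countdown(n) for n in range(3)])  # [60, 120, 240]
```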
Set task time limits:

```python
@shared_task(time_limit=300, soft_time_limit=240)
def long_running_task():
    # SoftTimeLimitExceeded is raised inside the task after 240 seconds,
    # giving it a chance to clean up; the task is killed outright at 300 seconds.
    pass
```
Troubleshooting
Tasks not executing? Check:
- Redis is running: redis-cli ping should return PONG
- A Celery worker is running
- The task is registered: celery -A config.celery_app inspect registered
- There are no errors in the Celery logs

Tasks timing out? Solutions:
- Increase the time_limit setting
- Break the task into smaller subtasks
- Check for blocking operations

Workers using too much memory? Solutions:
- Reduce worker concurrency: --concurrency=2
- Recycle workers periodically: --max-tasks-per-child=1000
- Monitor memory usage
Next Steps