HH/config/celery_scheduler.py
2026-03-15 23:48:45 +03:00

46 lines
1.7 KiB
Python

"""
Custom Celery Beat scheduler to fix Python 3.12 zoneinfo compatibility issue.
This patches the TzAwareCrontab class to work with zoneinfo.ZoneInfo instead of pytz.
The error occurs because django-celery-beat assumes pytz timezone objects
which have a `normalize()` method that zoneinfo.ZoneInfo doesn't have.
"""
import functools
def apply_tzcrontab_patch():
"""
Apply monkey-patch to django_celery_beat.tzcrontab.TzAwareCrontab
to fix zoneinfo.ZoneInfo compatibility.
This should be called at Celery app initialization.
"""
from django_celery_beat import tzcrontab
original_init = tzcrontab.TzAwareCrontab.__init__
@functools.wraps(original_init)
def patched_init(self, *args, **kwargs):
# Get the tz argument, default to None
tz = kwargs.get('tz', None)
# Check if it's a zoneinfo.ZoneInfo (no 'normalize' attribute)
if tz is not None and not hasattr(tz, 'normalize'):
# Replace with a patched nowfun that works with zoneinfo
def zoneinfo_aware_nowfunc():
"""Get current time in the scheduler's timezone using Django's timezone utility."""
from django.utils import timezone as django_timezone
now = django_timezone.now()
return now.astimezone(self.tz)
# Store the zoneinfo-compatible nowfun
self._zoneinfo_nowfun = zoneinfo_aware_nowfunc
# Call original init
original_init(self, *args, **kwargs)
# If we detected zoneinfo, override the nowfun
if hasattr(self, '_zoneinfo_nowfun'):
self.nowfun = self._zoneinfo_nowfun
tzcrontab.TzAwareCrontab.__init__ = patched_init