import asyncio
import json
import logging
import time
from datetime import datetime
from urllib.parse import parse_qs

from channels.db import database_sync_to_async
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser

from inventory.models import Notification

logger = logging.getLogger(__name__)


@database_sync_to_async
def get_user(user_id):
    """Return the user with primary key *user_id*, or ``AnonymousUser()``.

    Wrapped with ``database_sync_to_async`` so it can be awaited from
    async code.
    """
    User = get_user_model()
    try:
        return User.objects.get(id=user_id)
    except User.DoesNotExist:
        return AnonymousUser()


@database_sync_to_async
def get_notifications(user, last_id):
    """Return *user*'s unread notifications whose id is greater than *last_id*.

    The rows are ordered by ``created`` and converted to plain dicts
    (``id``, ``message``, ``created`` as an ISO-8601 string, ``is_read``)
    so the result can be passed straight to ``json.dumps``.
    """
    notifications = Notification.objects.filter(
        user=user, id__gt=last_id, is_read=False
    ).order_by("created")
    return [
        {
            "id": n.id,
            "message": n.message,
            "created": n.created.isoformat(),
            "is_read": n.is_read,
        }
        for n in notifications
    ]


def _parse_event_id(raw):
    """Return *raw* as an int, or 0 when it is missing or not a number."""
    try:
        return int(raw)
    except (TypeError, ValueError):
        return 0


class NotificationSSEApp:
    """ASGI application streaming a user's unread notifications as SSE.

    The authenticated user is read from ``scope["user"]``.  The stream
    starts after the notification id given by the ``last_id`` query
    parameter or the ``Last-Event-ID`` request header, whichever is larger.
    """

    # Seconds to wait for a client message before polling the database
    # again.  A keep-alive comment is sent after every poll.
    poll_interval = 3

    async def __call__(self, scope, receive, send):
        """Serve one HTTP connection; scopes of any other type are ignored."""
        if scope["type"] != "http":
            return

        user = scope.get("user")
        if not user or user.is_anonymous:
            await self._send_response(send, 403, b"Unauthorized")
            return

        last_id = self._initial_last_id(scope)
        await self._send_headers(send)

        try:
            while True:
                try:
                    # Waiting on receive() doubles as the poll timer: a
                    # timeout means "nothing from the client, go poll".
                    message = await asyncio.wait_for(
                        receive(), timeout=self.poll_interval
                    )
                except asyncio.TimeoutError:
                    last_id = await self._poll(send, user, last_id)
                    continue
                if message["type"] == "http.disconnect":
                    logger.info("🔌 Client disconnected")
                    break
        except (asyncio.CancelledError, ConnectionResetError):
            # The connection is going away; finish quietly.
            logger.debug("SSE stream interrupted", exc_info=True)
        finally:
            await self._close_connection(send)

    @staticmethod
    def _initial_last_id(scope):
        """Return the id to resume after, tolerating absent or bad input.

        Malformed values count as 0 instead of raising.  The larger of the
        query parameter and the ``Last-Event-ID`` header is used so that a
        reconnect to the same URL does not replay events already delivered.
        """
        raw_query = scope.get("query_string", b"").decode("utf-8", "replace")
        from_query = _parse_event_id(parse_qs(raw_query).get("last_id", ["0"])[0])

        from_header = 0
        for name, value in scope.get("headers", ()):
            if name == b"last-event-id":
                from_header = _parse_event_id(value.decode("latin-1"))
        return max(from_query, from_header)

    async def _poll(self, send, user, last_id):
        """Send notifications newer than *last_id*, then a keep-alive.

        Returns the highest notification id seen, to be used as the cursor
        for the next poll.
        """
        for notification in await get_notifications(user, last_id):
            await self._send_notification(send, notification)
            last_id = max(last_id, notification["id"])

        # Sent after every poll (every ``poll_interval`` seconds).
        await send(
            {
                "type": "http.response.body",
                "body": b":keep-alive\n\n",
                "more_body": True,
            }
        )
        return last_id

    async def _send_headers(self, send):
        """Start a 200 response with the event-stream headers."""
        await send(
            {
                "type": "http.response.start",
                "status": 200,
                "headers": [
                    (b"content-type", b"text/event-stream"),
                    (b"cache-control", b"no-cache"),
                    (b"connection", b"keep-alive"),
                    (b"x-accel-buffering", b"no"),
                ],
            }
        )

    async def _send_notification(self, send, notification):
        """Send *notification* as one ``notification`` event.

        Best effort: a failure is logged and not re-raised.
        """
        try:
            event_str = (
                f"id: {notification['id']}\n"
                f"event: notification\n"
                f"data: {json.dumps(notification)}\n\n"
            )
            await send(
                {
                    "type": "http.response.body",
                    "body": event_str.encode("utf-8"),
                    "more_body": True,
                }
            )
        except Exception:
            logger.exception("Error sending notification")

    async def _send_response(self, send, status, body):
        """Send a complete plain-text response with *status* and *body*."""
        await send(
            {
                "type": "http.response.start",
                "status": status,
                "headers": [(b"content-type", b"text/plain")],
            }
        )
        await send({"type": "http.response.body", "body": body})

    async def _close_connection(self, send):
        """Finish the response body, ignoring failures.

        This runs from a ``finally`` block; letting a failed send raise
        here would hide whatever exception ended the stream.
        """
        try:
            await send({"type": "http.response.body", "body": b""})
        except Exception:
            logger.debug("Could not finish SSE response", exc_info=True)