"""ASGI endpoint that streams a user's unread notifications as server-sent events."""

import asyncio
import json
import logging
import time
from datetime import datetime
from urllib.parse import parse_qs

from channels.db import database_sync_to_async
from django.contrib.auth import get_user_model
from django.contrib.auth.models import AnonymousUser

from inventory.models import Notification

logger = logging.getLogger(__name__)


@database_sync_to_async
def get_user(user_id):
    """Return the user whose primary key is *user_id*.

    Falls back to ``AnonymousUser()`` when no such user exists, so callers
    can always test ``is_anonymous`` on the result.
    """
    User = get_user_model()
    try:
        return User.objects.get(id=user_id)
    except User.DoesNotExist:
        return AnonymousUser()


@database_sync_to_async
def get_notifications(user, last_id):
    """Return *user*'s unread notifications with ``id > last_id``.

    The queryset is ordered by ``created`` and materialised into a list of
    JSON-serialisable dicts (``id``, ``message``, ``created``, ``is_read``)
    inside this function, so the database is only touched in the sync
    context provided by ``database_sync_to_async``.
    """
    notifications = Notification.objects.filter(
        user=user, id__gt=last_id, is_read=False
    ).order_by("created")
    return [
        {
            "id": n.id,
            "message": n.message,
            # datetime is not JSON-serialisable; ship it as an ISO-8601 string.
            "created": n.created.isoformat(),
            "is_read": n.is_read,
        }
        for n in notifications
    ]


class NotificationSSEApp:
    """ASGI application that streams unread notifications as SSE.

    The app answers HTTP scopes only. Anonymous requests get a plain-text
    403. Authenticated requests receive a ``text/event-stream`` response
    that is fed by polling the database until the client disconnects.
    """

    # Seconds to wait for a client message before polling the database.
    poll_interval = 3
    # Seconds of silence on the stream after which a keep-alive comment is sent.
    keepalive_interval = 15

    async def __call__(self, scope, receive, send):
        """Handle one ASGI connection.

        Args:
            scope: ASGI connection scope; ``scope["user"]`` is read for auth.
            receive: ASGI receive callable, used to detect ``http.disconnect``.
            send: ASGI send callable used to write the response.
        """
        if scope["type"] != "http":
            return

        user = scope.get("user")
        if not user or user.is_anonymous:
            await self._send_response(send, 403, b"Unauthorized")
            return

        last_id = self._parse_last_id(scope)
        await self._send_headers(send)

        try:
            await self._stream(user, last_id, receive, send)
        except asyncio.CancelledError:
            # The server is tearing the request down. The coroutine ends right
            # after the cleanup below, so the cancellation is not re-raised.
            pass
        except OSError as exc:
            # Covers ConnectionResetError and other socket-level failures.
            logger.debug("SSE connection lost: %s", exc)
        finally:
            try:
                await self._close_connection(send)
            except OSError as exc:
                # The peer may already be gone; a failed final write is not
                # worth surfacing and must not mask the outcome above.
                logger.debug("Could not close SSE response cleanly: %s", exc)

    @staticmethod
    def _parse_last_id(scope):
        """Return the id of the last notification the client has seen.

        Reads the first ``last_id`` query parameter and any ``Last-Event-ID``
        request header and returns the largest valid integer found.
        Missing, malformed or negative values are treated as ``0``.
        """
        raw_query = scope.get("query_string", b"").decode("utf-8", errors="replace")
        candidates = parse_qs(raw_query).get("last_id", [])[:1]

        for name, value in scope.get("headers", ()):
            if name.lower() == b"last-event-id":
                candidates.append(value.decode("latin-1"))

        last_id = 0
        for raw in candidates:
            try:
                last_id = max(last_id, int(raw))
            except ValueError:
                # Client-supplied garbage must not turn into a server error.
                continue
        return last_id

    async def _stream(self, user, last_id, receive, send):
        """Poll for new notifications and write them until the client leaves."""
        last_write = time.monotonic()

        while True:
            try:
                message = await asyncio.wait_for(
                    receive(), timeout=self.poll_interval
                )
            except asyncio.TimeoutError:
                # Nothing from the client within the interval: time to poll.
                pass
            else:
                if message["type"] == "http.disconnect":
                    logger.debug("Client disconnected")
                    return
                # Any other message (e.g. the request body) is ignored.
                continue

            notifications = await get_notifications(user, last_id)
            for notification in notifications:
                await self._send_notification(send, notification)
                # Advance only after a successful write, so a failed send
                # never skips a notification.
                last_id = max(last_id, notification["id"])

            now = time.monotonic()
            if notifications:
                last_write = now
            elif now - last_write >= self.keepalive_interval:
                # SSE comment line: ignored by clients, keeps proxies from
                # timing out an idle stream.
                await send(
                    {
                        "type": "http.response.body",
                        "body": b":keep-alive\n\n",
                        "more_body": True,
                    }
                )
                last_write = now

    async def _send_headers(self, send):
        """Start a 200 response with the headers for an event stream."""
        await send(
            {
                "type": "http.response.start",
                "status": 200,
                "headers": [
                    (b"content-type", b"text/event-stream"),
                    (b"cache-control", b"no-cache"),
                    (b"connection", b"keep-alive"),
                    (b"x-accel-buffering", b"no"),
                ],
            }
        )

    async def _send_notification(self, send, notification):
        """Write one notification dict as a ``notification`` SSE event.

        Errors raised by ``send`` propagate to the caller, so a broken
        connection ends the stream instead of silently dropping events.
        """
        event_str = (
            f"id: {notification['id']}\n"
            f"event: notification\n"
            f"data: {json.dumps(notification)}\n\n"
        )
        await send(
            {
                "type": "http.response.body",
                "body": event_str.encode("utf-8"),
                "more_body": True,
            }
        )

    async def _send_response(self, send, status, body):
        """Send a complete plain-text response with *status* and *body*."""
        await send(
            {
                "type": "http.response.start",
                "status": status,
                "headers": [(b"content-type", b"text/plain")],
            }
        )
        await send({"type": "http.response.body", "body": body})

    async def _close_connection(self, send):
        """Finish the streamed response with an empty final body chunk."""
        await send({"type": "http.response.body", "body": b""})