# import json # from django.contrib.auth.models import AnonymousUser # from django.contrib.auth import get_user_model # from django.db import close_old_connections # from urllib.parse import parse_qs # from channels.db import database_sync_to_async # from inventory.models import Notification # import asyncio # @database_sync_to_async # def get_notifications(user, last_id): # return Notification.objects.filter( # user=user, id__gt=last_id, is_read=False # ).order_by("created") # class NotificationSSEApp: # async def __call__(self, scope, receive, send): # if scope["type"] != "http": # return # query_string = parse_qs(scope["query_string"].decode()) # last_id = int(query_string.get("last_id", [0])[0]) # # Get user from scope if using AuthMiddlewareStack # user = scope.get("user", AnonymousUser()) # if not user.is_authenticated: # await send({ # "type": "http.response.start", # "status": 403, # "headers": [(b"content-type", b"text/plain")], # }) # await send({ # "type": "http.response.body", # "body": b"Unauthorized", # }) # return # await send({ # "type": "http.response.start", # "status": 200, # "headers": [ # (b"content-type", b"text/event-stream"), # (b"cache-control", b"no-cache"), # (b"x-accel-buffering", b"no"), # ] # }) # try: # while True: # close_old_connections() # notifications = await get_notifications(user, last_id) # for notification in notifications: # data = { # "id": notification.id, # "message": notification.message, # "created": notification.created.isoformat(), # "is_read": notification.is_read, # } # event_str = ( # f"id: {notification.id}\n" # f"event: notification\n" # f"data: {json.dumps(data)}\n\n" # ) # await send({ # "type": "http.response.body", # "body": event_str.encode("utf-8"), # "more_body": True # }) # last_id = notification.id # await asyncio.sleep(2) # except asyncio.CancelledError: # pass import json import time from django.contrib.auth.models import AnonymousUser from urllib.parse import parse_qs from channels.db import database_sync_to_async from django.contrib.auth import get_user_model from inventory.models import Notification import asyncio from datetime import datetime @database_sync_to_async def get_user(user_id): User = get_user_model() try: return User.objects.get(id=user_id) except User.DoesNotExist: return AnonymousUser() @database_sync_to_async def get_notifications(user, last_id): notifications = Notification.objects.filter( user=user, id__gt=last_id, is_read=False ).order_by("created") return [ { 'id': n.id, 'message': n.message, 'created': n.created.isoformat(), # Convert datetime to string 'is_read': n.is_read } for n in notifications ] class NotificationSSEApp: async def __call__(self, scope, receive, send): if scope["type"] != "http": return # Parse query parameters query_string = parse_qs(scope["query_string"].decode()) last_id = int(query_string.get("last_id", [0])[0]) # Get user from scope user = scope.get("user") if not user or user.is_anonymous: await self._send_response(send, 403, b"Unauthorized") return # Send SSE headers await self._send_headers(send) try: while True: try: message = await asyncio.wait_for(receive(), timeout=3) if message["type"] == "http.disconnect": print("🔌 Client disconnected") break except asyncio.TimeoutError: notifications = await get_notifications(user, last_id) for notification in notifications: await self._send_notification(send, notification) if notification['id'] > last_id: last_id = notification['id'] # Send keep-alive comment every 15 seconds await send({ "type": "http.response.body", "body": b":keep-alive\n\n", "more_body": True }) # await asyncio.sleep(3) except (asyncio.CancelledError, ConnectionResetError): pass finally: await self._close_connection(send) async def _send_headers(self, send): await send({ "type": "http.response.start", "status": 200, "headers": [ (b"content-type", b"text/event-stream"), (b"cache-control", b"no-cache"), (b"connection", b"keep-alive"), (b"x-accel-buffering", b"no"), ] }) async def _send_notification(self, send, notification): try: event_str = ( f"id: {notification['id']}\n" f"event: notification\n" f"data: {json.dumps(notification)}\n\n" ) await send({ "type": "http.response.body", "body": event_str.encode("utf-8"), "more_body": True }) except Exception as e: print(f"Error sending notification: {e}") async def _send_response(self, send, status, body): await send({ "type": "http.response.start", "status": status, "headers": [(b"content-type", b"text/plain")] }) await send({ "type": "http.response.body", "body": body }) async def _close_connection(self, send): await send({ "type": "http.response.body", "body": b"" })