205 lines
6.5 KiB
Python
205 lines
6.5 KiB
Python
# import json
|
|
# from django.contrib.auth.models import AnonymousUser
|
|
# from django.contrib.auth import get_user_model
|
|
# from django.db import close_old_connections
|
|
# from urllib.parse import parse_qs
|
|
# from channels.db import database_sync_to_async
|
|
# from inventory.models import Notification
|
|
# import asyncio
|
|
|
|
# @database_sync_to_async
|
|
# def get_notifications(user, last_id):
|
|
# return Notification.objects.filter(
|
|
# user=user, id__gt=last_id, is_read=False
|
|
# ).order_by("created")
|
|
|
|
# class NotificationSSEApp:
|
|
# async def __call__(self, scope, receive, send):
|
|
# if scope["type"] != "http":
|
|
# return
|
|
|
|
# query_string = parse_qs(scope["query_string"].decode())
|
|
# last_id = int(query_string.get("last_id", [0])[0])
|
|
|
|
# # Get user from scope if using AuthMiddlewareStack
|
|
# user = scope.get("user", AnonymousUser())
|
|
# if not user.is_authenticated:
|
|
# await send({
|
|
# "type": "http.response.start",
|
|
# "status": 403,
|
|
# "headers": [(b"content-type", b"text/plain")],
|
|
# })
|
|
# await send({
|
|
# "type": "http.response.body",
|
|
# "body": b"Unauthorized",
|
|
# })
|
|
# return
|
|
|
|
# await send({
|
|
# "type": "http.response.start",
|
|
# "status": 200,
|
|
# "headers": [
|
|
# (b"content-type", b"text/event-stream"),
|
|
# (b"cache-control", b"no-cache"),
|
|
# (b"x-accel-buffering", b"no"),
|
|
# ]
|
|
# })
|
|
|
|
# try:
|
|
# while True:
|
|
# close_old_connections()
|
|
|
|
# notifications = await get_notifications(user, last_id)
|
|
# for notification in notifications:
|
|
# data = {
|
|
# "id": notification.id,
|
|
# "message": notification.message,
|
|
# "created": notification.created.isoformat(),
|
|
# "is_read": notification.is_read,
|
|
# }
|
|
|
|
# event_str = (
|
|
# f"id: {notification.id}\n"
|
|
# f"event: notification\n"
|
|
# f"data: {json.dumps(data)}\n\n"
|
|
# )
|
|
|
|
# await send({
|
|
# "type": "http.response.body",
|
|
# "body": event_str.encode("utf-8"),
|
|
# "more_body": True
|
|
# })
|
|
|
|
# last_id = notification.id
|
|
|
|
# await asyncio.sleep(2)
|
|
|
|
# except asyncio.CancelledError:
|
|
# pass
|
|
|
|
import json
|
|
import time
|
|
from django.contrib.auth.models import AnonymousUser
|
|
from urllib.parse import parse_qs
|
|
from channels.db import database_sync_to_async
|
|
from django.contrib.auth import get_user_model
|
|
from inventory.models import Notification
|
|
import asyncio
|
|
from datetime import datetime
|
|
|
|
@database_sync_to_async
|
|
def get_user(user_id):
|
|
User = get_user_model()
|
|
try:
|
|
return User.objects.get(id=user_id)
|
|
except User.DoesNotExist:
|
|
return AnonymousUser()
|
|
|
|
@database_sync_to_async
|
|
def get_notifications(user, last_id):
|
|
notifications = Notification.objects.filter(
|
|
user=user,
|
|
id__gt=last_id,
|
|
is_read=False
|
|
).order_by("created")
|
|
|
|
return [
|
|
{
|
|
'id': n.id,
|
|
'message': n.message,
|
|
'created': n.created.isoformat(), # Convert datetime to string
|
|
'is_read': n.is_read
|
|
}
|
|
for n in notifications
|
|
]
|
|
|
|
class NotificationSSEApp:
|
|
async def __call__(self, scope, receive, send):
|
|
if scope["type"] != "http":
|
|
return
|
|
|
|
# Parse query parameters
|
|
query_string = parse_qs(scope["query_string"].decode())
|
|
last_id = int(query_string.get("last_id", [0])[0])
|
|
|
|
# Get user from scope
|
|
user = scope.get("user")
|
|
if not user or user.is_anonymous:
|
|
await self._send_response(send, 403, b"Unauthorized")
|
|
return
|
|
|
|
# Send SSE headers
|
|
await self._send_headers(send)
|
|
|
|
try:
|
|
while True:
|
|
try:
|
|
message = await asyncio.wait_for(receive(), timeout=3)
|
|
if message["type"] == "http.disconnect":
|
|
print("🔌 Client disconnected")
|
|
break
|
|
except asyncio.TimeoutError:
|
|
notifications = await get_notifications(user, last_id)
|
|
|
|
for notification in notifications:
|
|
await self._send_notification(send, notification)
|
|
if notification['id'] > last_id:
|
|
last_id = notification['id']
|
|
|
|
# Send keep-alive comment every 15 seconds
|
|
await send({
|
|
"type": "http.response.body",
|
|
"body": b":keep-alive\n\n",
|
|
"more_body": True
|
|
})
|
|
|
|
# await asyncio.sleep(3)
|
|
|
|
except (asyncio.CancelledError, ConnectionResetError):
|
|
pass
|
|
finally:
|
|
await self._close_connection(send)
|
|
|
|
async def _send_headers(self, send):
|
|
await send({
|
|
"type": "http.response.start",
|
|
"status": 200,
|
|
"headers": [
|
|
(b"content-type", b"text/event-stream"),
|
|
(b"cache-control", b"no-cache"),
|
|
(b"connection", b"keep-alive"),
|
|
(b"x-accel-buffering", b"no"),
|
|
]
|
|
})
|
|
|
|
async def _send_notification(self, send, notification):
|
|
try:
|
|
event_str = (
|
|
f"id: {notification['id']}\n"
|
|
f"event: notification\n"
|
|
f"data: {json.dumps(notification)}\n\n"
|
|
)
|
|
await send({
|
|
"type": "http.response.body",
|
|
"body": event_str.encode("utf-8"),
|
|
"more_body": True
|
|
})
|
|
except Exception as e:
|
|
print(f"Error sending notification: {e}")
|
|
|
|
async def _send_response(self, send, status, body):
|
|
await send({
|
|
"type": "http.response.start",
|
|
"status": status,
|
|
"headers": [(b"content-type", b"text/plain")]
|
|
})
|
|
await send({
|
|
"type": "http.response.body",
|
|
"body": body
|
|
})
|
|
|
|
async def _close_connection(self, send):
|
|
await send({
|
|
"type": "http.response.body",
|
|
"body": b""
|
|
}) |