# Aplicaciones con Pubsub

## Requisitos previos a la clase

### Instalar Docker Desktop y Configurar una Cuenta en Docker Hub


*   Descarga e instala Docker Desktop desde el [sitio web oficial de Docker](https://docs.docker.com/engine/install/).

*   Una vez instalado, crea o inicia sesión en tu cuenta de Docker Hub.

*   Abre Docker Desktop e inicia sesión con tus credenciales de Docker Hub.

### Instala Redis

*   Ejecuta el siguiente comando en tu terminal para descargar la imagen Docker:
    ```
    docker pull redis:latest
    ```

### Prueba que todo este correcto

*   Ejecuta el siguiente comando en tu terminal para correr el contenedor
    ```
    docker run --name testing-redis -p 6379:6379 -d redis
    ```

*   Ejecuta el siguiente comando en tu terminal para instalar redis
    ```
    pip install redis
    ```

*   Ejecuta el siguiente script de python para validar que redis esta corriendo correctamente
    ```python
    from redis import StrictRedis
    def main():
        redis = StrictRedis(host="localhost", port="6379", db=0)
        redis.set("name", "Ulises")
        print(redis.get("name"))
        print(redis.exists("name"))
        print(redis.exists("age"))
    if __name__ == '__main__':
        main()
    ```

Si todo esta correcto, al ejecutarlo tu terminal deberia verse algo asi:
```
b'Ulises'
1
0
```

## Construir un API REST con Django REST Framework

Crea un ambiente virtual:
```
python3 -m venv env
```

Activa el ambiente virtual:
```
# Activación en Unix
source env/bin/activate

# Activación en Windows
env\Scripts\activate
```

Instala Django, DRF y PyJWT:
```
pip install django
pip install djangorestframework
pip install google-cloud-pubsub
```

Crea un nuevo proyecto en Django:
```
django-admin startproject realtime_notifications
```

Crea una nueva aplicación en Django:
```
python manage.py startapp api
```

Agrega la aplicación de `rest_framework` y la que acabamos de crear en el archivo de `settings.py`:
```python
INSTALLED_APPS = [
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'rest_framework',
    'api'
]
```

Genera las migraciones y ejeculatas
```
python manage.py makemigrations
python manage.py migrate
```

Crea un super usuario
```
python manage.py createsuperuser
```

Corre la aplicación para corroborar que todo esta correcto
```
python manage.py runserver
```

Agrega la variables de `redis` en el archivo de `settings.py`:
```python
REDIS_HOST = "localhost"

REDIS_PORT = 6379
```

## Construir un endpoint publicador y un subscriptor

### Define la vista para el Publisher

In [None]:
#api/views.py
# Importamos redis para trabajar con el cliente de Redis.
import redis
# Importamos varias clases y funciones de Django REST Framework.
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

# Importamos el módulo settings de nuestra aplicación para obtener la configuración de Redis.
from realtime_notifications import settings

# Creamos una instancia del cliente de Redis utilizando la configuración especificada en el archivo settings.py
redis_client = redis.StrictRedis(
    host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0
)

# Esta clase manejará las solicitudes POST para publicar mensajes en Redis.
class PublishView(APIView):
    # Definimos el método post() para manejar las solicitudes POST a esta vista.
    def post(self, request, *args, **kwargs):
        # Extraemos el contenido del mensaje, si no se proporciona, message será una cadena vacía.
        message = request.data.get("message", "")
        # Si se proporciona un mensaje, lo publicamos en el canal "notifications" de Redis utilizando el cliente Redis.
        if message:
            redis_client.publish("notifications", message)
            # Devolvemos una respuesta HTTP 200 OK con un mensaje indicando que el mensaje se envió correctamente.
            return Response(
                {"status": "Message sent", "message": message},
                status=status.HTTP_200_OK,
            )
        # Si no se proporciona ningún mensaje en la solicitud, devolvemos una respuesta HTTP 400 BAD REQUEST indicando que se produjo un error.
        return Response(
            {"status": "Error", "message": "No message provided"},
            status=status.HTTP_400_BAD_REQUEST,
        )

### Configura las URLs de la aplicación

In [None]:
#api/urls.py
# Importa la función path de Django, esta función es utilizada para definir patrones de URL individuales.
from django.urls import path

# Importa ItemViewSet desde el módulo de vistas.
from .views import PublishView, SubscribeView

# Define una lista de rutas URL llamada urlpatterns.
urlpatterns = [
    # Define la ruta para las publicaciones.
    path("publish/", PublishView.as_view(), name="publish"),
]

### Configura las URLs del proyecto

In [None]:
# realtime_notifications/urls.py
# Importa el módulo admin de Django, proporciona el sitio de administración automático de Django.
from django.contrib import admin
# Importa include y path desde django.urls.
# path se utiliza para definir patrones de URL en la aplicación.
# include se usa para hacer referencia a otras configuraciones de URL en el proyecto.
from django.urls import path, include

# Define una lista llamada urlpatterns
urlpatterns = [
    # Define una ruta para el sitio de administración.
    path("admin/", admin.site.urls),
    # Define una ruta para incluir otras configuraciones de URL.
    path("api/", include("api.urls")),
]

### Define la vista para el Subscriber

In [None]:
#api/views.py
# Importamos redis para trabajar con el cliente de Redis.
import redis
# Importamos varias clases y funciones de Django REST Framework.
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

# Importamos el módulo settings de nuestra aplicación para obtener la configuración de Redis.
from realtime_notifications import settings

# Creamos una instancia del cliente de Redis utilizando la configuración especificada en el archivo settings.py
redis_client = redis.StrictRedis(
    host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0
)

# Esta clase manejará las solicitudes GET para suscribirse y recibir mensajes de Redis.
class SubscribeView(APIView):
    # Definimos el método get() para manejar las solicitudes GET a esta vista.
    def get(self, request, *args, **kwargs):
        # Creamos una instancia de pubsub utilizando el cliente de Redis.
        pubsub = redis_client.pubsub()
        # Nos suscribimos al canal "notifications", lo que significa que esta vista escuchará los mensajes publicados en ese canal.
        pubsub.subscribe("notifications")

        # Iniciamos un bucle que escucha los mensajes del canal "notifications".
        for message in pubsub.listen():
            # Cuando se recibe un mensaje, verificamos que su tipo sea "message" y decodificamos los datos del mensaje como una cadena UTF-8.
            if message["type"] == "message":
                data = message["data"].decode("utf-8")
                # Devolvemos una respuesta HTTP 200 OK con el mensaje recibido.
                return Response(
                    {"status": "Message Received", "message": data},
                    status=status.HTTP_200_OK,
                )

### Configura las URLs de la aplicación

In [None]:
#api/urls.py
# Importa la función path de Django, esta función es utilizada para definir patrones de URL individuales.
from django.urls import path

# Importa ItemViewSet desde el módulo de vistas.
from .views import PublishView, SubscribeView

# Define una lista de rutas URL llamada urlpatterns.
urlpatterns = [
    # Define la ruta para las publicaciones.
    path("publish/", PublishView.as_view(), name="publish"),
    # Define la ruta para las subscripciones.
    path("subscribe/", SubscribeView.as_view(), name="subscribe"),
]

## Construir un websocket que este escuchando los mensajes

### Agrega la aplicación de `daphne` y `channels` en el archivo de `settings.py`:



```python
INSTALLED_APPS = [
    'daphne',
    'django.contrib.admin',
    'django.contrib.auth',
    'django.contrib.contenttypes',
    'django.contrib.sessions',
    'django.contrib.messages',
    'django.contrib.staticfiles',
    'channels',
    'rest_framework',
    'api'
]
```

### Agrega `ASGI_APPLICATION` en el archivo de `settings.py`:



```python
ASGI_APPLICATION = "realtime_notifications.asgi.application"
```

### Agrega `CHANNEL_LAYERS` en el archivo de `settings.py`:



```python
REDIS_HOST = "localhost"

REDIS_PORT = 6379

CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [(REDIS_HOST, REDIS_PORT)],
        },
    },
}
```

### Define el Consumer de Notificaciones




In [None]:
# api/consumers.py
# Este módulo permite trabajar con datos en formato JSON
import json

# Este es una clase base proporcionada por Django Channels que facilita la creación de consumidores WebSocket asíncronos.
from channels.generic.websocket import AsyncWebsocketConsumer


# Define la clase NotificationConsumer.
class NotificationConsumer(AsyncWebsocketConsumer):
    # Este método es llamado automáticamente cuando un cliente intenta establecer una conexión WebSocket con el servidor.
    async def connect(self):
        # Añade la conexión al grupo 'notifications': Utiliza channel_layer, que es una abstracción para manejar grupos de canales.
        await self.channel_layer.group_add("notifications", self.channel_name)
        # Este comando finaliza el handshake del WebSocket y establece la conexión.
        await self.accept()

    # Este método se llama cuando el WebSocket se desconecta, ya sea porque el cliente cerró la conexión o por algún otro motivo.
    async def disconnect(self, close_code):
        # Al desconectarse, se asegura de eliminar el canal del grupo para no intentar enviar mensajes a un canal que ya no está activo.
        await self.channel_layer.group_discard("notifications", self.channel_name)

    # Este método se ejecuta cuando se reciben datos del cliente.
    async def receive(self, text_data=None, bytes_data=None):
        # Parsea el text_data que es una cadena JSON a un diccionario de Python.
        text_data_json = json.loads(text_data)
        # Obtiene el valor asociado a la clave "message" del diccionario.
        message = text_data_json["message"]
        # Utiliza group_send para enviar un evento al grupo 'notifications'.
        await self.channel_layer.group_send(
            "notifications", {"type": "notification.message", "message": message}
        )

    # Este método maneja los mensajes enviados al grupo 'notifications'.
    async def notification_message(self, event):
        # Extrae el mensaje que fue enviado al grupo.
        message = event["message"]
        # Convierte el mensaje a una cadena JSON y lo envía de vuelta al cliente que está conectado a este WebSocket.
        await self.send(text_data=json.dumps({"message": message}))

### Configura el routing de los websockets

In [None]:
# api/routing.py
# Esta función es parte del módulo django.urls y permite definir una ruta URL utilizando una expresión regular.
from django.urls import re_path
# Este import trae el módulo consumers
from . import consumers
# Esta lista almacenará las rutas URL para los WebSockets.
websocket_urlpatterns = [
    # Utiliza una expresión regular para coincidir con la URL que termina exactamente en ws/notifications/.
    # Esto significa que cualquier conexión WebSocket que intente conectarse a ws://<tu-dominio>/ws/notifications/ coincidirá con esta ruta.
    re_path(r"ws/notifications/$", consumers.NotificationConsumer.as_asgi()),
]

### Configura las URLs del proyecto

In [None]:
# realtime_notifications/urls.py
# Importa el módulo admin de Django, proporciona el sitio de administración automático de Django.
from django.contrib import admin
# Importa include y path desde django.urls.
# path se utiliza para definir patrones de URL en la aplicación.
# include se usa para hacer referencia a otras configuraciones de URL en el proyecto.
from django.urls import path, include
# Importa el routing de los websockets.
from api import routing as notifications_routing

# Define una lista llamada urlpatterns
urlpatterns = [
    # Define una ruta para el sitio de administración.
    path("admin/", admin.site.urls),
    # Define una ruta para incluir otras configuraciones de URL.
    path("api/", include("api.urls")),
    # Define una ruta para incluir los websockets.
    path("", include(notifications_routing.websocket_urlpatterns)),
]

### Configura la aplicación ASGI para que soporte websockets

In [None]:
# realtime_notifications/asgi.py
# Esto permite interactuar con el sistema operativo.
import os
# Este módulo de Django Channels es un middleware que proporciona manejo de autenticación a las conexiones de WebSocket.
from channels.auth import AuthMiddlewareStack
# ProtocolTypeRouter: Este enrutador permite a Django Channels manejar diferentes tipos de protocolos.
# URLRouter: Este enrutador maneja rutas de URL para conexiones WebSocket.
from channels.routing import ProtocolTypeRouter, URLRouter
# Esta función crea una aplicación ASGI a partir de la configuración de Django.
from django.core.asgi import get_asgi_application
# Este import trae las rutas de WebSocket definidas en el módulo de routing de la app api.
from api.routing import websocket_urlpatterns

# Establece la variable de entorno para la configuración de Django.
os.environ.setdefault("DJANGO_SETTINGS_MODULE", "realtime_notifications.settings")

# Define la aplicación ASGI principal.
# "http": Este clave mapea todas las solicitudes HTTP a la aplicación ASGI obtenida de get_asgi_application().
# "websocket": Esta clave mapea todas las solicitudes WebSocket al AuthMiddlewareStack, que a su vez usa URLRouter para dirigir las solicitudes a los consumidores adecuados según websocket_urlpatterns.
application = ProtocolTypeRouter(
    {
        "http": get_asgi_application(),
        "websocket": AuthMiddlewareStack(URLRouter(websocket_urlpatterns)),
    }
)

### Modifica la vista del publisher para que mande los mensajes para los channels

In [None]:
#api/views.py
# Importamos redis para trabajar con el cliente de Redis.
import redis
# Importamos varias clases y funciones de Django REST Framework.
from rest_framework import status
from rest_framework.response import Response
from rest_framework.views import APIView

# Importamos el módulo settings de nuestra aplicación para obtener la configuración de Redis.
from realtime_notifications import settings

# Esta función obtiene la capa de canal actual configurada para Django Channels
from channels.layers import get_channel_layer
# Esta función es utilizada para llamar a funciones asíncronas desde código síncrono.
from asgiref.sync import async_to_sync

# Creamos una instancia del cliente de Redis utilizando la configuración especificada en el archivo settings.py
redis_client = redis.StrictRedis(
    host=settings.REDIS_HOST, port=settings.REDIS_PORT, db=0
)

# Esta clase manejará las solicitudes POST para publicar mensajes en Redis.
class PublishView(APIView):
    # Definimos el método post() para manejar las solicitudes POST a esta vista.
    def post(self, request, *args, **kwargs):
        # Extraemos el contenido del mensaje, si no se proporciona, message será una cadena vacía.
        message = request.data.get("message", "")
        # Si hay un mensaje, procede a enviarlo.
        if message:
            # Es utilizado para interactuar con los canales y grupos definidos.
            channel_layer = get_channel_layer()
            # Utiliza group_send para enviar un mensaje al grupo 'notifications'. async_to_sync permite llamar a esta función asíncrona de forma síncrona.
            async_to_sync(channel_layer.group_send)(
                "notifications", {"type": "notification_message", "message": message}
            )
            # Devolvemos una respuesta HTTP 200 OK con un mensaje indicando que el mensaje se envió correctamente.
            return Response(
                {"status": "Message sent", "message": message},
                status=status.HTTP_200_OK,
            )
        # Si no se proporciona ningún mensaje en la solicitud, devolvemos una respuesta HTTP 400 BAD REQUEST indicando que se produjo un error.
        return Response(
            {"status": "Error", "message": "No message provided"},
            status=status.HTTP_400_BAD_REQUEST,
        )