Showcase

Advanced Channel Layer Patterns and External Integrations

In this final section, we’ll build a showcase that combines multiple channel layer types, different consumer base classes, and external scripts that publish to the same layers. Together they show how Fast Channels can be arranged for more complex real-time applications.

Understanding Advanced Channel Layer Patterns

Multiple Layer Types

Different types of communication require different channel layer configurations:
  • Chat Layer: Fast pub/sub for real-time chat messaging

  • Queue Layer: Reliable message delivery with persistence

  • Notifications Layer: Ephemeral notifications that don’t need persistence

  • Analytics Layer: High-throughput event processing with reliability
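To make the difference between ephemeral pub/sub and queue-based delivery concrete, here is a toy in-memory model. It is only a sketch: the PubSubSketch and QueueSketch classes are hypothetical names invented for this illustration, and they do not reflect how Redis or Fast Channels implement their layers.

```python
from collections import defaultdict, deque


class PubSubSketch:
    """Ephemeral delivery: only currently subscribed listeners get a message."""

    def __init__(self):
        self.subscribers = defaultdict(list)  # group -> list of inboxes

    def subscribe(self, group):
        inbox = []
        self.subscribers[group].append(inbox)
        return inbox

    def publish(self, group, message):
        # With no subscribers, the message is simply dropped.
        for inbox in self.subscribers[group]:
            inbox.append(message)


class QueueSketch:
    """Buffered delivery: messages wait in a queue until someone reads them."""

    def __init__(self):
        self.queues = defaultdict(deque)  # group -> pending messages

    def publish(self, group, message):
        self.queues[group].append(message)

    def consume(self, group):
        pending = list(self.queues[group])
        self.queues[group].clear()
        return pending


pubsub = PubSubSketch()
pubsub.publish("chat_room", "sent before anyone joined")  # dropped
inbox = pubsub.subscribe("chat_room")
pubsub.publish("chat_room", "sent after joining")  # delivered

queue = QueueSketch()
queue.publish("reliable_chat", "sent before anyone joined")  # kept
queue.publish("reliable_chat", "sent after joining")  # kept
```

In this model the pub/sub inbox only ever contains the second message, while the queue still holds both. That trade-off is the reason to pick different layers for chat and for critical messages.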

Consumer Type Variations

Fast Channels provides different consumer base classes:
  • AsyncWebsocketConsumer: Basic WebSocket handling with text/binary data

  • AsyncJsonWebsocketConsumer: Automatic JSON encoding/decoding for structured data

  • AsyncConsumer: Low-level consumer for custom protocols
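The practical difference between the text and JSON base classes is who does the parsing. The sketch below uses two plain functions, handle_text_frame and handle_json_frame (both hypothetical names, not part of Fast Channels), to show the work a text handler has to do by hand compared with a handler that receives already-decoded content.

```python
import json


def handle_text_frame(text_data):
    """What a text consumer sees: a raw string that must be decoded by hand."""
    try:
        content = json.loads(text_data)
    except json.JSONDecodeError:
        return json.dumps({"error": "invalid JSON"})
    return json.dumps({"echo": content.get("message", "")})


def handle_json_frame(content):
    """What a JSON consumer sees: an already-decoded Python object."""
    return {"echo": content.get("message", "")}


text_reply = handle_text_frame('{"message": "hi"}')
json_reply = handle_json_frame({"message": "hi"})
bad_reply = handle_text_frame("not json")
```

With a JSON consumer, the decoding, the error handling for malformed input, and the final encoding all move out of your handler.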

External Script Integration

Channel layers enable communication from outside the WebSocket server:
  • Background services sending notifications

  • Analytics processors publishing events

  • System maintenance scripts broadcasting announcements

  • Monitoring tools sending alerts

What We’re Building

A comprehensive showcase application featuring:

  • Multiple Consumer Types: Different consumers using different channel layers

  • JSON vs Text Messaging: Comparison between AsyncWebsocketConsumer and AsyncJsonWebsocketConsumer

  • Layer-Specific Behaviors: Demonstrating pub/sub vs queue-based messaging

  • External Script Integration: Scripts sending messages from outside the web application

  • Real-time Multi-layer Communication: Messages flowing through different layers simultaneously

Creating the Showcase Module

First, let’s create the showcase module structure:

# Create the showcase module directory
mkdir -p sandbox/apps/showcase

# Create __init__.py file to make it a Python module
touch sandbox/apps/showcase/__init__.py

Your apps structure should now look like this:

sandbox/apps/
├── __init__.py
├── background_jobs/
│   ├── __init__.py
│   └── consumer.py
├── room_chat/
│   ├── __init__.py
│   └── consumer.py
├── showcase/
│   └── __init__.py
└── system_chat/
    ├── __init__.py
    └── consumer.py

Creating the Showcase Consumers

Now let’s create our showcase consumers that demonstrate different patterns. Create a new file at sandbox/apps/showcase/consumer.py:

Showcase Consumers Code
"""
Showcase Consumers Template

This template demonstrates multiple channel layer types working together.
Shows different layer configurations and use cases.

TODO:
1. Configure your channel layer aliases in layers.py
2. Customize group names and message types
3. Add your own consumer types as needed
"""

from fast_channels.consumer.websocket import (
    AsyncJsonWebsocketConsumer,
    AsyncWebsocketConsumer,
)


class ChatConsumer(AsyncWebsocketConsumer):
    """
    Basic chat consumer using the centralized chat layer.
    """

    groups = ["chat_room"]
    channel_layer_alias = "chat"  # TODO: Configure in your layers.py

    async def connect(self):
        await self.accept()
        # TODO: Customize join message
        await self.channel_layer.group_send(
            "chat_room",
            {"type": "chat_message", "message": "📢 Someone joined the chat"},
        )

    async def disconnect(self, code):
        await super().disconnect(code)
        # TODO: Customize leave message
        await self.channel_layer.group_send(
            "chat_room",
            {"type": "chat_message", "message": "❌ Someone left the chat."},
        )

    async def receive(self, text_data=None, bytes_data=None, **kwargs):
        # TODO: Add message processing logic
        await self.channel_layer.group_send(
            "chat_room", {"type": "chat_message", "message": f"💬 {text_data}"}
        )

    async def chat_message(self, event):
        """Called when someone has messaged our chat."""
        await self.send(event["message"])


class ReliableChatConsumer(AsyncWebsocketConsumer):
    """
    Chat consumer using queue-based layer for guaranteed message delivery.
    """

    channel_layer_alias = "queue"  # TODO: Configure in your layers.py
    groups = ["reliable_chat"]

    async def connect(self):
        await self.accept()
        # TODO: Customize connection message
        await self.channel_layer.group_send(
            "reliable_chat",
            {
                "type": "reliable_chat_message",
                "message": "🔒 Reliable chat connection established!",
            },
        )

    async def receive(self, text_data=None, bytes_data=None, **kwargs):
        # TODO: Add reliable message processing
        await self.channel_layer.group_send(
            "reliable_chat",
            {"type": "reliable_chat_message", "message": f"📨 {text_data}"},
        )

    async def disconnect(self, close_code):
        # TODO: Add disconnect handling
        await self.channel_layer.group_send(
            "reliable_chat",
            {"type": "reliable_chat_message", "message": "🚪 Left reliable chat!"},
        )

    async def reliable_chat_message(self, event):
        """Called when someone has messaged our reliable chat."""
        await self.send(event["message"])


class NotificationConsumer(AsyncJsonWebsocketConsumer):
    """
    Consumer for real-time notifications using JSON messages.
    """

    channel_layer_alias = "notifications"  # TODO: Configure in your layers.py
    groups = ["notifications"]

    async def connect(self):
        await self.accept()
        # TODO: Customize notification connection message
        await self.channel_layer.group_send(
            "notifications",
            {
                "type": "notification_message",
                "data": {"type": "system", "message": "🔔 Connected to notifications!"},
            },
        )

    async def receive_json(self, content, **kwargs):
        # TODO: Add JSON notification processing logic
        await self.channel_layer.group_send(
            "notifications",
            {
                "type": "notification_message",
                "data": {
                    "type": "user",
                    "message": (
                        f"🔔 Notification: {content.get('message', 'No message')}"
                    ),
                },
            },
        )

    async def disconnect(self, close_code):
        # TODO: Add notification disconnect handling if needed
        pass

    async def notification_message(self, event):
        """Called when a notification is sent to the group."""
        await self.send_json(event["data"])


# TODO: Implement the AnalyticsConsumer class below
# Hints:
# 1. Use channel_layer_alias = "analytics"
# 2. Use groups = ["analytics"]
# 3. Create analytics_message method to handle events
# 4. Consider JSON message format for structured analytics data
# 5. Process analytics events and send them to the group
# 6. Handle connection/disconnection as needed for your use case


class AnalyticsConsumer(AsyncWebsocketConsumer):
    """
    TODO: Consumer for analytics events with reliable delivery.

    Implement this consumer by:
    1. Setting up the proper channel layer alias and groups
    2. Handling connection, receive, and disconnect methods
    3. Creating an analytics_message method for group events
    4. Processing analytics events appropriately
    """

    # TODO: Add your implementation here
    pass

Understanding the Showcase Consumers

Let’s examine the different patterns demonstrated:

ChatConsumer (AsyncWebsocketConsumer):
  • Uses “chat” layer (Redis pub/sub)

  • Text-based messaging with receive(text_data)

  • Fast, ephemeral communication

  • Auto-joins “chat_room” group via groups attribute

ReliableChatConsumer (AsyncWebsocketConsumer):
  • Uses “queue” layer (Redis queue-based)

  • Guaranteed message delivery with persistence

  • Auto-joins “reliable_chat” group via groups attribute; connect/disconnect only broadcast join and leave messages

  • Ideal for critical communications

NotificationConsumer (AsyncJsonWebsocketConsumer):
  • Uses “notifications” layer

  • JSON-based messaging with receive_json(content) and send_json(data)

  • Structured data with type information

  • Automatic JSON encoding/decoding

AnalyticsConsumer (Your Implementation):
  • Uses “analytics” layer for high-throughput events

  • Exercise for you to implement based on the TODO hints

  • Should handle analytics events and metrics
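All of these consumers rely on the same routing idea: the "type" key of a group message names the method that handles it, so {"type": "chat_message"} ends up in chat_message(). The sketch below imitates that lookup with a hypothetical DispatchSketch class. It assumes Fast Channels follows the Django Channels convention, where dots in the type are replaced with underscores. Treat it as an illustration of the idea, not as the library's actual dispatch code.

```python
import asyncio


class DispatchSketch:
    """Toy stand-in for a consumer; not the Fast Channels implementation."""

    def __init__(self):
        self.sent = []

    async def dispatch(self, event):
        # Assumption: "chat.message" and "chat_message" both map to chat_message().
        handler_name = event["type"].replace(".", "_")
        handler = getattr(self, handler_name, None)
        if handler is None:
            raise ValueError(f"No handler for message type {event['type']!r}")
        await handler(event)

    async def chat_message(self, event):
        # Record the message instead of writing to a real WebSocket.
        self.sent.append(event["message"])


async def demo():
    consumer = DispatchSketch()
    await consumer.dispatch({"type": "chat_message", "message": "hello"})
    await consumer.dispatch({"type": "chat.message", "message": "again"})
    return consumer.sent


result = asyncio.run(demo())
```

This is why the handler names in the consumers (chat_message, reliable_chat_message, notification_message) must match the "type" values used in every group_send call, including those in external scripts.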

Creating the External Script

Next, let’s create an external script that sends messages to WebSocket clients from outside the web application. It also shows what kinds of analytics events your AnalyticsConsumer will need to handle. Create the file sandbox/external_sender.py:

External Script Code
"""
Example of sending messages to WebSocket groups from outside a consumer,
using centralized layer configuration from layers.py.
"""

import asyncio

from fast_channels.layers import get_channel_layer

from sandbox.layers import setup_layers

setup_layers()

chat_layer = get_channel_layer("chat")
queue_layer = get_channel_layer("queue")
notifications_layer = get_channel_layer("notifications")
analytics_layer = get_channel_layer("analytics")


async def send_chat_message():
    """
    Send a message to the chat room using the chat layer.
    """
    if not chat_layer:
        print("❌ No chat layer available!")
        return

    await chat_layer.group_send(
        "chat_room",
        {
            "type": "chat_message",
            "message": "🔔 System announcement: Welcome to the chat!",
        },
    )

    print("✅ Chat message sent!")


async def send_reliable_message():
    """
    Send a message using the queue-based layer for guaranteed delivery.
    """
    if not queue_layer:
        print("❌ No queue layer available!")
        return

    await queue_layer.group_send(
        "reliable_chat",
        {
            "type": "reliable_chat_message",
            "message": "🔒 Important: System maintenance scheduled for tonight",
        },
    )

    print("✅ Reliable message sent!")


async def send_notification():
    """
    Send a notification using the notifications layer.
    """
    if not notifications_layer:
        print("❌ No notifications layer available!")
        return

    await notifications_layer.group_send(
        "notifications",
        {
            "type": "notification_message",
            "data": {
                "type": "system",
                "message": "🚨 Alert: High CPU usage detected on server",
            },
        },
    )

    print("✅ Notification sent!")


async def send_analytics_event():
    """
    Send analytics events using the analytics layer.
    """
    if not analytics_layer:
        print("❌ No analytics layer available!")
        return

    # Send multiple analytics events
    events = [
        "user_login:john_doe",
        "page_view:/dashboard",
        "button_click:export_data",
        "session_duration:1234",
        "error:api_timeout",
    ]

    for event in events:
        await analytics_layer.group_send(
            "analytics", {"type": "analytics_message", "message": f"📊 Event: {event}"}
        )
        await asyncio.sleep(0.1)  # Small delay between events

    print(f"✅ Sent {len(events)} analytics events!")


async def send_to_multiple_layers():
    """
    Demonstrate sending to different layers for different purposes.
    """
    print("🚀 Broadcasting to multiple layers...")

    # Send to chat (fast pub/sub)
    await chat_layer.group_send(
        "chat_room",
        {
            "type": "chat_message",
            "message": "💬 Multi-layer broadcast: Chat message",
        },
    )

    # Send to notifications (ephemeral)
    await notifications_layer.group_send(
        "notifications",
        {
            "type": "notification_message",
            "data": {
                "type": "broadcast",
                "message": "🔔 Multi-layer broadcast: Notification",
            },
        },
    )

    # Send to queue (reliable)
    await queue_layer.group_send(
        "reliable_chat",
        {
            "type": "reliable_chat_message",
            "message": "📨 Multi-layer broadcast: Reliable message",
        },
    )

    print("✅ Multi-layer broadcast complete!")


async def periodic_announcements():
    """
    Send periodic announcements to different channels.
    """
    print("⏰ Starting periodic announcements...")

    for i in range(3):
        # Alternate between different layers
        if i % 2 == 0:
            layer = chat_layer
            group = "chat_room"
            message = f"⏰ Hourly chat announcement #{i+1}"
        else:
            layer = notifications_layer
            group = "notifications"
            message = f"🔔 System status update #{i+1}"

        if layer:
            if group == "chat_room":
                await layer.group_send(
                    group, {"type": "chat_message", "message": message}
                )
            else:  # notifications
                await layer.group_send(
                    group,
                    {
                        "type": "notification_message",
                        "data": {"type": "periodic", "message": message},
                    },
                )
            print(f"✅ Sent announcement #{i+1}")

        await asyncio.sleep(1)  # 1 second between announcements

    print("✅ Periodic announcements complete!")


async def main():
    """
    Run all external messaging examples.
    """
    print("=== External Messaging with Centralized Layers ===\n")

    # Layers are already registered: setup_layers() ran at import time above

    examples = [
        ("Chat Message", send_chat_message),
        ("Reliable Message", send_reliable_message),
        ("Notification", send_notification),
        ("Analytics Events", send_analytics_event),
        ("Multi-layer Broadcast", send_to_multiple_layers),
        ("Periodic Announcements", periodic_announcements),
    ]

    for name, func in examples:
        print(f"🎯 Running: {name}")
        try:
            await func()
        except Exception as e:
            print(f"❌ Error in {name}: {e}")
        print()  # Add spacing between examples
        await asyncio.sleep(0.5)  # Brief pause between examples

    print("=== All Examples Complete! ===")


if __name__ == "__main__":
    asyncio.run(main())

This external script demonstrates:

  • Multi-layer Communication: Sends messages through different channel layers

  • External Integration: Shows how background services can communicate with WebSocket clients

  • Layer-specific Messaging: Different message types for different purposes

  • Periodic Operations: Automated announcements and system notifications

Notice the send_analytics_event() function that sends various analytics events like “user_login”, “page_view”, “button_click”, etc. This gives you a good idea of what your AnalyticsConsumer should handle.
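The events arrive as plain strings in a "name:value" shape with a "📊 Event: " prefix. If your consumer needs structured data, one possible approach is to split them before forwarding. The parse_analytics_message helper below is a hypothetical name used only for this sketch, and the format is assumed to be exactly what the external script sends.

```python
def parse_analytics_message(message):
    """Split '📊 Event: name:value' into a dict with 'event' and 'value' keys."""
    prefix = "📊 Event: "
    # Strip the display prefix added by the external script, if present.
    body = message[len(prefix):] if message.startswith(prefix) else message
    # Split on the first colon only, so values may contain colons themselves.
    name, _, value = body.partition(":")
    return {"event": name, "value": value}


login = parse_analytics_message("📊 Event: user_login:john_doe")
page = parse_analytics_message("📊 Event: page_view:/dashboard")
bare = parse_analytics_message("heartbeat")
```

A dict like this could then be sent with send_json if you base your consumer on AsyncJsonWebsocketConsumer, as hint 4 in the template suggests.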

Implementing the AnalyticsConsumer

Now that you’ve seen what kinds of analytics events the external script sends, try implementing the AnalyticsConsumer yourself based on the TODO hints in the template. Here’s the complete code for you to compare and reference:

AnalyticsConsumer Implementation
class AnalyticsConsumer(AsyncWebsocketConsumer):
    """
    Consumer for analytics events with reliable delivery.
    """

    channel_layer_alias = "analytics"
    groups = ["analytics"]

    async def connect(self):
        await self.accept()
        # Send connection confirmation
        await self.channel_layer.group_send(
            "analytics",
            {
                "type": "analytics_message",
                "message": "📊 Analytics tracking started!"
            },
        )

    async def receive(self, text_data=None, bytes_data=None, **kwargs):
        # Process analytics event
        await self.channel_layer.group_send(
            "analytics",
            {
                "type": "analytics_message",
                "message": f"📊 Analytics: {text_data}"
            },
        )

    async def disconnect(self, close_code):
        # Optional: Send disconnect analytics
        pass

    async def analytics_message(self, event):
        """Called when an analytics event is sent to the group."""
        await self.send(event["message"])

Integrating the Showcase Consumers

Connect all showcase consumers to your FastAPI application. Open sandbox/main.py and uncomment these lines:

# Uncomment these import lines:
from sandbox.apps.showcase.consumer import (
    AnalyticsConsumer,
    ChatConsumer,
    NotificationConsumer,
    ReliableChatConsumer,
)

# Uncomment these WebSocket routes:
ws_router.add_websocket_route("/chat", ChatConsumer.as_asgi())
ws_router.add_websocket_route("/reliable", ReliableChatConsumer.as_asgi())
ws_router.add_websocket_route("/notifications", NotificationConsumer.as_asgi())
ws_router.add_websocket_route("/analytics", AnalyticsConsumer.as_asgi())

These routes create WebSocket endpoints for each showcase consumer type.

Testing the Showcase

Your final project structure should now look like this:

tutorial-project/
├── docker-compose.yml
└── sandbox/
    ├── __init__.py
    ├── main.py
    ├── layers.py
    ├── tasks.py
    ├── worker.py
    ├── start_dev.py
    ├── external_sender.py
    ├── apps/
    │   ├── __init__.py
    │   ├── background_jobs/
    │   │   ├── __init__.py
    │   │   └── consumer.py
    │   ├── room_chat/
    │   │   ├── __init__.py
    │   │   └── consumer.py
    │   ├── showcase/
    │   │   ├── __init__.py
    │   │   └── consumer.py
    │   └── system_chat/
    │       ├── __init__.py
    │       └── consumer.py
    └── static/
        ├── css/
        │   └── style.css
        └── js/
            └── main.js
  1. Stop any running servers:

If you have a server running, stop it first (Ctrl+C) to avoid conflicts.

  2. Start the development environment:

# Use the development launcher
python sandbox/start_dev.py

  3. Test the showcase features:

Visit http://localhost:8080 in your browser. You should see the “Showcase” section.

Try these interactions to see different layer behaviors:

  • Type in the Showcase section and press “Send” - You’ll receive messages back through both chat and reliable chat layers

  • Run the external script in a separate terminal: python -m sandbox.external_sender - You’ll see messages appear in real-time from the external script

Expected Behavior:
  • Messages flow through different channel layers simultaneously

  • JSON notifications have structured data format

  • External script sends messages that appear in real-time

  • Different layers show different messaging patterns

[Figure: Showcase demo showing multiple channel layers working together]

Customizing the Showcase

1. Adding Custom Layer Types:

# In sandbox/layers.py, add new layer configurations:
"monitoring": {
    "layer": RedisPubSubChannelLayer(hosts=[redis_url], prefix="monitor"),
    "description": "Real-time monitoring layer",
},

2. Creating Custom JSON Consumers:

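class CustomJsonConsumer(AsyncJsonWebsocketConsumer):
    async def receive_json(self, content, **kwargs):
        # Route structured messages by their "type" field
        message_type = content.get("type")
        payload = content.get("payload")

        if message_type == "user_action":
            await self.handle_user_action(payload)
        elif message_type == "system_event":
            await self.handle_system_event(payload)

    async def handle_user_action(self, payload):
        # Placeholder handler: replace with your own logic
        await self.send_json({"type": "user_action_ack", "payload": payload})

    async def handle_system_event(self, payload):
        # Placeholder handler: replace with your own logic
        await self.send_json({"type": "system_event_ack", "payload": payload})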

3. Advanced External Scripts:

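import time

from fast_channels.layers import get_channel_layer


# Create monitoring script
async def send_monitoring_alert():
    layer = get_channel_layer("monitoring")
    if not layer:
        print("❌ No monitoring layer available!")
        return

    # A consumer must be in the "alerts" group and define alert_message()
    await layer.group_send(
        "alerts",
        {
            "type": "alert_message",
            "data": {
                "severity": "high",
                "message": "CPU usage above 90%",
                "timestamp": time.time(),
            },
        },
    )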

4. Cross-Layer Communication:

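async def broadcast_to_all_layers(message):
    """Send a message to the showcase groups on several layers in turn."""
    # alias -> (group, handler type, payload), matching the showcase consumers
    targets = {
        "chat": ("chat_room", "chat_message", {"message": message}),
        "notifications": (
            "notifications",
            "notification_message",
            {"data": {"type": "broadcast", "message": message}},
        ),
        "analytics": ("analytics", "analytics_message", {"message": message}),
    }

    for layer_name, (group, message_type, payload) in targets.items():
        layer = get_channel_layer(layer_name)
        if layer:
            await layer.group_send(group, {"type": message_type, **payload})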
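A plain for loop awaits each layer before moving to the next. If the sends should overlap instead, asyncio.gather is one option. The sketch below shows the idea with a hypothetical RecordingLayer stand-in that only mimics the group_send(group, message) call shape used in this tutorial. It is not a real channel layer, and broadcast_concurrently is likewise a name invented for this example.

```python
import asyncio


class RecordingLayer:
    """Hypothetical stand-in with the same group_send(group, message) shape."""

    def __init__(self):
        self.sent = []

    async def group_send(self, group, message):
        await asyncio.sleep(0)  # yield control, as a network call would
        self.sent.append((group, message))


async def broadcast_concurrently(targets, text):
    """Send text to every (layer, group, message_type) target concurrently."""
    await asyncio.gather(
        *(
            layer.group_send(group, {"type": message_type, "message": text})
            for layer, group, message_type in targets
        )
    )


chat, analytics = RecordingLayer(), RecordingLayer()
asyncio.run(
    broadcast_concurrently(
        [
            (chat, "chat_room", "chat_message"),
            (analytics, "analytics", "analytics_message"),
        ],
        "Maintenance at 22:00",
    )
)
```

Note that gather raises the first exception it encounters by default. If one failing layer should not hide the results of the others, passing return_exceptions=True and inspecting the results afterwards is worth considering.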

Troubleshooting

External Script Not Working:

Ensure Redis is running and channel layers are set up: docker compose ps

JSON Messages Not Parsing:

Check that you’re sending valid JSON to AsyncJsonWebsocketConsumer endpoints
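A quick way to rule out malformed input is to build frames with json.dumps and check suspicious strings with json.loads before sending them. The two helpers below are hypothetical names for this sketch. The "message" key matches what NotificationConsumer.receive_json reads in this tutorial.

```python
import json


def build_notification_frame(message):
    """Encode a payload in the shape NotificationConsumer.receive_json reads."""
    return json.dumps({"message": message})


def is_valid_json(text):
    """Return True if text can be decoded as JSON, False otherwise."""
    try:
        json.loads(text)
    except (TypeError, ValueError):  # JSONDecodeError is a ValueError subclass
        return False
    return True


good_frame = build_notification_frame("Disk almost full")
bad_frame = "{message: Disk almost full}"  # unquoted keys are not valid JSON
```

Hand-written frames with single quotes or unquoted keys are a common cause of parsing errors, so generating them with json.dumps avoids the problem entirely.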

Messages Not Appearing:

Verify all imports and routes are uncommented in sandbox/main.py

Layer Connection Issues:

Ensure setup_layers() is called before any channel layer operations
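To catch a missing setup early, you can wrap the lookup in a helper that fails loudly instead of returning nothing. This sketch assumes the lookup returns None for an unregistered alias, as the "if not chat_layer" checks in the external script suggest. The require_layer helper is a hypothetical name, and the lookup function is passed in so the example runs without Redis.

```python
def require_layer(get_layer, alias):
    """Return the layer for alias, or raise if it has not been registered.

    get_layer is any callable taking an alias, for example get_channel_layer.
    Assumption: it returns None when the alias is unknown.
    """
    layer = get_layer(alias)
    if layer is None:
        raise RuntimeError(
            f"Channel layer {alias!r} is not registered; "
            "call setup_layers() before looking it up."
        )
    return layer


# Stand-in registry for demonstration; real code would pass get_channel_layer.
fake_registry = {"chat": object()}
chat_layer = require_layer(fake_registry.get, "chat")
```

In a real script you would call require_layer(get_channel_layer, "chat") right after setup_layers(), so that a misconfigured alias stops the script immediately rather than surfacing later as an AttributeError.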

External Script Import Errors:

Make sure you’re running the script from the project root directory

What You’ve Accomplished

Congratulations! You’ve now mastered Fast Channels and built a comprehensive real-time application. You understand:

  • Multiple Channel Layer Types: Pub/sub, queue-based, and specialized layers

  • Advanced Consumer Patterns: Text vs JSON consumers and their use cases

  • External Script Integration: Sending messages from outside the web application

  • Cross-Process Communication: Coordinating between different services and scripts

  • Production-Ready Patterns: Scalable architectures using different layer types

  • Real-time Application Development: Complete workflow from simple echo to complex multi-layer systems

References

Learning Resources:
  • Explore the complete /sandbox code for more examples

  • Read the Fast Channels documentation for advanced configuration options

  • Study Django Channels patterns that inspired Fast Channels design

Thank you for completing the Fast Channels tutorial! You’re now ready to build powerful, scalable real-time applications with confidence.