Channel Layers

Channel layer system for enabling communication between different parts of your application.

Registry Functions

fast_channels.layers.get_channel_layer(alias: str) BaseChannelLayer | None

Returns a channel layer by alias.

fast_channels.layers.register_channel_layer(alias: str, layer: BaseChannelLayer) None

Register a channel layer instance.

Example

layer = create_redis_channel_layer(“redis://localhost:6379”) register_channel_layer(“my_layer”, layer)

fast_channels.layers.unregister_channel_layer(alias: str) None

Remove a channel layer from the registry.

fast_channels.layers.has_layers() bool

Check if any channel layers are registered.

Returns:

True if channel layers are registered, False otherwise.

Registry Class

class fast_channels.layers.ChannelLayerRegistry

Bases: object

Registry pattern for managing channel layers. Allows direct registration of channel layer instances.

clear() None

Clear all registered layers.

get(alias: str) BaseChannelLayer | None

Get a channel layer by alias.

list_aliases() list[str]

Get all registered aliases.

register(alias: str, layer: BaseChannelLayer) None

Register a channel layer instance with an alias.

Parameters:
  • alias – Name to register the layer under

  • layer – Channel layer instance

unregister(alias: str) None

Remove a channel layer from the registry.

Base Channel Layer

class fast_channels.layers.BaseChannelLayer(expiry: float = 60, capacity: int = 100, channel_capacity: dict[str | Pattern[str], int] | None = None)

Bases: object

Base channel layer class that others can inherit from, with useful common functionality.

MAX_NAME_LENGTH: int = 100
capacity: int
channel_capacity: list[tuple[Pattern[str], int]]
channel_name_regex = re.compile('^[a-zA-Z\\d\\-_.]+(\\![\\d\\w\\-_.]*)?$')
compile_capacities(channel_capacity: dict[str | Pattern[str], int]) list[tuple[Pattern[str], int]]

Takes an input channel_capacity dict and returns the compiled list of regexes that get_capacity will look for as self.channel_capacity

Parameters:

channel_capacity – Dictionary mapping channel patterns to capacities.

Returns:

List of compiled regex patterns with their associated capacities.

expiry: float
async flush() None

Flush all messages from all channels.

Raises:

NotImplementedError – Must be implemented by subclasses that support flushing.

get_capacity(channel: str) int

Gets the correct capacity for the given channel; either the default, or a matching result from channel_capacity. Returns the first matching result; if you want to control the order of matches, use an ordered dict as input.

Parameters:

channel – The channel name to get capacity for.

Returns:

The capacity for the given channel.

async group_add(group: str, channel: str) None

Add a channel to a group.

Parameters:
  • group – The group name.

  • channel – The channel name to add to the group.

Raises:

NotImplementedError – Must be implemented by subclasses that support groups.

async group_discard(group: str, channel: str) None

Remove a channel from a group.

Parameters:
  • group – The group name.

  • channel – The channel name to remove from the group.

Raises:

NotImplementedError – Must be implemented by subclasses that support groups.

group_name_regex = re.compile('^[a-zA-Z\\d\\-_.]+$')
async group_send(group: str, message: Mapping[str, Any]) None

Send a message to all channels in a group.

Parameters:
  • group – The group name to send to.

  • message – The message to send.

Raises:

NotImplementedError – Must be implemented by subclasses that support groups.

invalid_name_error = '{} name must be a valid unicode string with length < 100 containing only ASCII alphanumerics, hyphens, underscores, or periods.'
match_type_and_length(name: str | object) bool

Check if a name is a valid string with acceptable length.

Parameters:

name – The name to validate.

Returns:

True if name is a string shorter than MAX_NAME_LENGTH.

async new_channel(prefix: str = 'specific.') str

Generate a new unique channel name.

Parameters:

prefix – Prefix for the new channel name.

Returns:

A unique channel name.

Raises:

NotImplementedError – Must be implemented by subclasses.

non_local_name(name: str) str

Given a channel name, returns the “non-local” part. If the channel name is a process-specific channel (contains !) this means the part up to and including the !; if it is anything else, this means the full name.

Parameters:

name – The channel name to process.

Returns:

The non-local part of the channel name.

async receive(channel: str) Mapping[str, Any]

Receive a message from a channel.

Parameters:

channel – The channel name to receive from.

Returns:

The received message.

Raises:

NotImplementedError – Must be implemented by subclasses.

require_valid_channel_name(name: str, receive: bool = False) bool

Validate a channel name according to the naming rules.

Parameters:
  • name – The channel name to validate.

  • receive – Whether this name will be used for receiving messages.

Returns:

True if the name is valid.

Raises:

TypeError – If the channel name is invalid.

require_valid_group_name(name: str) bool

Validate a group name according to the naming rules.

Parameters:

name – The group name to validate.

Returns:

True if the name is valid.

Raises:

TypeError – If the group name is invalid.

async send(channel: str, message: Mapping[str, Any]) None

Send a message to a channel.

Parameters:
  • channel – The channel name to send to.

  • message – The message to send.

Raises:

NotImplementedError – Must be implemented by subclasses.

In-Memory Channel Layer

class fast_channels.layers.InMemoryChannelLayer(expiry: float = 60, group_expiry: float = 86400, capacity: int = 100, channel_capacity: dict[str | Pattern[str], int] | None = None)

Bases: BaseChannelLayer

In-memory channel layer implementation

async close() None

Close the channel layer (no-op for in-memory implementation).

extensions = ['groups', 'flush']
async flush() None

Clear all channels and groups from memory.

async group_add(group: str, channel: str) None

Adds the channel name to a group.

Parameters:
  • group – The group name.

  • channel – The channel name to add to the group.

async group_discard(group: str, channel: str) None

Remove a channel from a group.

Parameters:
  • group – The group name.

  • channel – The channel name to remove from the group.

async group_send(group: str, message: Mapping[str, Any]) None

Send a message to all channels in a group.

Parameters:
  • group – The group name to send to.

  • message – The message to send to all channels in the group.

async new_channel(prefix: str = 'specific.') str

Returns a new channel name that can be used by something in our process as a specific channel.

Parameters:

prefix – Prefix for the new channel name (default: “specific.”).

Returns:

A unique channel name.

async receive(channel: str) Mapping[str, Any]

Receive the first message that arrives on the channel. If more than one coroutine waits on the same channel, a random one of the waiting coroutines will get the result.

Parameters:

channel – The channel name to receive from.

Returns:

The received message.

async send(channel: str, message: Mapping[str, Any]) None

Send a message onto a (general or specific) channel.

Parameters:
  • channel – The channel name to send to.

  • message – The message to send.

Raises:

ChannelFull – If the channel queue is at capacity.

Redis Channel Layers