Channel Layers
Channel layer system for enabling communication between different parts of your application.
Registry Functions
- fast_channels.layers.get_channel_layer(alias: str) BaseChannelLayer | None
Returns a channel layer by alias.
- fast_channels.layers.register_channel_layer(alias: str, layer: BaseChannelLayer) None
Register a channel layer instance.
Example
layer = create_redis_channel_layer(“redis://localhost:6379”) register_channel_layer(“my_layer”, layer)
- fast_channels.layers.unregister_channel_layer(alias: str) None
Remove a channel layer from the registry.
- fast_channels.layers.has_layers() bool
Check if any channel layers are registered.
- Returns:
True if channel layers are registered, False otherwise.
Registry Class
- class fast_channels.layers.ChannelLayerRegistry
Bases:
objectRegistry pattern for managing channel layers. Allows direct registration of channel layer instances.
- clear() None
Clear all registered layers.
- get(alias: str) BaseChannelLayer | None
Get a channel layer by alias.
- list_aliases() list[str]
Get all registered aliases.
- register(alias: str, layer: BaseChannelLayer) None
Register a channel layer instance with an alias.
- Parameters:
alias – Name to register the layer under
layer – Channel layer instance
- unregister(alias: str) None
Remove a channel layer from the registry.
Base Channel Layer
- class fast_channels.layers.BaseChannelLayer(expiry: float = 60, capacity: int = 100, channel_capacity: dict[str | Pattern[str], int] | None = None)
Bases:
objectBase channel layer class that others can inherit from, with useful common functionality.
- MAX_NAME_LENGTH: int = 100
- capacity: int
- channel_capacity: list[tuple[Pattern[str], int]]
- channel_name_regex = re.compile('^[a-zA-Z\\d\\-_.]+(\\![\\d\\w\\-_.]*)?$')
- compile_capacities(channel_capacity: dict[str | Pattern[str], int]) list[tuple[Pattern[str], int]]
Takes an input channel_capacity dict and returns the compiled list of regexes that get_capacity will look for as self.channel_capacity
- Parameters:
channel_capacity – Dictionary mapping channel patterns to capacities.
- Returns:
List of compiled regex patterns with their associated capacities.
- expiry: float
- async flush() None
Flush all messages from all channels.
- Raises:
NotImplementedError – Must be implemented by subclasses that support flushing.
- get_capacity(channel: str) int
Gets the correct capacity for the given channel; either the default, or a matching result from channel_capacity. Returns the first matching result; if you want to control the order of matches, use an ordered dict as input.
- Parameters:
channel – The channel name to get capacity for.
- Returns:
The capacity for the given channel.
- async group_add(group: str, channel: str) None
Add a channel to a group.
- Parameters:
group – The group name.
channel – The channel name to add to the group.
- Raises:
NotImplementedError – Must be implemented by subclasses that support groups.
- async group_discard(group: str, channel: str) None
Remove a channel from a group.
- Parameters:
group – The group name.
channel – The channel name to remove from the group.
- Raises:
NotImplementedError – Must be implemented by subclasses that support groups.
- group_name_regex = re.compile('^[a-zA-Z\\d\\-_.]+$')
- async group_send(group: str, message: Mapping[str, Any]) None
Send a message to all channels in a group.
- Parameters:
group – The group name to send to.
message – The message to send.
- Raises:
NotImplementedError – Must be implemented by subclasses that support groups.
- invalid_name_error = '{} name must be a valid unicode string with length < 100 containing only ASCII alphanumerics, hyphens, underscores, or periods.'
- match_type_and_length(name: str | object) bool
Check if a name is a valid string with acceptable length.
- Parameters:
name – The name to validate.
- Returns:
True if name is a string shorter than MAX_NAME_LENGTH.
- async new_channel(prefix: str = 'specific.') str
Generate a new unique channel name.
- Parameters:
prefix – Prefix for the new channel name.
- Returns:
A unique channel name.
- Raises:
NotImplementedError – Must be implemented by subclasses.
- non_local_name(name: str) str
Given a channel name, returns the “non-local” part. If the channel name is a process-specific channel (contains !) this means the part up to and including the !; if it is anything else, this means the full name.
- Parameters:
name – The channel name to process.
- Returns:
The non-local part of the channel name.
- async receive(channel: str) Mapping[str, Any]
Receive a message from a channel.
- Parameters:
channel – The channel name to receive from.
- Returns:
The received message.
- Raises:
NotImplementedError – Must be implemented by subclasses.
- require_valid_channel_name(name: str, receive: bool = False) bool
Validate a channel name according to the naming rules.
- Parameters:
name – The channel name to validate.
receive – Whether this name will be used for receiving messages.
- Returns:
True if the name is valid.
- Raises:
TypeError – If the channel name is invalid.
- require_valid_group_name(name: str) bool
Validate a group name according to the naming rules.
- Parameters:
name – The group name to validate.
- Returns:
True if the name is valid.
- Raises:
TypeError – If the group name is invalid.
- async send(channel: str, message: Mapping[str, Any]) None
Send a message to a channel.
- Parameters:
channel – The channel name to send to.
message – The message to send.
- Raises:
NotImplementedError – Must be implemented by subclasses.
In-Memory Channel Layer
- class fast_channels.layers.InMemoryChannelLayer(expiry: float = 60, group_expiry: float = 86400, capacity: int = 100, channel_capacity: dict[str | Pattern[str], int] | None = None)
Bases:
BaseChannelLayerIn-memory channel layer implementation
- async close() None
Close the channel layer (no-op for in-memory implementation).
- extensions = ['groups', 'flush']
- async flush() None
Clear all channels and groups from memory.
- async group_add(group: str, channel: str) None
Adds the channel name to a group.
- Parameters:
group – The group name.
channel – The channel name to add to the group.
- async group_discard(group: str, channel: str) None
Remove a channel from a group.
- Parameters:
group – The group name.
channel – The channel name to remove from the group.
- async group_send(group: str, message: Mapping[str, Any]) None
Send a message to all channels in a group.
- Parameters:
group – The group name to send to.
message – The message to send to all channels in the group.
- async new_channel(prefix: str = 'specific.') str
Returns a new channel name that can be used by something in our process as a specific channel.
- Parameters:
prefix – Prefix for the new channel name (default: “specific.”).
- Returns:
A unique channel name.
- async receive(channel: str) Mapping[str, Any]
Receive the first message that arrives on the channel. If more than one coroutine waits on the same channel, a random one of the waiting coroutines will get the result.
- Parameters:
channel – The channel name to receive from.
- Returns:
The received message.
- async send(channel: str, message: Mapping[str, Any]) None
Send a message onto a (general or specific) channel.
- Parameters:
channel – The channel name to send to.
message – The message to send.
- Raises:
ChannelFull – If the channel queue is at capacity.