Channels
Channels in SocketAPI implement a publish/subscribe pattern, allowing clients to subscribe to real-time data streams. When you decorate a function with @app.channel(), it becomes a broadcasting endpoint that can push data to all subscribed clients.
Basic Channel Definition
from socketapi import SocketAPI
app = SocketAPI()
@app.channel("chat")
async def chat_channel(message: str = "Welcome"):
return {"message": message}
This creates a channel named "chat" that clients can subscribe to.
Subscribing to a Channel
Clients subscribe by sending a message with type: "subscribe":
Send:
{"type": "subscribe", "channel": "chat"}
Receive confirmation:
{"type": "subscribed", "channel": "chat"}
By default, clients do not receive initial data automatically after subscribing. They only receive the confirmation message. To enable automatic initial data delivery, use default_response=True.
Unsubscribing from a Channel
Clients can unsubscribe at any time:
Send:
{"type": "unsubscribe", "channel": "chat"}
Receive:
{"type": "unsubscribed", "channel": "chat"}
When a WebSocket connection closes, clients are automatically unsubscribed from all channels.
Broadcasting Messages
Once a channel is defined, you can broadcast messages to all subscribers by calling the decorated function:
@app.channel("chat")
async def chat_channel(message: str):
return {"message": message}
@app.action("send_message")
async def send_message(text: str):
# Broadcast to all subscribers of the "chat" channel
await chat_channel(message=text)
All subscribers will receive:
{"type": "data", "channel": "chat", "data": {"message": "text from action"}}
Default Response Behavior
By default, channels do not send initial data immediately after a client subscribes. You can enable this behavior by setting default_response=True:
@app.channel("news", default_response=True)
async def news_channel():
return {"headline": "Breaking News!"}
Send:
{"type": "subscribe", "channel": "news"}
Receive confirmation and initial data:
{"type": "subscribed", "channel": "news"}
{"type": "data", "channel": "news", "data": {"headline": "Breaking News!"}}
Parameters and Validation
Channels support typed parameters with automatic validation using Pydantic:
@app.channel("user_updates")
async def user_updates(user_id: int, include_details: bool = True):
# Fetch user data based on parameters
return {"user_id": user_id, "details": include_details}
When broadcasting, pass parameters as keyword arguments:
await user_updates(user_id=123, include_details=False)
Required Parameters on Subscribe
You can require clients to provide parameters when subscribing using the RequiredOnSubscribe annotation:
from typing import Annotated
from socketapi import RequiredOnSubscribe, SocketAPI
app = SocketAPI()
@app.channel("private_chat")
async def private_chat(
token: Annotated[str, RequiredOnSubscribe],
message: str = "Welcome"
):
# Validate token, fetch user data, etc.
return {"message": message, "authenticated": True}
Clients must provide required parameters when subscribing:
Send:
{
"type": "subscribe",
"channel": "private_chat",
"data": {"token": "secret_token"}
}
Receive:
{"type": "subscribed", "channel": "private_chat"}
Note: By default, no initial data is sent automatically. To receive initial data after subscribing, set default_response=True on the channel decorator.
If required parameters are missing or invalid:
Send:
{"type": "subscribe", "channel": "private_chat"}
Receive error:
{
"type": "error",
"message": "Invalid parameters for action 'private_chat'"
}
Using Pydantic Models
You can use Pydantic models for complex data structures:
from pydantic import BaseModel
class ChatMessage(BaseModel):
user: str
text: str
timestamp: int
@app.channel("typed_chat")
async def typed_chat(message: ChatMessage = ChatMessage(user="system", text="Welcome", timestamp=0)):
return message
Error Handling
If a client tries to subscribe to a non-existent channel:
Send:
{"type": "subscribe", "channel": "nonexistent"}
Receive:
{
"type": "error",
"message": "Channel 'nonexistent' not found."
}
Broadcasting from Outside Server Context
One of the most powerful features of SocketAPI channels is the ability to call channel functions from outside the server context - even from different processes or threads. This makes it incredibly useful for integrating with other technologies that run in separate processes.
Use Cases
- Background tasks: Celery workers, RQ jobs, or other task queues
- Database triggers: PostgreSQL NOTIFY/LISTEN or other database events
- External services: Webhooks, message queues (RabbitMQ, Redis), or third-party APIs
- Scheduled jobs: Cron jobs or APScheduler tasks
- Separate processes: Any Python process that has access to your SocketAPI app instance
How It Works
Channel functions work seamlessly regardless of where they're called from. When called from outside the WebSocket request context, SocketAPI automatically handles broadcasting to all subscribed clients.
from socketapi import SocketAPI
app = SocketAPI()
@app.channel("broadcast_channel")
async def broadcast_channel(message: str):
return {"message": message}
# This works from anywhere - inside actions, background tasks, or external processes
async def external_process():
# Call from a completely different context
await broadcast_channel(message="Hello from external process!")
Custom Host and Port Configuration
If the server is running on a different host than localhost or a different port than 8000, you need to provide these details when creating the SocketAPI object so that other processes know the server address and can communicate with it:
from socketapi import SocketAPI
# Server running on custom host and/or port
app = SocketAPI(host="192.168.1.100", port=9000)
# Now external processes can broadcast to this server
If the server is running on the default host (localhost) and port (8000), no additional configuration is needed.
Important Notes
- Channel functions are thread-safe and process-safe
- No special configuration needed - it just works!
- All subscribed clients receive broadcasts regardless of where the function is called
- Parameters are validated the same way as when called from actions
Broadcasting from a Different Machine
By default, the broadcast endpoint only accepts connections from localhost (127.0.0.1, ::1, localhost) for security reasons. If you need to call channel functions from a different machine or server, you must explicitly configure broadcast_allowed_hosts when creating the SocketAPI instance:
app = SocketAPI(
host="0.0.0.0",
port=8000,
broadcast_allowed_hosts=("127.0.0.1", "::1", "localhost", "192.168.1.50")
)
Add the IP address of the machine that needs to communicate with the server to the broadcast_allowed_hosts tuple.
This feature makes SocketAPI perfect for building real-time applications that need to integrate with existing infrastructure and external services.
Complete Example
from typing import Annotated
from socketapi import RequiredOnSubscribe, SocketAPI
app = SocketAPI()
@app.channel("notifications")
async def notifications(user_id: Annotated[int, RequiredOnSubscribe], notification_type: str = "all"):
# Fetch notifications for the user
return {
"user_id": user_id,
"type": notification_type,
"notifications": []
}
@app.action("send_notification")
async def send_notification(user_id: int, message: str):
# Broadcast notification to specific user's subscribers
await notifications(user_id=user_id, notification_type="alert")
Client workflow:
-
Subscribe with required parameters:
{"type": "subscribe", "channel": "notifications", "data": {"user_id": 123}} -
Receive confirmation (no initial data by default):
{"type": "subscribed", "channel": "notifications"} -
Receive broadcasts when
send_notificationis called:{"type": "data", "channel": "notifications", "data": {"user_id": 123, "type": "alert", "notifications": []}} -
Unsubscribe when done:
{"type": "unsubscribe", "channel": "notifications"}