Channels
Channels are the core abstraction in beryl. A channel maps a topic pattern to a set of typed callback functions that handle joins, messages, and cleanup.
Topics and patterns
Topics are colon-delimited string identifiers. Patterns can be exact matches, legacy trailing prefix wildcards, or segment-aware wildcards:
import beryl/topic

// Exact: only matches "room:lobby"
topic.parse_pattern("room:lobby") // -> Exact("room:lobby")

// Wildcard: matches "room:lobby", "room:123", etc.
topic.parse_pattern("room:*") // -> Wildcard("room:")

// Segment wildcard: matches one complete segment per "*"
topic.parse_pattern("document:*:ops")
// -> SegmentWildcard(["document", "*", "ops"])

// Multi-segment wildcard: extract tenant and document IDs
topic.parse_pattern("document:*:*")
// -> SegmentWildcard(["document", "*", "*"])

// Single trailing "*" keeps prefix wildcard behavior
topic.parse_pattern("document:tenant-a:*")
// -> Wildcard("document:tenant-a:")

// Extract the dynamic part
topic.extract_id(topic.Wildcard("room:"), "room:lobby") // -> Ok("lobby")

// Extract multiple dynamic segments
topic.extract_wildcards(
  topic.parse_pattern("document:*:*"),
  "document:tenant-a:doc-42",
)
// -> Ok(["tenant-a", "doc-42"])

// Parse topic segments
topic.segments("room:lobby") // -> ["room", "lobby"]
topic.namespace("room:lobby") // -> Ok("room")

Use document:tenant-a:* to route all documents for one tenant while keeping the existing trailing-wildcard prefix semantics. Use document:*:* when a handler needs to extract both tenant and document IDs from a topic with the exact shape document:{tenant_id}:{document_id}:
let pattern = topic.parse_pattern("document:*:*")
case topic.extract_wildcards(pattern, "document:tenant-a:doc-42") {
  Ok([tenant_id, document_id]) -> {
    // tenant_id == "tenant-a"
    // document_id == "doc-42"
    Nil
  }
  _ -> {
    // Topic did not match the expected document shape.
    Nil
  }
}
Defining a channel
Channels are built using a builder pattern starting with channel.new():
import beryl/channel.{type Channel, type HandleResult, type JoinResult}
import beryl/socket.{type Socket}
import beryl/topic
import gleam/json.{type Json}
import gleam/option.{type Option, None, Some}

/// Typed assigns: compile-time checked socket state
pub type RoomAssigns {
  RoomAssigns(user_id: String, room_id: String)
}

pub fn new() -> Channel(RoomAssigns) {
  channel.new(join)
  |> channel.with_handle_in(handle_in)
  |> channel.with_handle_binary(handle_binary)
  |> channel.with_terminate(terminate)
}
Join callback
Called when a client sends a phx_join message. Return JoinOk to accept or JoinError to reject:
fn join(
  topic: String,
  payload: Json,
  socket: Socket(RoomAssigns),
) -> JoinResult(RoomAssigns) {
  // Extract room ID from topic pattern
  let assert Ok(room_id) = topic.extract_id(topic.Wildcard("room:"), topic)

  let assigns = RoomAssigns(user_id: "user_123", room_id: room_id)
  let socket = socket.set_assigns(socket, assigns)

  // Optionally send a reply payload
  let reply = json.object([#("status", json.string("joined"))])
  channel.JoinOk(reply: Some(reply), socket: socket)
}
Message handler
Called for each incoming text message. The event string identifies the message type:
fn handle_in(
  event: String,
  payload: Json,
  socket: Socket(RoomAssigns),
) -> HandleResult(RoomAssigns) {
  case event {
    "new_message" -> {
      // Reply to the sender; the event arg is ignored and phx_reply is always sent
      channel.Reply("ok", payload, socket)
    }
    "typing" -> {
      // No reply needed
      channel.NoReply(socket)
    }
    "update_status" -> {
      // Push a server-initiated message
      let response = json.object([#("updated", json.bool(True))])
      channel.Push("status_changed", response, socket)
    }
    _ -> channel.NoReply(socket)
  }
}
Handle results
Channel handlers return one of these results:
| Result | Description |
|---|---|
| NoReply(socket) | Continue without sending anything |
| Reply(event, payload, socket) | Send a phx_reply tied to the client message ref (only meaningful from handle_in; see note below) |
| Push(event, payload, socket) | Send a server-initiated message with no ref |
| Stop(reason) | Terminate the channel |
Binary handler
Handle raw binary WebSocket frames (bypasses the wire protocol):
fn handle_binary(
  data: BitArray,
  socket: Socket(RoomAssigns),
) -> HandleResult(RoomAssigns) {
  // Process binary data (e.g., file uploads, audio chunks)
  channel.NoReply(socket)
}
Terminate callback
Called when a client leaves or disconnects. Use it for cleanup:
fn terminate(
  reason: channel.StopReason,
  socket: Socket(RoomAssigns),
) -> Nil {
  case reason {
    channel.Normal -> Nil // Clean disconnect
    channel.Shutdown -> Nil // Server-initiated
    channel.HeartbeatTimeout -> Nil // Client went silent
    channel.Error(msg) -> Nil // Something went wrong
  }
}
Server-originated message handler
Called when an OTP process sends a message directly to this channel context via beryl.send_info. Use this to push server-driven updates (e.g., database change notifications, timer ticks, background job results):
import gleam/dynamic.{type Dynamic}

fn handle_info(
  message: Dynamic,
  socket: Socket(RoomAssigns),
) -> HandleResult(RoomAssigns) {
  // Decode the server message and push to the client
  let response = json.object([#("data", json.string("server update"))])
  channel.Push("server_update", response, socket)
}

// Register the handler when building the channel
channel.new(join)
|> channel.with_handle_in(handle_in)
|> channel.with_handle_info(handle_info)
The message argument is Dynamic. Use gleam/dynamic/decode to extract typed values:
import gleam/dynamic/decode
type ServerMessage {
  Tick(at: Int)
  Notify(text: String)
}

fn decode_server_message(
  d: Dynamic,
) -> Result(ServerMessage, List(decode.DecodeError)) {
  // decode.field takes a continuation, so it is used with `use` here
  let decoder = {
    use tag <- decode.field("type", decode.string)
    case tag {
      "tick" -> {
        use at <- decode.field("at", decode.int)
        decode.success(Tick(at))
      }
      "notify" -> {
        use text <- decode.field("text", decode.string)
        decode.success(Notify(text))
      }
      // Tick(0) is only the placeholder value that decode.failure requires
      _ -> decode.failure(Tick(0), "ServerMessage")
    }
  }
  decode.run(d, decoder)
}

fn handle_info(
  message: Dynamic,
  socket: Socket(RoomAssigns),
) -> HandleResult(RoomAssigns) {
  case decode_server_message(message) {
    Ok(Tick(at)) -> {
      channel.Push("tick", json.object([#("at", json.int(at))]), socket)
    }
    Ok(Notify(text)) -> {
      channel.Push(
        "notification",
        json.object([#("text", json.string(text))]),
        socket,
      )
    }
    Error(_) -> channel.NoReply(socket)
  }
}
Because server messages have no client ref, a Reply returned from handle_info is sent as a push. Use Push here to make intent explicit.
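To see which message shapes a tag-based decoder accepts, it can help to run it against hand-built values. The following is a minimal sketch, assuming a recent gleam_stdlib that provides dynamic.properties, dynamic.string, and dynamic.int; the tagged helper is hypothetical and exists only for this illustration. It does not show how beryl delivers messages, only how a decoder of this kind behaves on map-shaped input.

```gleam
import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode.{type Decoder}

pub type ServerMessage {
  Tick(at: Int)
  Notify(text: String)
}

/// Decoder for map-shaped messages tagged by a "type" field.
pub fn server_message_decoder() -> Decoder(ServerMessage) {
  use tag <- decode.field("type", decode.string)
  case tag {
    "tick" -> {
      use at <- decode.field("at", decode.int)
      decode.success(Tick(at))
    }
    "notify" -> {
      use text <- decode.field("text", decode.string)
      decode.success(Notify(text))
    }
    // Unknown tag: fail. Tick(0) is only the placeholder decode.failure needs.
    _ -> decode.failure(Tick(0), "ServerMessage")
  }
}

/// Hypothetical helper: build a Dynamic shaped like {"type": tag, key: value}.
pub fn tagged(tag: String, key: String, value: Dynamic) -> Dynamic {
  dynamic.properties([
    #(dynamic.string("type"), dynamic.string(tag)),
    #(dynamic.string(key), value),
  ])
}

pub fn main() {
  // A well-formed tick decodes to the Tick variant.
  let tick = tagged("tick", "at", dynamic.int(1000))
  let assert Ok(Tick(at: 1000)) = decode.run(tick, server_message_decoder())

  // An unrecognised tag is rejected instead of being mapped to a default.
  let unknown = tagged("resize", "at", dynamic.int(1))
  let assert Error(_) = decode.run(unknown, server_message_decoder())
  Nil
}
```

A decoder like this only matches values that carry a string "type" key, so anything with a different shape ends up in the Error branch and, in the handler above, results in NoReply.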
Sending messages with send_info
Use beryl.send_info from any process to deliver a message to a specific socket/topic pair:
// In a background process or timer callback:
beryl.send_info(channels, socket_id, "room:lobby", Notify("hello!"))
If the socket is not connected, the topic is not joined, or no handle_info is registered, the message is silently ignored.
Timer and background job patterns
A common use case is scheduling periodic pushes to a specific client. Spawn a process when the client joins and cancel it in terminate:
import gleam/erlang/process
fn join(topic, _payload, socket) -> JoinResult(RoomAssigns) {
  let socket_id = socket.id(socket)

  // Spawn a timer process that sends a tick every 5 seconds.
  // `channels` is the handle returned by beryl.start and must be in scope here.
  let _pid = process.spawn(fn() { timer_loop(channels, socket_id, topic) })

  channel.JoinOk(reply: None, socket: socket)
}

fn timer_loop(channels, socket_id, topic) {
  process.sleep(5000)
  // erlang.system_time requires `import gleam/erlang` as well
  beryl.send_info(
    channels,
    socket_id,
    topic,
    Tick(erlang.system_time(erlang.Millisecond)),
  )
  timer_loop(channels, socket_id, topic)
}
For production use, prefer OTP-based timers (e.g., Erlang's timer:send_interval) over bare recursion, and track the timer PID in assigns so you can cancel it in terminate.
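One way to make the timer cancellable is to wrap it in a pair of small helpers and keep the returned Pid. This is a sketch only, assuming gleam_erlang's process.spawn_unlinked, process.sleep, and process.kill; start_ticker and stop_ticker are hypothetical names, not part of beryl. The ticker is spawned unlinked so that killing it does not also take down the process that started it.

```gleam
import gleam/erlang/process.{type Pid}

/// Start an unlinked process that calls `on_tick` every `interval_ms`
/// milliseconds until it is stopped with `stop_ticker`.
pub fn start_ticker(on_tick: fn() -> Nil, interval_ms: Int) -> Pid {
  process.spawn_unlinked(fn() { ticker_loop(on_tick, interval_ms) })
}

// Sleep, run the callback, repeat. The recursion is a tail call, so the
// loop runs in constant stack space.
fn ticker_loop(on_tick: fn() -> Nil, interval_ms: Int) -> Nil {
  process.sleep(interval_ms)
  on_tick()
  ticker_loop(on_tick, interval_ms)
}

/// Stop a ticker previously returned by `start_ticker`.
pub fn stop_ticker(pid: Pid) -> Nil {
  process.kill(pid)
}

pub fn main() {
  let pid = start_ticker(fn() { Nil }, 10)
  let assert True = process.is_alive(pid)

  stop_ticker(pid)
  let assert False = process.is_alive(pid)
  Nil
}
```

In a channel, the on_tick callback would wrap the beryl.send_info call, the Pid would live in a field of the assigns record (for example an Option(Pid), which is an assumption about how you model your assigns), and terminate would call stop_ticker on it. Because the ticker is unlinked, it keeps running if terminate is never reached, so this trades automatic cleanup for safe cancellation.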
Registering channels
Register channels with the beryl system using topic patterns:
import beryl
let assert Ok(channels) = beryl.start(beryl.default_config())
// Register handlers for different topic patterns
let assert Ok(Nil) = beryl.register(channels, "room:*", room_channel.new())
let assert Ok(Nil) = beryl.register(channels, "user:*", user_channel.new())
let assert Ok(Nil) = beryl.register(channels, "system", system_channel.new())
Broadcasting
Send messages to all subscribers of a topic:
// Broadcast to everyone on a topic
beryl.broadcast(
  channels,
  "room:lobby",
  "new_message",
  json.object([#("text", json.string("Hello!"))]),
)

// Broadcast to everyone except one socket
beryl.broadcast_from(
  channels,
  socket_id,
  "room:lobby",
  "user_typing",
  json.object([#("user", json.string("alice"))]),
)
Socket state
Sockets carry typed assigns that persist across messages:
import beryl/socket
// Get current assigns
let assigns = socket.get_assigns(socket)

// Update assigns (returns new socket)
let socket = socket.set_assigns(socket, RoomAssigns(..assigns, room_id: "new"))

// Transform assigns to a different type
let socket = socket.map_assigns(socket, fn(old) {
  NewType(user_id: old.user_id)
})
Next steps
- Reference: module map, wire protocol details, and the broadcast/push cheatsheet
- Presence guide: track who is online and broadcast presence diffs to clients
- Groups guide: broadcast a single event to multiple topics at once
- PubSub guide: distributed messaging for multi-node deployments
- Error Handling guide: rejected joins, rate limits, and client-visible error shapes