Skip to content

Channels

Channels are the core abstraction in beryl. A channel maps a topic pattern to a set of typed callback functions that handle joins, messages, and cleanup.

Topics are colon-delimited string identifiers. Patterns can be exact matches, legacy trailing prefix wildcards, or segment-aware wildcards:

import beryl/topic
// Exact: only matches "room:lobby"
topic.parse_pattern("room:lobby") // -> Exact("room:lobby")
// Wildcard: matches "room:lobby", "room:123", etc.
topic.parse_pattern("room:*") // -> Wildcard("room:")
// Segment wildcard: matches one complete segment per "*"
topic.parse_pattern("document:*:ops")
// -> SegmentWildcard(["document", "*", "ops"])
// Multi-segment wildcard: extract tenant and document IDs
topic.parse_pattern("document:*:*")
// -> SegmentWildcard(["document", "*", "*"])
// Single trailing "*" keeps prefix wildcard behavior
topic.parse_pattern("document:tenant-a:*")
// -> Wildcard("document:tenant-a:")
// Extract the dynamic part.
// The constructor is written as topic.Wildcard because only the module
// `beryl/topic` is imported, not its constructors.
topic.extract_id(topic.Wildcard("room:"), "room:lobby") // -> Ok("lobby")
// Extract multiple dynamic segments
topic.extract_wildcards(
  topic.parse_pattern("document:*:*"),
  "document:tenant-a:doc-42",
)
// -> Ok(["tenant-a", "doc-42"])
// Parse topic segments
topic.segments("room:lobby") // -> ["room", "lobby"]
topic.namespace("room:lobby") // -> Ok("room")

Use document:tenant-a:* to route all documents for one tenant while keeping the existing trailing-wildcard prefix semantics. Use document:*:* when a handler needs to extract both tenant and document IDs from a topic with the exact shape document:{tenant_id}:{document_id}:

// Parse the two-wildcard pattern, then match on the extraction result.
let pattern = topic.parse_pattern("document:*:*")
case topic.extract_wildcards(pattern, "document:tenant-a:doc-42") {
// Success branch: bind the two extracted values by position.
Ok([tenant_id, document_id]) -> {
// tenant_id == "tenant-a"
// document_id == "doc-42"
}
// Any other result: an Error, or an Ok list that is not exactly two items long.
_ -> {
// Topic did not match the expected document shape.
}
}

Channels are built using a builder pattern starting with channel.new():

import beryl/channel.{type Channel, type HandleResult, type JoinResult}
import beryl/socket.{type Socket}
import gleam/json.{type Json}
import gleam/option.{type Option, None, Some}
/// Typed assigns — compile-time checked socket state
///
/// Carries the two string values this guide's room channel keeps on the
/// socket: the user's ID and the room's ID.
pub type RoomAssigns {
RoomAssigns(user_id: String, room_id: String)
}
/// Builds the room channel.
///
/// `join` is passed to `channel.new`; the text, binary and terminate
/// callbacks are then attached one at a time with the `with_*` builders.
pub fn new() -> Channel(RoomAssigns) {
channel.new(join)
|> channel.with_handle_in(handle_in)
|> channel.with_handle_binary(handle_binary)
|> channel.with_terminate(terminate)
}

Called when a client sends a phx_join message. Return JoinOk to accept or JoinError to reject:

/// Join callback for "room:*" topics.
///
/// Reads the room ID from the topic, stores fresh `RoomAssigns` on the
/// socket, and accepts the join with a `{"status": "joined"}` reply.
/// The `let assert` crashes if `extract_id` does not return `Ok`.
fn join(
  topic: String,
  payload: Json,
  socket: Socket(RoomAssigns),
) -> JoinResult(RoomAssigns) {
  // Take the dynamic part of the topic that follows the "room:" prefix.
  let assert Ok(room_id) = topic.extract_id(topic.Wildcard("room:"), topic)

  // Attach the typed assigns to the socket.
  let joined_socket =
    socket.set_assigns(
      socket,
      RoomAssigns(user_id: "user_123", room_id: room_id),
    )

  // Accept the join; the reply payload is optional.
  let reply_payload = json.object([#("status", json.string("joined"))])
  channel.JoinOk(reply: Some(reply_payload), socket: joined_socket)
}

Called for each incoming text message. The event string identifies the message type:

/// Dispatches an incoming text message by its event name.
///
/// - "new_message": replies to the sender with the received payload.
/// - "update_status": pushes a "status_changed" message.
/// - "typing" and every other event: no reply.
fn handle_in(
  event: String,
  payload: Json,
  socket: Socket(RoomAssigns),
) -> HandleResult(RoomAssigns) {
  case event {
    // Reply to the sender — the event argument is ignored and a phx_reply
    // is always sent.
    "new_message" -> channel.Reply("ok", payload, socket)

    // Push a server-initiated message.
    "update_status" ->
      channel.Push(
        "status_changed",
        json.object([#("updated", json.bool(True))]),
        socket,
      )

    // Typing notifications need no reply.
    "typing" -> channel.NoReply(socket)

    // Unknown events are ignored.
    _ -> channel.NoReply(socket)
  }
}

Channel handlers return one of these results:

| Result | Description |
| --- | --- |
| NoReply(socket) | Continue without sending anything |
| Reply(event, payload, socket) | Send a phx_reply tied to the client message ref (only meaningful from handle_in; see note below) |
| Push(event, payload, socket) | Send a server-initiated message with no ref |
| Stop(reason) | Terminate the channel |

Handle raw binary WebSocket frames (bypasses the wire protocol):

/// Callback for raw binary WebSocket frames.
///
/// This example does nothing with `data` and returns the socket unchanged
/// without sending anything.
fn handle_binary(
  data: BitArray,
  socket: Socket(RoomAssigns),
) -> HandleResult(RoomAssigns) {
  // Binary processing (e.g., file uploads, audio chunks) would go here.
  channel.NoReply(socket)
}

Called when a client leaves or disconnects. Use for cleanup:

/// Cleanup callback, run when a client leaves or disconnects.
///
/// Every stop reason is matched explicitly so each case has a place for its
/// own cleanup; in this example all of them simply return `Nil`.
fn terminate(
  reason: channel.StopReason,
  socket: Socket(RoomAssigns),
) -> Nil {
  case reason {
    channel.Normal -> Nil // Clean disconnect
    channel.Shutdown -> Nil // Server-initiated
    channel.HeartbeatTimeout -> Nil // Client went silent
    // The error message is not used here, so it is bound to a discard name
    // to avoid an unused-variable warning.
    channel.Error(_msg) -> Nil // Something went wrong
  }
}

Called when an OTP process sends a message directly to this channel context via beryl.send_info. Use this to push server-driven updates (e.g., database change notifications, timer ticks, background job results):

/// Minimal handle_info callback.
///
/// This version does not inspect `message`; it always pushes a fixed
/// "server_update" event to the client.
fn handle_info(
  message: Dynamic,
  socket: Socket(RoomAssigns),
) -> HandleResult(RoomAssigns) {
  channel.Push(
    "server_update",
    json.object([#("data", json.string("server update"))]),
    socket,
  )
}
// Register the handler when building the channel
// with_handle_info is chained onto the builder the same way as with_handle_in.
channel.new(join)
|> channel.with_handle_in(handle_in)
|> channel.with_handle_info(handle_info)

The message argument is Dynamic. Use gleam/dynamic/decode to extract typed values:

import gleam/dynamic.{type Dynamic}
import gleam/dynamic/decode
/// Messages a server process can deliver to the channel.
type ServerMessage {
  /// A timer tick carrying an integer value.
  Tick(at: Int)
  /// A text notification.
  Notify(text: String)
}

/// Decodes a dynamic value into a `ServerMessage`.
///
/// The value must have a string "type" field: "tick" additionally requires
/// an integer "at" field, and "notify" a string "text" field. Any other tag
/// fails with the expected type name "ServerMessage".
fn decode_server_message(
  d: Dynamic,
) -> Result(ServerMessage, List(decode.DecodeError)) {
  // decode.field takes a continuation as its third argument, so it is used
  // with `use` and the final value is wrapped with decode.success.
  let decoder = {
    use tag <- decode.field("type", decode.string)
    case tag {
      "tick" -> {
        use at <- decode.field("at", decode.int)
        decode.success(Tick(at))
      }
      "notify" -> {
        use text <- decode.field("text", decode.string)
        decode.success(Notify(text))
      }
      // decode.failure needs a placeholder value of the target type.
      _ -> decode.failure(Tick(0), "ServerMessage")
    }
  }
  decode.run(d, decoder)
}
/// handle_info callback that decodes the server message before pushing.
///
/// A `Tick` becomes a "tick" push carrying `at`; a `Notify` becomes a
/// "notification" push carrying `text`. Messages that fail to decode are
/// dropped without sending anything.
fn handle_info(
  message: Dynamic,
  socket: Socket(RoomAssigns),
) -> HandleResult(RoomAssigns) {
  case decode_server_message(message) {
    Error(_) -> channel.NoReply(socket)
    Ok(server_message) -> {
      // Pick the event name and payload, then push once.
      let #(event_name, body) = case server_message {
        Tick(at) -> #("tick", json.object([#("at", json.int(at))]))
        Notify(text) -> #(
          "notification",
          json.object([#("text", json.string(text))]),
        )
      }
      channel.Push(event_name, body, socket)
    }
  }
}

Because server messages have no client ref, Reply returned from handle_info is sent as a push. Use Push here to make intent explicit.

Use beryl.send_info from any process to deliver a message to a specific socket/topic pair:

// In a background process or timer callback:
// Arguments in order: channels, the target socket ID, the topic, the message.
// NOTE(review): the handle_info decoder in this guide reads "type"/"at"/"text"
// fields from the message; confirm that a custom-type value such as
// Notify("hello!") arrives in a shape that decoder accepts.
beryl.send_info(channels, socket_id, "room:lobby", Notify("hello!"))

If the socket is not connected, the topic is not joined, or no handle_info is registered, the message is silently ignored.

A common use case is scheduling periodic pushes to a specific client. Spawn a process when the client joins and cancel it in terminate:

import gleam/erlang/process
/// Join callback that starts a per-client ticker process, then accepts the
/// join with no reply payload.
///
/// NOTE(review): `channels` is neither a parameter nor a local here — it must
/// already be in scope; confirm how it is supplied.
fn join(topic, _payload, socket) -> JoinResult(RoomAssigns) {
// Read the socket ID so the spawned process can address this client.
let socket_id = socket.id(socket)
// Spawn a timer process that sends a tick every 5 seconds
// The PID is bound to `_pid` and discarded, so this snippet keeps no handle
// for stopping the process later.
let _pid = process.spawn(fn() {
// The subject is only handed to timer_loop as its fourth argument.
let rec = process.new_subject()
timer_loop(channels, socket_id, topic, rec)
})
channel.JoinOk(reply: None, socket: socket)
}
/// Sends a `Tick` to the given socket/topic pair every 5 seconds, forever.
///
/// The fourth parameter is carried through each iteration but never read.
/// It has a regular name rather than a `_`-prefixed discard name because a
/// discard name cannot be passed as a value in the recursive call.
fn timer_loop(channels, socket_id, topic, subject) {
  process.sleep(5000)
  // NOTE(review): `erlang` is not imported in this snippet — confirm which
  // module provides system_time and Millisecond.
  beryl.send_info(
    channels,
    socket_id,
    topic,
    Tick(erlang.system_time(erlang.Millisecond)),
  )
  // Tail call: loop again with the same arguments.
  timer_loop(channels, socket_id, topic, subject)
}

For production use, prefer OTP-based timers (e.g., Erlang's timer:send_interval/2) over a bare recursive sleep loop, and track the timer PID in assigns so you can cancel it in terminate (the example above discards it as _pid).

Register channels with the beryl system using topic patterns:

import beryl
// Start beryl with its default configuration. `let assert` crashes here if
// start does not return Ok.
let assert Ok(channels) = beryl.start(beryl.default_config())
// Register handlers for different topic patterns
// "room:*" and "user:*" end in a trailing wildcard; "system" is an exact topic.
let assert Ok(Nil) = beryl.register(channels, "room:*", room_channel.new())
let assert Ok(Nil) = beryl.register(channels, "user:*", user_channel.new())
let assert Ok(Nil) = beryl.register(channels, "system", system_channel.new())

Send messages to all subscribers of a topic:

// Broadcast to everyone on a topic
// Arguments in order: channels, topic, event name, JSON payload.
beryl.broadcast(
channels,
"room:lobby",
"new_message",
json.object([#("text", json.string("Hello!"))]),
)
// Broadcast to everyone except one socket
// Takes one extra argument, socket_id, in second position — presumably the
// socket to exclude; confirm against the beryl reference.
beryl.broadcast_from(
channels,
socket_id,
"room:lobby",
"user_typing",
json.object([#("user", json.string("alice"))]),
)

Sockets carry typed assigns that persist across messages:

import beryl/socket
// Get current assigns
let assigns = socket.get_assigns(socket)
// Update assigns (returns new socket)
// The record-update syntax copies `assigns` and replaces only room_id.
let socket = socket.set_assigns(socket, RoomAssigns(..assigns, room_id: "new"))
// Transform assigns to a different type
// NewType is not defined in this snippet; it stands for the target assigns type.
let socket = socket.map_assigns(socket, fn(old) {
NewType(user_id: old.user_id)
})
  • Reference — module map, wire protocol details, and the broadcast/push cheatsheet
  • Presence guide — track who is online and broadcast presence diffs to clients
  • Groups guide — broadcast a single event to multiple topics at once
  • PubSub guide — distributed messaging for multi-node deployments
  • Error Handling guide — rejected joins, rate limits, and client-visible error shapes