PubSub
beryl's PubSub layer provides distributed publish/subscribe messaging built on Erlang's pg (process groups) module.
Starting PubSub
    import beryl/pubsub

    // Default scope ("beryl_pubsub")
    let assert Ok(ps) = pubsub.start(pubsub.default_config())

    // Custom scope (isolates process groups)
    let assert Ok(ps) = pubsub.start(pubsub.config_with_scope("my_app_pubsub"))

The scope maps to a pg scope atom. Different scopes are completely isolated from each other.
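The isolation between scopes can be illustrated with a small sketch. It uses only calls that appear elsewhere on this page (`pubsub.start`, `pubsub.config_with_scope`, `pubsub.subscribe`, `pubsub.subscriber_count`); the scope names and the `scope_isolation_demo` function are hypothetical, and the sketch assumes a subscription is visible to `subscriber_count` as soon as `subscribe` returns.

```gleam
import beryl/pubsub

// Hypothetical demo function; the scope names are illustrative only.
pub fn scope_isolation_demo() {
  // Two independent PubSub instances, each on its own pg scope.
  let assert Ok(chat) = pubsub.start(pubsub.config_with_scope("chat_pubsub"))
  let assert Ok(metrics) =
    pubsub.start(pubsub.config_with_scope("metrics_pubsub"))

  // Subscribe the current process on the chat scope only.
  pubsub.subscribe(chat, "room:lobby")

  // Same topic name, different scopes: given the isolation described
  // above, the metrics scope should not report the chat subscription.
  let chat_count = pubsub.subscriber_count(chat, "room:lobby")
  let metrics_count = pubsub.subscriber_count(metrics, "room:lobby")

  #(chat_count, metrics_count)
}
```

Comparing the two counts is a quick way to confirm that a topic such as "room:lobby" in one scope does not collide with the same topic name in another.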
Subscribing
The calling process receives pubsub.Message values when broadcasts are sent to the topic:

    // Subscribe the current process
    pubsub.subscribe(ps, "room:lobby")

    // Unsubscribe
    pubsub.unsubscribe(ps, "room:lobby")

Messages
Subscribers receive Message records:

    pub type Message {
      Message(
        topic: String,
        event: String,
        payload: json.Json,
        from: PubSubFrom,
      )
    }

    pub type PubSubFrom {
      System                   // Broadcast with no sender
      FromPid(Pid)             // Broadcast from a specific process
      FromSocket(Pid, String)  // Broadcast from a process, excluding a socket ID
    }

FromSocket carries both the sending process PID and a socket ID to exclude. Receiving coordinators use this to suppress delivery to the named socket, so that beryl.broadcast_from correctly excludes the sender across cluster nodes.
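The suppression rule described above can be sketched as a small pattern match. This is not beryl's actual coordinator code: the `should_deliver` helper is hypothetical, and the `PubSubFrom` type is redeclared locally (mirroring the definition above) only so that the example compiles on its own. It also assumes that `System` and `FromPid` broadcasts are never filtered at the socket level.

```gleam
import gleam/erlang/process.{type Pid}
import gleam/io

// Local mirror of the PubSubFrom type documented above, redeclared so
// this sketch is self-contained. Real code would use pubsub.PubSubFrom.
pub type PubSubFrom {
  System
  FromPid(Pid)
  FromSocket(Pid, String)
}

// Hypothetical helper: should a broadcast with this origin be pushed to
// the local socket identified by `socket_id`?
pub fn should_deliver(from: PubSubFrom, socket_id: String) -> Bool {
  case from {
    // Assumption: no socket-level filtering for these two origins.
    System | FromPid(_) -> True
    // Skip only the socket that the sender asked to exclude.
    FromSocket(_, excluded) -> excluded != socket_id
  }
}

pub fn main() {
  let from = FromSocket(process.self(), "socket-1")
  case should_deliver(from, "socket-1") {
    True -> io.println("deliver")
    False -> io.println("skip")
  }
}
```

Because the excluded socket ID travels inside the message itself, every receiving coordinator can apply the same check locally, regardless of which node the sender is on.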
Broadcasting
    import gleam/erlang/process
    import gleam/json

    // Broadcast to all subscribers (all nodes)
    pubsub.broadcast(ps, "room:lobby", "new_message", json.string("hello"))

    // Broadcast to all except the sender process
    pubsub.broadcast_from(
      ps,
      process.self(),
      "room:lobby",
      "new_message",
      json.string("hello"),
    )

    // Broadcast to all except a specific socket ID
    // (clustered "broadcast except this socket")
    pubsub.broadcast_from_socket(
      ps,
      process.self(),  // sending coordinator process
      socket_id,       // socket ID to exclude on receiving coordinators
      "room:lobby",
      "new_message",
      json.string("hello"),
    )

    // Broadcast to local node only
    pubsub.local_broadcast(ps, "room:lobby", "new_message", json.string("hello"))

Use broadcast_from_socket when you need to broadcast to all subscribers across a cluster while excluding one specific socket, even if that socket's coordinator is on a different node. beryl.broadcast_from calls this internally.
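As a usage sketch, the wrapper below relays a chat message to everyone in a room except the socket that sent it. The `relay_chat_message` function, its parameters, the "room:" topic prefix, and the payload fields are all hypothetical. The `pubsub.broadcast_from_socket` call follows the argument order shown above, and the sketch assumes the calling process is the sending coordinator.

```gleam
import beryl/pubsub
import gleam/erlang/process
import gleam/json

// Hypothetical helper, not part of beryl: relay a chat message to all
// subscribers of a room except the originating socket.
pub fn relay_chat_message(
  ps,
  sender_socket_id: String,
  room: String,
  body: String,
) {
  // Build a structured payload rather than a bare string.
  let payload =
    json.object([
      #("body", json.string(body)),
      #("sent_by", json.string(sender_socket_id)),
    ])

  // Assumes the current process is the sending coordinator.
  pubsub.broadcast_from_socket(
    ps,
    process.self(),
    sender_socket_id,
    "room:" <> room,
    "new_message",
    payload,
  )
}
```

Wrapping the call this way keeps the topic naming and payload shape in one place, so callers only supply the socket ID, room, and message body.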
Querying subscribers
    // All subscribers across all nodes
    let pids = pubsub.subscribers(ps, "room:lobby")

    // Count subscribers
    let count = pubsub.subscriber_count(ps, "room:lobby")

Distributed operation
Because PubSub is built on pg, it automatically works across connected Erlang nodes. When nodes join a cluster, their process groups are merged and messages are delivered to subscribers on all nodes, with no configuration required.
Integration with beryl channels
The channel system uses PubSub internally for distributed broadcasts when configured:

    import beryl
    import beryl/pubsub

    let assert Ok(ps) = pubsub.start(pubsub.default_config())
    let config = beryl.default_config() |> beryl.with_pubsub(ps)
    let assert Ok(channels) = beryl.start(config)

    // beryl.broadcast() now sends to all nodes automatically
    beryl.broadcast(channels, "room:lobby", "event", payload)

Next steps
- Supervision guide: supervised startup and multi-node deployment checklist
- Architecture overview: how PubSub fits into the beryl layer diagram
- Troubleshooting: diagnosing cluster broadcast failures and diverging presence state