Writing Custom AnyCable Server
You can write your own server to handle cable clients and connect them to your business logic through AnyCable.
Saying "cable clients" we want to underline that AnyCable doesn't depend on any transport protocol (e.g., WebSockets), you can use any protocol for client-server communication (e.g., RTMP, custom TCP, long-polling, etc.).
Requirements
The server should be able to:
- Communicate with gRPC server as a gRPC client
gRPC provides libraries for most popular languages (see docs).
If there is no gRPC support for your favorite language, you can build it yourself (the minimal implementation for AnyCable)–it's just HTTP2 + Protocol Buffers.
See erlgrpc for the example of a minimal gRPC client.
- Subscribe to Redis channels.
We use Redis to receive broadcast events from the application by default (see Broadcast adapters).
NOTE: You can build a custom broadcast adapter (for both–your server and anycable
gem).
For the rest of this article, we consider that we want to use Redis.
Step-by-step
Let's go through all steps to implement a custom server (using abstract language).
Step 1. Server
First of all, you need a _server_–the entry point for clients connections – which can handle incoming data and disconnection events.
interface Server {
# Invoked on socket connection.
# socket_handle is an entity (object/record/whatever) representing connection socket
func socket_conn(socket_handle);
# Invoked on socket disconnection
func socket_disconn(socket_handle);
# Invoker on incoming message
func socket_data(socket_handle, msg);
}
Step 2. Hub
Hub stores information about clients subscriptions and has the following interface:
interface Hub {
# Subscribe socket to the stream.
# We also need a channel_id to sign messages with it (see below)
func add(socket_handle, channel_id, stream);
# Unsubscribe socket from a stream for the given channel
func remove(socket_handle, channel_id, stream);
# Unsubscribe socket from all streams for the given channel
func removeAll(socket_handle, channel_id);
# Broadcast a message to all subscribed sockets
func broadcast(stream, msg);
}
Why do we need a channel_id
? This is required by Action Cable client.
The JS client doesn't know about streams, only about channels. So it needs a channel identifier to be present in incoming messages to resolve channels.
Moreover, there are no uniqueness restrictions on streams names–the same stream name can be used for different channels.
Thus, our broadcast
function may look like this:
func broadcast(stream, msg) {
# Assume that we have a nested structure to store subscriptions:
# sockets2streams
# |
# stream1
# | |
# | channel1 - (socket1, ..., socketN)
# | |
# | channel2 – ( ... )
# |
# stream2 ...
#
for (channel in channels_for_stream(stream)) {
channel_msg = msg_for_channel(msg, channel.id())
for (socket in channel.sockets()) {
socket.transmit(channel_msg)
}
}
}
# msg – JSON encoded string
# We should transform into another JSON "{"identifier":<identifier>,"message": <msg>}"
func msg_for_channel(msg, identifier) {
return json_encode(['identifier', 'message'], [identifier, json_decode(msg)]);
}
Step 3. Pinger
Action Cable clients assume that a server sends a special message–ping–every 3 seconds (configurable). Thus we should implement a pinger.
Pinger is a simple entity that holds a list of active sockets and broadcast a message to them every X seconds.
interface Pinger {
# Add socket to the active list
func register(socket_handle);
# Remove socket from the active list
func unregister(socket_handle);
}
And we need a kind of loop
method:
func loop() {
while(true) {
var msg = ping_message()
for (socket in active_sockets) {
socket.transmit(msg)
}
sleep(INTERVAL)
}
}
func ping_message() {
return json_encode(['type', 'message'], ['ping', time.utc()])
}
NOTE: ping could be implemented in a different way (e.g. via a timer attached to a client session).
Step 4. gRPC Client
Then you have to build a gRPC client using a Protobuf service definition.
It has a simple interface with only three methods: Connect
, Disconnect
and Command
.
Let's go to Step 5 to see, how to use these methods and their return values.
Step 5. Server – RPC communication
Now, when we already have a server and RPC client, let's fit them together.
NOTE: see also Action Cable protocol spec.
Client Connection
Every time a client is connected to our server we should invoke Connect
method to authorize connection:
func socket_conn(socket_handle) {
# We need request URL and cookies (if we want to use cookie-based authentication)
var url = socket_handle.url()
# Extract Cookie header and build a map { 'Cookie' => cookie_val }
# NOTE: you MAY provide more headers if you want
var headers = header('Cookie', socket_handle.header('Cookie'));
# Keep header for subsequent calls
socket_handle.setFilteredHeaders(headers);
# Then generate a payload (build protobuf msg)
# ConnectionRequest contains fields:
# env:
# url - string - request URL
# headers - map<string><string>
var env = pb::SessionEnv(url, headers)
var payload = pb::ConnectionRequest(env)
# Make a call and get a response – ConnectionResponse:
# status – Status::SUCCESS | Status::ERROR – status enum is a part of rpc.proto
# identifiers – string (connection identifiers string used by the app)
# transmissions - list of strings (repeated string)
var response = rpc::Connect(payload)
# handle response
if (response.status() == pb::Status::SUCCESS) {
# store identifiers for the socket
# we will use them in later calls
socket_handle.setIdentifiers(response.identifiers())
# update a client's connection state
socket_handle.setState(response.env().cstate())
# transmit messages to socket
# NOTE: typically Connect returns only "welcome" message
socket_handle.transmit(response.transmissions())
# register socket to pinger
pinger.register(socket_handle)
} else {
# if Status is not SUCCESS we should disconnect the socket
socket_handle.close()
# non-SUCCESS status could be:
# - ERROR - there was an en exception during the call; in this case we also have response.error_msg()
# - FAILURE - application-level "rejection" (e.g. authentication failed)
}
}
Client Commands
Command is an incoming message from the client. We should distinguish "subscribe" and "unsubscribe" command from others, 'cause they're responsible for subscriptions.
func socket_data(socket_handle, msg) {
var decoded = json_decode(msg)
var type = decoded.key("type")
# Every command is associated with the specified channel
var identifier = decoded.key("identifier")
var data = decoded.key("data")
# Generate a payload (build protobuf msg)
# CommandMessage contains fields:
# command - string
# identifier - string (channel identifier)
# connection_identifiers - string (identifiers from Connect call)
# data – string (additional provided data)
# env:
# url - string - request URL
# headers - map<string><string>
# cstate - map<string><string> — connection state obtained in socket_conn
# istate - map<string><string> — channel state for the identifier
var env = pb::SessionEnv(socket_handle.url(), socket.filtered_headers(), socket.state(), socket.channel_state(identifier))
var payload = pb::CommandMessage(type, identifier, socket_handle.identifiers(), data, env)
# Make a call and get a response – ConnectionResponse:
# status – Status::SUCCESS | Status::FAILURE | Status::ERROR– status enum is a part of rpc.proto
# disconnect – bool – whether to disconnect the client or not
# stop_streams – bool – whether to stop all existing subscriptions for the channel
# streams – list of strings – new subscriptions
# stopped_streams - list of strings —
# transmissions - list of strings – messages to send to the client
# error_msg – error message in case of ERROR
# env:
# cstate - map<string><string> — connection state changed/new fields
# istate — map<string><string> — channel state changed/new fields
var response = rpc::Command(payload)
# handle response
if (response.status() == pb::Status::SUCCESS) {
# First, handle subscription commands
# We should track client subscriptions in order to call `#unsubscribe` callbacks on disconnection
if (type == "subscribe") {
socket_handle.addSubscripton(identifier)
}
if (type == "unsubscribe") {
socket_handle.removeSubscription(identifier)
}
# Then handle other response information
# If response contains disconnect flag set to true
# The we immediately disconnect the client
if (response.disconnect()) {
return socket_handle.close()
}
# update connection state
if (response.env().cstate()) {
socket_handle.mergeState(response.env().cstate())
}
# update channel state
if (response.env().istate()) {
# socket_handle.channel_state has a form of map<string><map<string><string>>,
# where first-level keys are subscription identifiers, and values are the
# corresponding channels states
socket_handle.mergeChannelState(identifier, response.env().istate())
}
if (response.stop_streams()) {
# Stop all subscriptions for the channel
hub.removeAll(socket_handle, identifier)
}
# Add new subscriptions
for (stream in response.streams()) {
hub.add(socket_handle, identifier, stream)
}
# Remove old subscriptions
for (stream in response.stopped_streams()) {
hub.add(socket_handle, identifier, stream)
}
# And, finally, transmit messages
socket_handle.transmit(response.transmissions())
} else {
# in case of failure you may want to disconnect the client
socket_handle.close()
}
}
Client Disconnection
When a client disconnects, we should remove its subscriptions, de-register from pinger and invoke #disconnect
/#unsubscribe
callbacks in the app.
func socket_disconn(socket_handle) {
# De-register socket from pinger
pinger.unregister(socket_handle)
# Remove subscriptions
var subscriptions = socket_handle.subscriptions()
for (channel in subscriptions) {
hub.removeAll(socket_handle, channel)
}
# And only after that notify the app thru RPC
# Then generate a payload (build protobuf msg)
# DisconnectRequest contains fields:
# identifiers – string – connection identifiers
# subscriptions – list of strings – connections channels
# env:
# url – string – request URL
# headers - map<string><string>
# cstate - map<string><string> — connection state
# istate - map<string><string> — channel states for all subscriptions
# We need to encode channel states to strings to pass them as istate (which is a string-string map)
var channel_states = socket.channel_state().transform_values( (v) => JSON.encode(v) )
var env = pb::SessionEnv(socket_handle.url(), socket.filtered_headers(), socket.state(), channel_states)
var payload = pb::DisconnectRequest(socket_handle.identifier(), subscriptions, env)
# Make a call and get a response – DisconnectResponse:
# status – Status::SUCCESS | Status::ERROR – status enum is a part of rpc.proto
# Actually, response status does not matter here, we should cleanup
rpc::Disconnect(payload)
}
NOTE: It makes sense to call Disconnect
asynchronously or using a queue in order to avoid RPC calls spikes caused by mass-disconnection.
Step 6. Testing
You can use AnyT–AnyCable conformance testing tool–for integration tests.