In this series, we've seen how to create GraphQL APIs in Elixir using Absinthe. So far, we have only discussed a one-way communication channel where the client makes the queries or mutations, and the server responds.
GraphQL also supports a long-running subscription between the client and the server where the server can notify the client of events. This can be very useful in multi-user scenarios where many users might interact with the same resource at the same time. For example, in a blogging application, the client can subscribe to the server to receive new comments as they are posted instead of having to poll the server for them.
This is now a two-way communication and the regular HTTP transport is not well suited for it. Let’s see how subscriptions work and how to create them.
Creating Subscriptions in Absinthe for Elixir
Creating subscriptions is very similar to creating query or mutation fields.
We just need to create a field
inside the subscription do ... end
block of our schema.
Let’s create a subscription for observing new comments inside the schema:
# lib/my_app_web/schema.ex defmodule MyAppWeb.Schema do use Absinthe.Schema # ... subscription do field :comment_added, :comment do arg :post_id, non_null(:id) config fn %{post_id: id}, %{context: context} -> {:ok, topic: "post:#{id}"} end end end end
Let’s break this down.
We add a field named comment_added
inside the subscription block.
The return type of this field is comment
(defined using the object
macro in Absinthe — see An Introduction to Absinthe for details).
This field accepts an argument named post_id
which is required.
So far, everything is similar to what we would do when creating a field in a query
.
The config
macro is what is new here and it is used to configure the subscription field.
This is also where we can perform other checks like authorization to accept/reject the subscription.
The function passed to config receives the arguments passed to the field and the Absinthe.Resolution
struct that contains information like context.
The return value can be an {:ok, topic: topic}
tuple for success or {:error, reason}
tuple.
The topic
is what Absinthe uses to identify subscribers who should be notified of an event.
We will get to it in a few minutes.
But just for completeness, the topic
can be a single string or a list containing multiple topics to subscribe to.
Publishing Data to Subscriptions in GraphQL
We have a subscription added to our schema. But even after subscribing, we won’t get back any data because we haven’t triggered any updates yet.
Trigger from Mutations
The easiest way to trigger the subscription is to use the trigger/2
macro inside the subscription field.
It accepts the name of a mutation
to trigger the subscription and a topic
function.
Assuming that we have a mutation named create_comment
in our schema to create a comment, let’s update our subscription to trigger every time a comment is created.
defmodule MyAppWeb.Schema do use Absinthe.Schema # ... subscription do field :comment_added, :comment do # arg and config ... trigger :create_comment, topic: fn comment -> "post:#{comment.post_id}" end end end end
The function we pass as topic
to trigger
will be executed with the create_comment
mutation result.
It should return the name of the topic that this mutation will trigger.
Let’s see this in action. First, user A runs a subscription query like this:
subscription { commentAdded(postId: 1) { id body author { id name } } }
Absinthe records the subscription, but the user doesn’t receive any data straight away.
Now, user B creates a new comment:
mutation { createComment(comment: { postId: 1, body: "nice post!" }) { id } }
User B will receive the mutation response as usual based on the selections:
{ "data": { "createComment": { "id": 1 } } }
Additionally, user A will also receive a response on their subscription. Since they selected many more fields when subscribing, the response will include them all regardless of the selections in the mutation.
{ "data": { "commentAdded": { "id": 1, "body": "nice post!", "author": { "id": 1, "name": "User B" } } } }
Note that with the current setup, user B won't actually receive any updates yet since the subscription wasn't made using a WebSocket transport. We will see how to set that up later in this post.
Translate Mutation Result to Subscription Result
If your mutation returns a different data structure from what the subscription is supposed to return, it is also possible to use a resolve
inside the subscription to convert the result first.
For example, assume that instead of returning a comment
, the create_comment
mutation returns a complex object that also includes validation errors in case of failure.
In that case, we can provide a resolver for the comment_added
subscription field to fetch the comment before returning the result.
We will also need to modify our trigger to create the topic from the response struct.
defmodule MyAppWeb.Schema do use Absinthe.Schema object :comment_mutation_result do field :comment, :comment field :errors, list_of(:string) end mutation do field :create_comment, :comment_mutation_result do arg :comment, non_null(:comment_create_input) resolve fn %{comment: attrs}, _resolution -> case MyAppWeb.Blog.create_comment(attrs) do {:ok, comment} -> {:ok, %{comment: comment, errors: []}} {:error, changeset} -> {:ok, %{comment: nil, errors: translate_changeset_errors(changeset)}} end end end end subscription do field :comment_added, :comment do arg :post_id, non_null(:id) config fn %{post_id: id} = args, %{context: context} -> {:ok, topic: "post:#{id}"} end trigger :create_comment, topic: fn %{comment: nil} -> [] %{comment: comment} -> "post:#{comment.post_id}" end resolve fn %{comment: comment}, _args, _resolution -> {:ok, comment} end end end end
Trigger from Application Code
The trigger
macro works great as long as the only way to create a comment is through the GraphQL API.
But that might not always be the case.
For example, you might have a separate REST API or traditional Phoenix-based controllers/Live Views that create those comments.
In that case, it is usually better to drop the trigger
from the GraphQL API and publish notifications at a lower level from the application code.
This can be achieved using the Absinthe.Subscription.publish/3
method.
A good place to put this might be inside the Phoenix Context method that creates the comment.
Let’s do that now:
defmodule MyApp.Blog do # ... def create_comment(attrs) do %Comment{} |> Comment.changeset(attrs) |> Repo.insert() |> tap(fn {:ok, comment} -> Absinthe.Subscription.publish(MyAppWeb.Endpoint, comment, comment_created: "post:#{comment.post_id}") _ -> nil end) end end
The name of the subscription field to trigger (comment_updated
) is passed as a key inside the last argument to this function.
The value of that key is the topic
to target.
Note that you use both the trigger
macro and publish/3
.
Just make sure that the code that calls publish
is not being executed from the mutation referenced in the trigger
. Otherwise, the user will receive two messages for a single comment — one from the trigger
macro and another from publish
.
Server-Side Setup with Phoenix PubSub
At the beginning of the post, we discussed that a simple one-way HTTP transport is not well-suited for subscriptions. Let's come back to that topic and see how we can set up the application to support a long-running two-way connection between the application and the client.
The easiest way to do it is using Phoenix PubSub which is present by default in all Phoenix applications.
Let’s install Absinthe’s Phoenix helpers by adding the following dependency in mix.exs
:
{:absinthe_phoenix, "~> 2.0"}
Next, configure the PubSub
and add it to the supervision tree (this might already be configured for you if you generated the application using phx.new
):
# config/config.exs config :my_app, MyAppWeb.Endpoint, pubsub_server: MyApp.PubSub # lib/my_app/application.ex defmodule MyApp.Application do use Application def start(_type, _args) do children = [ # ... # Start the PubSub system {Phoenix.PubSub, name: MyApp.PubSub}, # Absinthe Subscription {Absinthe.Subscription, MyAppWeb.Endpoint}, ] opts = [strategy: :one_for_one, name: MyApp.Supervisor] Supervisor.start_link(children, opts) end end
Note that we also added Absinthe.Subscription
inside the supervision tree which keeps track of all the active subscriptions and takes care of delivering notifications to the subscribed clients.
We pass the name of the endpoint as the only argument, but other configuration options are available — see Absinthe.Subscription.child_spec/1
for full details.
Next, use Absinthe.Phoenix.Endpoint
inside your web application’s endpoint module:
# lib/my_app_web/endpoint.ex defmodule MyAppWeb.Endpoint do use Phoenix.Endpoint, otp_app: :my_app use Absinthe.Phoenix.Endpoint # ... end
It adds some utility methods to the Endpoint module used by Absinthe.Subscription
to publish the results of the subscription.
Finally, use Absinthe.Phoenix.Socket
inside the socket module.
# lib/spendra_web/channels/user_socket.ex defmodule MyAppWeb.UserSocket do use Phoenix.Socket use Absinthe.Phoenix.Socket, schema: MyAppWeb.Schema # ... end
This adds a special channel to the socket - Absinthe.Phoenix.Channel
- that handles communication with the Absinthe.Subscription
server we added to our supervision tree.
I know this is a lot to take in, but Absinthe does a really good job of managing everything internally. We only need the few lines of code we see above for everything to work on the server side.
In the previous post of this series, we saw that we could add a context available to all our queries and mutations.
Since the new requests for subscriptions are now going through the UserSocket
instead of the regular:graphql
pipeline in the schema, we don't have access to the context inside the subscriptions.
To get it back, we need to put it inside the socket during the connect
callback:
defmodule MyAppWeb.UserSocket do use Phoenix.Socket use Absinthe.Phoenix.Socket, schema: MyAppWeb.Schema # ... @impl true def connect(query_params, socket, _connect_info) do case authorize_user(socket, query_params) do {:ok, user} -> {:ok, Absinthe.Phoenix.Socket.put_options(socket, context: %{current_user: user})} {:error, _reason} -> # Reject the connection :error end end defp authorize_user(socket, query_params) do # Fetch the user here (for example, from a token in the query_params) {:ok, user} end end
Client-Side Setup with Apollo
Now that our server is ready to accept connections over WebSocket, let’s set up the client.
Since Apollo is usually the go-to client for client-side GQL usage, I will only cover that in this post.
If you are using Relay, check out Using Absinthe with Relay.
For usage without external clients, check out the @absinthe/socket
package, which provides a way to send subscription requests and observe the results.
If you are new to Apollo, I suggest going through the Introduction to Apollo Client docs before going further because we will use advanced concepts to customize our client.
In a regular setup, Apollo is usually configured to use an HTTP Link that, by default, sends a POST
request to the endpoint whenever a new query/mutation is executed.
We now want to modify it to send the request over a WebSocket connection instead of a regular HTTP request.
First, let’s create a standard Phoenix WebSocket connection:
// assets/js/app.js import { Socket as PhoenixSocket } from "phoenix"; const phoenixSocket = new PhoenixSocket("ws://localhost:4000/socket", { params: () => { const token = "xxx"; // an auth token (e.g. from cookies or another authentication process) return { token }; }, });
We can pass any hash as params
when opening the socket connection.
The above is just an example where we send a token
that can then be used inside MyAppWeb.UserSocket.connect/3
above to authorize a user.
Next, wrap it inside an AbsintheSocket
and create a link for use with the Apollo client:
import * as AbsintheSocket from "@absinthe/socket"; import { createAbsintheSocketLink } from "@absinthe/socket-apollo-link"; const absintheSocket = AbsintheSocket.create(phoenixSocket); const websocketLink = createAbsintheSocketLink(absintheSocket);
Note that you will need to install @absinthe/socket
and @absinthe/socket-apollo-link
with npm
/yarn
.
Finally, we will set up the Apollo client to use this link:
import { ApolloClient, InMemoryCache } from "@apollo/client"; const client = new ApolloClient({ link: websocketLink, cache: new InMemoryCache(), });
While this approach works for all types of documents (queries, mutations, or subscriptions), it sends all of them over the WebSocket. It is usually better to send regular POST requests to the API endpoint in your application for regular queries and mutations and only use WebSocket when creating subscriptions.
This is possible using the Apollo client’s split
function to choose which ApolloLink
to use for each request.
Let’s set it up to only use the websocketLink
for subscriptions and create an HttpLink
for other operations:
import { Socket as PhoenixSocket } from "phoenix"; import * as AbsintheSocket from "@absinthe/socket"; import { createAbsintheSocketLink } from "@absinthe/socket-apollo-link"; import { ApolloClient, HttpLink, InMemoryCache, split } from "@apollo/client"; import { hasSubscription } from "@jumpn/utils-graphql"; // Create the websocket link const phoenixSocket = new PhoenixSocket("ws://localhost:4000/socket", { params: () => { const token = "xxx"; // an auth token (e.g. from cookies or another authentication process) return { token }; }, }); const absintheSocket = AbsintheSocket.create(phoenixSocket); const websocketLink = createAbsintheSocketLink(absintheSocket); // Create http link const httpLink = new HttpLink({ uri: "http://localhost:4000/api" }); // Create a split link const link = split( (operation) => hasSubscription(operation.query), websocketLink, httpLink ); // Create the client const client = new ApolloClient({ link, cache: new InMemoryCache(), });
Here we are using the hasSubscription
helper method from @jumpn/utils-graphql
to check if a query has a subscription. If so, we route it to the websocketLink
and all other queries to httpLink
.
Finally, with this setup, you can start using useSubscription
inside your react component or client.subscribe
outside React components to subscribe for updates. Let's see how to create a React component that shows new comments as they are posted:
// assets/js/components/NewComments.jsx import { gql, useSubscription } from "@apollo/client"; import { useState } from "react"; const COMMENTS_SUBSCRIPTION = gql` subscription ($postId: ID!) { commentAdded(postId: $postId) { id body } } `; export default function NewComments({ postId }) { const [comments, setComments] = useState([]); useSubscription(COMMENTS_SUBSCRIPTION, { variables: { postId }, onData: ({ commentAdded }) => { setComments((existing) => [...existing, commentAdded]); }, }); return ( <div> {comments.map((comment) => ( <div key={comment.id}>{comment.body}</div> ))} </div> ); }
And that's it!
Wrap Up
GraphQL subscriptions provide a great way to add real-time behavior to your app. In this post, we saw how to set up the server and the client to support subscriptions.
Since plain HTTP isn’t well suited for long-running connections with two-way server communication, we also saw how to allow Absinthe on the server and Apollo client to communicate over WebSockets for subscriptions.
Happy coding!
P.S. If you'd like to read Elixir Alchemy posts as soon as they get off the press, subscribe to our Elixir Alchemy newsletter and never miss a single post!