A Deep Dive into Subscriptions with Absinthe

Sapan Diwakar


In this series, we've seen how to create GraphQL APIs in Elixir using Absinthe. So far, we have only discussed a one-way communication channel where the client makes the queries or mutations, and the server responds.

GraphQL also supports a long-running subscription between the client and the server where the server can notify the client of events. This can be very useful in multi-user scenarios where many users might interact with the same resource at the same time. For example, in a blogging application, the client can subscribe to the server to receive new comments as they are posted instead of having to poll the server for them.

This is now two-way communication, and the regular HTTP transport is not well-suited for it. Let's see how subscriptions work and how to create them.

Creating Subscriptions in Absinthe for Elixir

Creating subscriptions is very similar to creating query or mutation fields. We just need to create a field inside the subscription do ... end block of our schema.

Let’s create a subscription for observing new comments inside the schema:

```elixir
# lib/my_app_web/schema.ex
defmodule MyAppWeb.Schema do
  use Absinthe.Schema

  # ...

  subscription do
    field :comment_added, :comment do
      arg :post_id, non_null(:id)

      config fn %{post_id: id}, %{context: _context} ->
        {:ok, topic: "post:#{id}"}
      end
    end
  end
end
```

Let’s break this down. We add a field named comment_added inside the subscription block. The return type of this field is comment (defined using the object macro in Absinthe — see An Introduction to Absinthe for details). This field accepts an argument named post_id which is required. So far, everything is similar to what we would do when creating a field in a query.

The config macro is what's new here: it configures the subscription field. This is also where we can perform checks like authorization to accept or reject the subscription. The function passed to config receives the arguments passed to the field and the Absinthe.Resolution struct, which contains information like the context. The return value can be an {:ok, topic: topic} tuple on success or an {:error, reason} tuple. The topic is what Absinthe uses to identify the subscribers that should be notified of an event. We will get to it in a few minutes, but for completeness: the topic can be a single string or a list containing multiple topics to subscribe to.
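For instance, the context can be used to reject subscriptions from unauthenticated users. Here is a minimal sketch, assuming the context contains a current_user key (the key name is an assumption for this example):

```elixir
# A hedged sketch: reject the subscription unless the context
# contains a current_user (the :current_user key is an assumption).
config fn %{post_id: id}, %{context: context} ->
  case context do
    %{current_user: %{}} -> {:ok, topic: "post:#{id}"}
    _ -> {:error, "unauthorized"}
  end
end
```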

Publishing Data to Subscriptions in GraphQL

We have a subscription added to our schema. But even after subscribing, we won’t get back any data because we haven’t triggered any updates yet.

Trigger from Mutations

The easiest way to trigger the subscription is to use the trigger/2 macro inside the subscription field. It accepts the name of a mutation to trigger the subscription and a topic function. Assuming that we have a mutation named create_comment in our schema to create a comment, let’s update our subscription to trigger every time a comment is created.

```elixir
defmodule MyAppWeb.Schema do
  use Absinthe.Schema

  # ...

  subscription do
    field :comment_added, :comment do
      # arg and config ...

      trigger :create_comment, topic: fn comment ->
        "post:#{comment.post_id}"
      end
    end
  end
end
```

The function we pass as topic to trigger will be executed with the create_comment mutation result. It should return the name of the topic that this mutation will trigger.
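The first argument to trigger can also be a list of mutation names, and the topic function may return a list of topics to notify several at once. A sketch, assuming a hypothetical :import_comment mutation that also returns a comment:

```elixir
# Trigger the same subscription from two mutations and notify both a
# per-post topic and a global one (:import_comment and the
# "comments:all" topic are hypothetical examples).
trigger [:create_comment, :import_comment], topic: fn comment ->
  ["post:#{comment.post_id}", "comments:all"]
end
```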

Let’s see this in action. First, user A runs a subscription query like this:

```graphql
subscription {
  commentAdded(postId: 1) {
    id
    body
    author {
      id
      name
    }
  }
}
```

Absinthe records the subscription, but the user doesn’t receive any data straight away.

Now, user B creates a new comment:

```graphql
mutation {
  createComment(comment: { postId: 1, body: "nice post!" }) {
    id
  }
}
```

User B will receive the mutation response as usual based on the selections:

{ "data": { "createComment": { "id": 1 } } }

Additionally, user A will also receive a response on their subscription. Since they selected many more fields when subscribing, the response will include them all regardless of the selections in the mutation.

{ "data": { "commentAdded": { "id": 1, "body": "nice post!", "author": { "id": 1, "name": "User B" } } } }

Note that with the current setup, user A won't actually receive any updates yet, since the subscription wasn't made over a WebSocket transport. We will see how to set that up later in this post.

Translate Mutation Result to Subscription Result

If your mutation returns a different data structure from what the subscription is supposed to return, you can also add a resolver inside the subscription field to convert the result first.

For example, assume that instead of returning a comment, the create_comment mutation returns a complex object that also includes validation errors in case of failure. In that case, we can provide a resolver for the comment_added subscription field to fetch the comment before returning the result. We will also need to modify our trigger to create the topic from the response struct.

```elixir
defmodule MyAppWeb.Schema do
  use Absinthe.Schema

  object :comment_mutation_result do
    field :comment, :comment
    field :errors, list_of(:string)
  end

  mutation do
    field :create_comment, :comment_mutation_result do
      arg :comment, non_null(:comment_create_input)

      resolve fn %{comment: attrs}, _resolution ->
        case MyApp.Blog.create_comment(attrs) do
          {:ok, comment} ->
            {:ok, %{comment: comment, errors: []}}

          {:error, changeset} ->
            {:ok, %{comment: nil, errors: translate_changeset_errors(changeset)}}
        end
      end
    end
  end

  subscription do
    field :comment_added, :comment do
      arg :post_id, non_null(:id)

      config fn %{post_id: id}, %{context: _context} ->
        {:ok, topic: "post:#{id}"}
      end

      trigger :create_comment, topic: fn
        %{comment: nil} -> []
        %{comment: comment} -> "post:#{comment.post_id}"
      end

      resolve fn %{comment: comment}, _args, _resolution ->
        {:ok, comment}
      end
    end
  end
end
```

Trigger from Application Code

The trigger macro works great as long as the only way to create a comment is through the GraphQL API. But that might not always be the case. For example, you might have a separate REST API or traditional Phoenix-based controllers/Live Views that create those comments. In that case, it is usually better to drop the trigger from the GraphQL API and publish notifications at a lower level from the application code.

This can be achieved using the Absinthe.Subscription.publish/3 method. A good place to put this might be inside the Phoenix Context method that creates the comment. Let’s do that now:

```elixir
defmodule MyApp.Blog do
  # ...

  def create_comment(attrs) do
    %Comment{}
    |> Comment.changeset(attrs)
    |> Repo.insert()
    |> tap(fn
      {:ok, comment} ->
        Absinthe.Subscription.publish(
          MyAppWeb.Endpoint,
          comment,
          comment_added: "post:#{comment.post_id}"
        )

      _ ->
        nil
    end)
  end
end
```

The name of the subscription field to trigger (comment_added) is passed as a key in the keyword list given as the last argument to publish/3. The value of that key is the topic to target.
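The topic value can also be a list, which lets a single publish call notify several topics at once. For example, assuming we also wanted a hypothetical global "comments:all" topic:

```elixir
# Notify every subscriber of this post as well as a (hypothetical)
# global "comments:all" topic in a single call.
Absinthe.Subscription.publish(
  MyAppWeb.Endpoint,
  comment,
  comment_added: ["post:#{comment.post_id}", "comments:all"]
)
```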

Note that you can use both the trigger macro and publish/3. Just make sure that the code that calls publish is not executed from the mutation referenced in the trigger. Otherwise, the user will receive two messages for a single comment — one from the trigger macro and another from publish.

Server-Side Setup with Phoenix PubSub

At the beginning of the post, we discussed that a simple one-way HTTP transport is not well-suited for subscriptions. Let's come back to that topic and see how we can set up the application to support a long-running two-way connection between the application and the client.

The easiest way to do it is using Phoenix PubSub which is present by default in all Phoenix applications. Let’s install Absinthe’s Phoenix helpers by adding the following dependency in mix.exs:

```elixir
{:absinthe_phoenix, "~> 2.0"}
```

Next, configure the PubSub and add it to the supervision tree (this might already be configured for you if you generated the application using phx.new):

```elixir
# config/config.exs
config :my_app, MyAppWeb.Endpoint, pubsub_server: MyApp.PubSub
```

```elixir
# lib/my_app/application.ex
defmodule MyApp.Application do
  use Application

  def start(_type, _args) do
    children = [
      # ...
      # Start the PubSub system
      {Phoenix.PubSub, name: MyApp.PubSub},
      # Absinthe subscriptions
      {Absinthe.Subscription, MyAppWeb.Endpoint}
    ]

    opts = [strategy: :one_for_one, name: MyApp.Supervisor]
    Supervisor.start_link(children, opts)
  end
end
```

Note that we also added Absinthe.Subscription inside the supervision tree which keeps track of all the active subscriptions and takes care of delivering notifications to the subscribed clients. We pass the name of the endpoint as the only argument, but other configuration options are available — see Absinthe.Subscription.child_spec/1 for full details.

Next, use Absinthe.Phoenix.Endpoint inside your web application’s endpoint module:

```elixir
# lib/my_app_web/endpoint.ex
defmodule MyAppWeb.Endpoint do
  use Phoenix.Endpoint, otp_app: :my_app
  use Absinthe.Phoenix.Endpoint

  # ...
end
```

This adds some utility functions to the Endpoint module that Absinthe.Subscription uses to publish subscription results.

Finally, use Absinthe.Phoenix.Socket inside the socket module.

```elixir
# lib/my_app_web/channels/user_socket.ex
defmodule MyAppWeb.UserSocket do
  use Phoenix.Socket
  use Absinthe.Phoenix.Socket, schema: MyAppWeb.Schema

  # ...
end
```

This adds a special channel to the socket - Absinthe.Phoenix.Channel - that handles communication with the Absinthe.Subscription server we added to our supervision tree.

I know this is a lot to take in, but Absinthe does a really good job of managing everything internally. We only need the few lines of code we see above for everything to work on the server side.

In the previous post of this series, we saw that we could add a context available to all our queries and mutations. Since subscription requests now go through the UserSocket instead of the regular :graphql pipeline, we don't have access to that context inside subscriptions. To get it back, we need to put it inside the socket during the connect callback:

```elixir
defmodule MyAppWeb.UserSocket do
  use Phoenix.Socket
  use Absinthe.Phoenix.Socket, schema: MyAppWeb.Schema

  # ...

  @impl true
  def connect(query_params, socket, _connect_info) do
    case authorize_user(socket, query_params) do
      {:ok, user} ->
        {:ok, Absinthe.Phoenix.Socket.put_options(socket, context: %{current_user: user})}

      {:error, _reason} ->
        # Reject the connection
        :error
    end
  end

  defp authorize_user(socket, query_params) do
    # Fetch the user here (for example, from a token in the query_params)
    {:ok, user}
  end
end
```

Client-Side Setup with Apollo

Now that our server is ready to accept connections over WebSocket, let's set up the client. Since Apollo is usually the go-to client for client-side GraphQL usage, I will only cover that in this post. If you are using Relay, check out Using Absinthe with Relay. For usage without external clients, check out the @absinthe/socket package, which provides a way to send subscription requests and observe the results.

If you are new to Apollo, I suggest going through the Introduction to Apollo Client docs before going further because we will use advanced concepts to customize our client.

In a regular setup, Apollo is usually configured to use an HTTP Link that, by default, sends a POST request to the endpoint whenever a new query/mutation is executed. We now want to modify it to send the request over a WebSocket connection instead of a regular HTTP request.

First, let’s create a standard Phoenix WebSocket connection:

```javascript
// assets/js/app.js
import { Socket as PhoenixSocket } from "phoenix";

const phoenixSocket = new PhoenixSocket("ws://localhost:4000/socket", {
  params: () => {
    // An auth token (e.g. from cookies or another authentication process)
    const token = "xxx";
    return { token };
  },
});
```

We can pass any object as params when opening the socket connection. The above is just an example where we send a token that can then be used inside MyAppWeb.UserSocket.connect/3 above to authorize a user.
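On the server, that token could be verified inside the authorize_user/2 helper we stubbed out in MyAppWeb.UserSocket. Here is a hedged sketch, assuming the token was signed with Phoenix.Token.sign/4 using a "user socket" salt and that a MyApp.Accounts.get_user/1 function exists — both are assumptions for this example:

```elixir
# Verify the token sent in the socket params (the salt, max_age, and
# MyApp.Accounts.get_user/1 are assumptions for this sketch).
defp authorize_user(_socket, %{"token" => token}) do
  case Phoenix.Token.verify(MyAppWeb.Endpoint, "user socket", token, max_age: 86_400) do
    {:ok, user_id} -> {:ok, MyApp.Accounts.get_user(user_id)}
    {:error, _} = error -> error
  end
end

defp authorize_user(_socket, _params), do: {:error, :missing_token}
```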

Next, wrap it inside an AbsintheSocket and create a link for use with the Apollo client:

```javascript
import * as AbsintheSocket from "@absinthe/socket";
import { createAbsintheSocketLink } from "@absinthe/socket-apollo-link";

const absintheSocket = AbsintheSocket.create(phoenixSocket);
const websocketLink = createAbsintheSocketLink(absintheSocket);
```

Note that you will need to install @absinthe/socket and @absinthe/socket-apollo-link with npm/yarn.

Finally, we will set up the Apollo client to use this link:

```javascript
import { ApolloClient, InMemoryCache } from "@apollo/client";

const client = new ApolloClient({
  link: websocketLink,
  cache: new InMemoryCache(),
});
```

While this approach works for all types of documents (queries, mutations, or subscriptions), it sends all of them over the WebSocket. It is usually better to send regular POST requests to the API endpoint in your application for regular queries and mutations and only use WebSocket when creating subscriptions.

This is possible using the Apollo client’s split function to choose which ApolloLink to use for each request. Let’s set it up to only use the websocketLink for subscriptions and create an HttpLink for other operations:

```javascript
import { Socket as PhoenixSocket } from "phoenix";
import * as AbsintheSocket from "@absinthe/socket";
import { createAbsintheSocketLink } from "@absinthe/socket-apollo-link";
import { ApolloClient, HttpLink, InMemoryCache, split } from "@apollo/client";
import { hasSubscription } from "@jumpn/utils-graphql";

// Create the websocket link
const phoenixSocket = new PhoenixSocket("ws://localhost:4000/socket", {
  params: () => {
    // An auth token (e.g. from cookies or another authentication process)
    const token = "xxx";
    return { token };
  },
});
const absintheSocket = AbsintheSocket.create(phoenixSocket);
const websocketLink = createAbsintheSocketLink(absintheSocket);

// Create the http link
const httpLink = new HttpLink({ uri: "http://localhost:4000/api" });

// Create a split link that routes subscriptions over the websocket
const link = split(
  (operation) => hasSubscription(operation.query),
  websocketLink,
  httpLink
);

// Create the client
const client = new ApolloClient({
  link,
  cache: new InMemoryCache(),
});
```

Here, we use the hasSubscription helper function from @jumpn/utils-graphql to check whether a document contains a subscription. If it does, we route it to websocketLink; all other operations go to httpLink.

Finally, with this setup, you can start using useSubscription inside your React components (or client.subscribe outside of them) to subscribe to updates. Let's see how to create a React component that shows new comments as they are posted:

```jsx
// assets/js/components/NewComments.jsx
import { gql, useSubscription } from "@apollo/client";
import { useState } from "react";

const COMMENTS_SUBSCRIPTION = gql`
  subscription ($postId: ID!) {
    commentAdded(postId: $postId) {
      id
      body
    }
  }
`;

export default function NewComments({ postId }) {
  const [comments, setComments] = useState([]);

  useSubscription(COMMENTS_SUBSCRIPTION, {
    variables: { postId },
    onData: ({ data }) => {
      // The subscription result is nested under data.data
      const comment = data.data.commentAdded;
      setComments((existing) => [...existing, comment]);
    },
  });

  return (
    <div>
      {comments.map((comment) => (
        <div key={comment.id}>{comment.body}</div>
      ))}
    </div>
  );
}
```

And that's it!

Wrap Up

GraphQL subscriptions provide a great way to add real-time behavior to your app. In this post, we saw how to set up the server and the client to support subscriptions.

Since plain HTTP isn’t well suited for long-running connections with two-way server communication, we also saw how to allow Absinthe on the server and Apollo client to communicate over WebSockets for subscriptions.

Happy coding!

P.S. If you'd like to read Elixir Alchemy posts as soon as they get off the press, subscribe to our Elixir Alchemy newsletter and never miss a single post!


Our guest author Sapan Diwakar is a full-stack developer. He writes about his interests on his blog and is a big fan of keeping things simple, in life and in code. When he’s not working with technology, he loves to spend time in the garden, hiking around forests, and playing outdoor sports.
