Phoenix and Elixir are designed at their core to build real-time, fault-tolerant applications. With its elegant syntax and the robustness of the Erlang VM, Elixir is an ideal candidate for tackling the challenges of distributed state management.
This two-part series will guide Phoenix/Elixir developers through the intricacies of working with Phoenix in a distributed setup.
The focus of this first post will be on GenServers — the key components behind state management in distributed systems. We will explore how GenServers can be leveraged to maintain state across nodes in a Phoenix application, ensuring data consistency and fault tolerance. We’ll break down the complexities of GenServers and distributed state management into manageable and understandable segments.
Overview of Our Phoenix Application
Let's focus on distributed state management in the context of a CRUD API. Imagine you have a popular API service built with Phoenix that handles a large number of requests from various clients. To prevent abuse and ensure fair usage, you want to implement a rate limiter that restricts the number of requests a client can make within a specific timeframe, such as 100 requests per minute.
You can implement this simply using the "token bucket" strategy. In this strategy, a bucket is maintained for each client that is filled with tokens at a constant rate. Each API call consumes a single token from the bucket and is blocked if there are no tokens available.
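Before wiring this into a process, the strategy itself can be sketched as a pair of pure functions. This is only an illustration: the module name `TokenBucketSketch`, the rate of 1 token per second, and the capacity of 10 are assumptions chosen for the example.

```elixir
defmodule TokenBucketSketch do
  # Hypothetical values for illustration: refill 1 token per second, hold at most 10.
  @rate 1
  @capacity 10

  # A new bucket starts full.
  def new(now), do: %{tokens: @capacity, last_checked: now}

  # Refill based on elapsed time, then try to consume one token.
  # Returns {allowed?, updated_bucket}.
  def take(bucket, now) do
    elapsed = now - bucket.last_checked
    tokens = min(bucket.tokens + elapsed * @rate, @capacity)

    if tokens >= 1 do
      {true, %{bucket | tokens: tokens - 1, last_checked: now}}
    else
      {false, %{bucket | tokens: tokens, last_checked: now}}
    end
  end
end

# Ten calls at t = 0 drain the bucket; the eleventh is rejected.
{results, bucket} =
  Enum.map_reduce(1..11, TokenBucketSketch.new(0), fn _i, bucket ->
    TokenBucketSketch.take(bucket, 0)
  end)

IO.inspect(results)

# One second later a single token has been refilled, so the call is allowed.
{allowed, _bucket} = TokenBucketSketch.take(bucket, 1)
IO.inspect(allowed)
```

Keeping the refill and consume logic free of side effects makes it easy to test with explicit timestamps, independently of where the bucket state ends up being stored.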
In the next section, we will set up a token bucket for a single-node scenario using a GenServer.
A Single-Node Rate Limiter Implementation Using GenServer for Elixir
GenServers are a cornerstone within the Elixir ecosystem for managing state and encapsulating functionality. They are the go-to structure for maintaining state within an application because they hold state for their entire lifecycle and serialize access to it: callers interact with them through synchronous calls (GenServer.call) or asynchronous casts (GenServer.cast).
    defmodule MyApp.TokenBucketRateLimiter do
      use GenServer

      # tokens per second
      @rate 1
      # maximum tokens in the bucket
      @bucket_size 10

      def start_link([]), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)

      @impl true
      def init(nil), do: {:ok, %{buckets: %{}}}

      def allow?(client_id) do
        GenServer.call(__MODULE__, {:allow?, client_id})
      end

      @impl true
      def handle_call({:allow?, client_id}, _from, state) do
        current_time = System.monotonic_time(:second)

        {reply, updated_bucket} =
          state.buckets
          |> get_or_init_bucket(client_id, current_time)
          |> update_bucket(current_time)
          |> maybe_consume_token(current_time)

        state = put_in(state, [:buckets, client_id], updated_bucket)
        {:reply, reply, state}
      end

      defp maybe_consume_token(%{tokens: tokens} = bucket, current_time) when tokens >= 1 do
        updated_bucket = %{bucket | tokens: tokens - 1, last_checked: current_time}
        {true, updated_bucket}
      end

      defp maybe_consume_token(bucket, _current_time), do: {false, bucket}

      # A client seen for the first time starts with a full bucket
      defp get_or_init_bucket(buckets, client_id, current_time) do
        Map.get(buckets, client_id, %{
          tokens: @bucket_size,
          last_checked: current_time
        })
      end

      defp update_bucket(bucket, current_time) do
        # Add tokens based on the last time the bucket was checked
        time_passed = current_time - bucket.last_checked
        new_tokens = time_passed * @rate
        updated_tokens = min(bucket.tokens + new_tokens, @bucket_size)
        %{bucket | tokens: updated_tokens, last_checked: current_time}
      end
    end
This works well in a single-node setup because the GenServer holds the token state for the entire application across web requests. We can add it to the application's supervision tree to start the API rate limiter automatically:
    defmodule MyApp.Application do
      use Application

      @impl true
      def start(_type, _args) do
        children = [
          # Other Children...
          MyApp.TokenBucketRateLimiter
        ]

        opts = [strategy: :one_for_one, name: MyApp.Supervisor]
        Supervisor.start_link(children, opts)
      end

      # ...
    end
The first time allow?/1 is called with a new client ID, it initializes the bucket to 10 tokens, and then each allow?/1 call with the same client ID consumes one token. Eventually, the eleventh request in the same second is rejected because there are no more tokens. If the client tries again after a while, the requests are allowed again.
    iex> 1..11 |> Enum.map(fn _i -> MyApp.TokenBucketRateLimiter.allow?("some client id") end)
    [true, true, true, true, true, true, true, true, true, true, false]
    iex> Process.sleep(1000)
    :ok
    iex> MyApp.TokenBucketRateLimiter.allow?("some client id")
    true
But as the app scales and we deploy it across multiple nodes (e.g., in a Kubernetes cluster or across different regions), we face the challenge of coordinating rate limiting across all nodes.
Each node needs to be aware of the request counts from other nodes to enforce the rate limit consistently. Without a distributed solution, each node would only track requests it directly handles, leading to inconsistent rate limiting.
You might think that maintaining request limits in memory is not feasible at a scale that requires multiple nodes. But since each client's state is just a single integer for the token count and a single integer for the timestamp, this doesn't require a large amount of memory: two 64-bit integers are 16 bytes of raw data per client, plus some overhead for the map entry and the client ID key. Even with 100k concurrent clients, that is about 1.6MB of raw data, so the total in-memory state should stay in the low tens of megabytes at most.
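As a rough sanity check of the memory estimate, the arithmetic can be written out directly. The sketch below assumes two 64-bit (8-byte) integers per client; the module name `MemoryEstimateSketch` is made up for this example, and `:erts_debug.flat_size/1` is an Erlang debugging helper that reports a term's size in machine words.

```elixir
defmodule MemoryEstimateSketch do
  # Assumption: each bucket holds two 64-bit (8-byte) integers.
  @bytes_per_client 2 * 8

  # Raw payload in bytes, ignoring map and key overhead.
  def raw_bytes(clients), do: clients * @bytes_per_client

  # Size of one bucket map as the VM stores it, in bytes.
  # This excludes the client ID key and the outer map's own overhead.
  def bucket_bytes(bucket) do
    :erts_debug.flat_size(bucket) * :erlang.system_info(:wordsize)
  end
end

# 100_000 clients * 16 bytes = 1_600_000 bytes (1.6MB) of raw data.
IO.puts("raw payload: #{MemoryEstimateSketch.raw_bytes(100_000)} bytes")

bucket = %{tokens: 10, last_checked: 1_725_358_705}
IO.puts("one bucket map: #{MemoryEstimateSketch.bucket_bytes(bucket)} bytes")
```

The second figure varies with the VM's word size and term layout, so treat it as a ballpark rather than an exact cost per client.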
Understanding GenServers in a Distributed Elixir Environment
Distributed systems pose unique challenges, particularly when it comes to state management. Network partitions, node failures, and latency are just a few issues that can disrupt state consistency across nodes. Ensuring that all nodes have the correct state or can recover it when things go wrong is critical for a system's reliability.
GenServers can be distributed across nodes in a cluster, allowing them to share state and handle requests from different parts of a system. By leveraging the built-in distribution mechanisms of the BEAM VM, GenServers can communicate across nodes with the same ease as within a single node.
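To make the "same ease" claim concrete, here is a small sketch of addressing a named GenServer with a `{name, node}` tuple. `CounterSketch` is a hypothetical module for illustration. The example targets the current node so that it runs without a cluster; in a connected cluster, the same call shape with another node's name would reach the process registered there.

```elixir
defmodule CounterSketch do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, 0, name: __MODULE__)

  @impl true
  def init(count), do: {:ok, count}

  @impl true
  def handle_call(:increment, _from, count), do: {:reply, count + 1, count + 1}
end

{:ok, _pid} = CounterSketch.start_link([])

# Same call shape whether the target node is local or remote.
# Here we target the current node so the snippet runs without a cluster.
target_node = Node.self()
IO.inspect(GenServer.call({CounterSketch, target_node}, :increment))
# => 1
```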
In the next section, we will use some strategies to support rate limiting across multiple nodes in the cluster.
Implementing Distributed GenServers Using a Single Global Process
The simplest strategy to support a rate limiter across a whole cluster is to start a single global process that manages the limits.
This is simple to do using Erlang's :global name registration. In the single-node setup above, we register the GenServer under the name MyApp.TokenBucketRateLimiter (__MODULE__ contains the name of the current module). This registers the process locally on the node. It is also possible to register it globally using a {:global, name} tuple when calling GenServer.start_link/3. This registers the process under the given name globally across the whole cluster.
Note that since we update the process name in start_link, we also have to update it when using any call, cast, and friends.
Let's update the code to do this:
    defmodule MyApp.TokenBucketRateLimiter do
      # ...

      def start_link([]) do
        GenServer.start_link(__MODULE__, nil, name: {:global, __MODULE__})
      end

      def allow?(client_id) do
        GenServer.call({:global, __MODULE__}, {:allow?, client_id})
      end

      # ...
    end
This enforces that only a single process with the global name MyApp.TokenBucketRateLimiter can exist across the cluster.
However, this is insufficient, as it will prevent all other application nodes from joining our cluster because the supervision tree cannot start all its processes.
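The underlying failure can be reproduced on a single node, since :global registration also works without a cluster. In this sketch, `GlobalNameSketch` is a hypothetical stand-in for the rate limiter; the second `start_link` call plays the role of a second node's supervisor trying to start its own copy.

```elixir
defmodule GlobalNameSketch do
  use GenServer

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, nil, name: {:global, __MODULE__})
  end

  @impl true
  def init(nil), do: {:ok, %{}}
end

{:ok, first} = GlobalNameSketch.start_link([])

# A second start under the same global name is refused.
{:error, {:already_started, ^first}} = GlobalNameSketch.start_link([])

# The name still resolves to the first process.
^first = GenServer.whereis({:global, GlobalNameSketch})
```

A supervisor treats that `{:error, {:already_started, pid}}` result as a failed child start, which is why the application on the second node cannot boot.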
Let's try it out by starting a few nodes in the cluster.
- The first node starts fine:

    ➜ iex --name node1@127.0.0.1 --cookie asdf -S mix
    iex(node1@127.0.0.1)1>

- But as soon as we start a second node and connect it to the cluster, the application crashes:

    ➜ iex --name node2@127.0.0.1 --cookie asdf -S mix
    iex(node2@127.0.0.1)1> Node.connect(:"node1@127.0.0.1")
    [notice] Application my_app exited: shutdown

We need to make sure that the start_link function of the rate limiter doesn't generate any errors when a process already exists on another node. To do this, we can modify start_link to check for an existing process and return :ignore when one is found:
    def start_link([]) do
      tuple = {:global, __MODULE__}

      case GenServer.whereis(tuple) do
        nil -> GenServer.start_link(__MODULE__, nil, name: tuple)
        _pid -> :ignore
      end
    end
Once that's done, we can start multiple nodes in the cluster, and they will all ping a central rate limiter instance to allow or deny incoming requests. The central rate limiter will be the GenServer process that happens to start first, i.e., the first node in the cluster to come up.
While this works, there are several issues with this approach:
- The node is a single point of failure now. If the node goes down, the rate limiter will be unreachable until another node starts it. Even worse, this is not handled automatically, so we might be left with no rate limiter process unless a new node is started.
- The state is on a single node. This means that if the rate limiter process is terminated/restarted (either due to a programming error or node failure), the state is lost.
- All nodes in the cluster have to make a call to the single node on every API call.
Using Multiple Processes Across the Cluster
In order to provide better fault tolerance when managing state across distributed nodes, we can use a combination of techniques, including:
- State partitioning: Dividing the state into partitions so that each GenServer instance handles a subset of the data.
- Replication: Replicating state across nodes to provide redundancy and increase fault tolerance.
- Conflict resolution: Implementing strategies to handle state conflicts, such as using version vectors or CRDTs (Conflict-free Replicated Data Types).
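As a small illustration of the first technique, state partitioning usually comes down to a stable mapping from a key to a partition. The sketch below uses Erlang's `:erlang.phash2/2` for that mapping; the module name `PartitionSketch` and the partition count of 4 are assumptions made for the example, not part of the rate limiter built in this post.

```elixir
defmodule PartitionSketch do
  # Hypothetical partition count for illustration.
  @partitions 4

  # Map a client ID to a stable partition index in 0..(@partitions - 1).
  def partition_for(client_id), do: :erlang.phash2(client_id, @partitions)
end

# The same client ID always lands on the same partition.
IO.inspect(PartitionSketch.partition_for("some client id"))
IO.inspect(PartitionSketch.partition_for("another client id"))
```

Each partition index could then be mapped to a separate GenServer, so that no single process handles every client.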
Let's see how we can use CRDTs to provide super-fast data access alongside eventual consistency and data replication guarantees.
We will use the Elixir library DeltaCrdt to manage shared state across multiple nodes in the cluster.
Note: In addition to DeltaCrdt, other libraries like Horde and Swarm can help you to coordinate processes and state across several nodes in the cluster.
Let's add it to mix.exs to get started:
    defmodule MyApp.MixProject do
      defp deps do
        [
          # ...
          {:delta_crdt, "~> 0.6.3"}
        ]
      end
    end
DeltaCrdt provides a simple-to-use API for managing a map of data that is guaranteed to be conflict-free (across distributed usage) and replicated across multiple nodes.
For conflict resolution, it provides AWLWWMap (which stands for "Add-Wins Last-Write-Wins Map") so that in the case of a conflict, the last write always wins.
The API is simple: you start multiple DeltaCrdt processes (which can be on the same node or multiple nodes) and create a cluster with them using set_neighbours/2:
    {:ok, crdt1} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)
    {:ok, crdt2} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap)
    DeltaCrdt.set_neighbours(crdt1, [crdt2])
    DeltaCrdt.read(crdt1)
    # => %{}
Once neighbours are set, adding any data to one CRDT automatically syncs with the second one.
    DeltaCrdt.put(crdt1, "foo", "bar")
    DeltaCrdt.read(crdt2)
    # => %{"foo" => "bar"}
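To build some intuition for the "last write wins" rule, here is a minimal sketch in plain Elixir. It is not how DeltaCrdt is implemented; `LwwMapSketch` is a hypothetical module that stores each value with a caller-supplied timestamp and resolves conflicts by keeping the newer entry.

```elixir
defmodule LwwMapSketch do
  # Each value is stored as {value, timestamp}; timestamps are supplied by the caller.
  def put(map, key, value, timestamp), do: merge(map, %{key => {value, timestamp}})

  # Merge two replicas: for conflicting keys, keep the entry with the newer timestamp.
  # On equal timestamps this sketch keeps the left entry; a real CRDT needs a
  # deterministic tie-break so that all replicas agree.
  def merge(left, right) do
    Map.merge(left, right, fn _key, {_, t1} = a, {_, t2} = b ->
      if t2 > t1, do: b, else: a
    end)
  end
end

replica_a = LwwMapSketch.put(%{}, "foo", "bar", 1)
replica_b = LwwMapSketch.put(%{}, "foo", "baz", 2)

# Merging in either order converges on the later write.
IO.inspect(LwwMapSketch.merge(replica_a, replica_b))
# => %{"foo" => {"baz", 2}}
IO.inspect(LwwMapSketch.merge(replica_b, replica_a))
# => %{"foo" => {"baz", 2}}
```

Because the merge result does not depend on the order in which replicas exchange state (as long as timestamps differ), every replica ends up with the same map once all updates have been delivered.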
Starting One Rate Limiter Per Node
With this out of the way, we can update our implementation of the rate limiter to use DeltaCrdt to sync states.
First, we need to make sure all rate limiter processes are discoverable globally, but with unique names.
One way to do that is to use the node name when naming the process:
    defmodule MyApp.TokenBucketRateLimiter do
      use GenServer

      def start_link([]), do: GenServer.start_link(__MODULE__, nil, name: global_tuple())

      defp global_tuple(node \\ Node.self()),
        do: {:global, to_string(node) <> to_string(__MODULE__)}
    end
Syncing GenServer State With CRDT
Now that all nodes start their own copy of the rate limiter, we need to sync the state.
Let's do that by starting a DeltaCrdt process from the GenServer and using the CRDT to make decisions about the rate limits:
    defmodule MyApp.TokenBucketRateLimiter do
      use GenServer

      # @rate, @bucket_size, update_bucket/2, and maybe_consume_token/2
      # are unchanged from the single-node version

      @impl true
      def init(nil) do
        {:ok, crdt} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 100)
        {:ok, %{crdt: crdt}}
      end

      def allow?(client_id), do: GenServer.call(global_tuple(), {:allow?, client_id})

      @impl true
      def handle_call({:allow?, client_id}, _from, state) do
        current_time = System.os_time(:second)

        {reply, updated_bucket} =
          state.crdt
          |> get_or_init_bucket(client_id, current_time)
          |> update_bucket(current_time)
          |> maybe_consume_token(current_time)

        # Update the CRDT with the new bucket state
        DeltaCrdt.put(state.crdt, client_id, updated_bucket)
        {:reply, reply, state}
      end

      defp get_or_init_bucket(crdt, client_id, current_time) do
        case DeltaCrdt.get(crdt, client_id) do
          nil -> default_bucket(current_time)
          bucket -> bucket
        end
      end

      defp default_bucket(time), do: %{tokens: @bucket_size, last_checked: time}
    end
The major changes here from the single-node approach are:
- Instead of Map.get from the local GenServer state, we use DeltaCrdt.get to get the key from the CRDT. This fetches the data from the local copy of the CRDT, which is very fast to access and guarantees eventual consistency.
- Similarly, we use DeltaCrdt.put to update the shared state instead of the local GenServer state when updating the token count. This is again a fast operation that updates the local state, but with a guarantee that the changes will eventually be synced with all processes in the cluster.
Clustering the GenServers
While we are almost there, there is one last step left.
The CRDTs are currently local and do not sync with the CRDTs on other nodes in the cluster.
We need to use the set_neighbours/2 API from DeltaCrdt to set up a sync.
Let's create a new function named update_neighbours inside the GenServer that does this:
    defp update_neighbours(crdt) do
      neighbours =
        Node.list()
        |> Enum.map(fn node ->
          node
          |> global_tuple()
          |> GenServer.whereis()
          |> :sys.get_state()
          |> Map.get(:crdt)
        end)

      DeltaCrdt.set_neighbours(crdt, neighbours)
    end
Now all we need to do is call this function when the GenServer starts (from init or a handle_continue callback).
The above piece of code takes care of updating CRDT neighbors when a new process starts.
But set_neighbours is a one-sided operation (i.e., it syncs writes from the CRDT to its neighbors, but not the other way around).
To ensure servers that have already started also update their neighbors, we must call update_neighbours on all cluster processes as soon as any new rate limiter process starts.
Let's add a new refresh_peer_neighbours function to do that (which should then be called after the initialization of the GenServer):
    defp refresh_peer_neighbours() do
      Node.list()
      |> Enum.each(fn node ->
        # Look up the peer's rate limiter and ask it to refresh its neighbours
        node
        |> global_tuple()
        |> GenServer.whereis()
        |> Process.send(:refresh_neighbours, [])
      end)
    end

    @impl true
    def handle_info(:refresh_neighbours, state) do
      update_neighbours(state.crdt)
      {:noreply, state}
    end
Handling Cluster Changes
With all this in place, there is still one final piece of the puzzle. What happens when nodes are added or removed from the cluster?
To update the CRDT neighborhood on cluster-level changes, we can use :net_kernel.monitor_nodes(true) from Erlang. This subscribes the GenServer to :nodeup and :nodedown messages, and we refresh the neighbors whenever one of them arrives:
    @impl true
    def init(nil) do
      {:ok, crdt} = DeltaCrdt.start_link(DeltaCrdt.AWLWWMap, sync_interval: 100)
      # Monitor for addition of new nodes
      :net_kernel.monitor_nodes(true)
      {:ok, %{crdt: crdt}, {:continue, nil}}
    end

    @impl true
    def handle_info({:nodeup, _node}, state) do
      Process.send_after(self(), :refresh_neighbours, 1_000)
      {:noreply, state}
    end

    def handle_info({:nodedown, _node}, state) do
      Process.send_after(self(), :refresh_neighbours, 1_000)
      {:noreply, state}
    end
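To see the shape of the node monitoring messages in isolation, here is a stripped-down sketch without any CRDT involved. `NodeWatcherSketch` is a hypothetical module that only records which nodes it has seen; the messages at the bottom are sent by hand to simulate cluster events, so the snippet runs on a single node.

```elixir
defmodule NodeWatcherSketch do
  use GenServer

  def start_link(_opts), do: GenServer.start_link(__MODULE__, nil, name: __MODULE__)
  def seen(), do: GenServer.call(__MODULE__, :seen)

  @impl true
  def init(nil) do
    # Subscribe this process to {:nodeup, node} and {:nodedown, node} messages.
    :net_kernel.monitor_nodes(true)
    {:ok, MapSet.new(Node.list())}
  end

  @impl true
  def handle_call(:seen, _from, nodes), do: {:reply, MapSet.to_list(nodes), nodes}

  @impl true
  def handle_info({:nodeup, node}, nodes), do: {:noreply, MapSet.put(nodes, node)}
  def handle_info({:nodedown, node}, nodes), do: {:noreply, MapSet.delete(nodes, node)}
end

{:ok, pid} = NodeWatcherSketch.start_link([])

# Simulate cluster events by hand; in a real cluster the VM sends these.
send(pid, {:nodeup, :"node2@127.0.0.1"})
IO.inspect(NodeWatcherSketch.seen())

send(pid, {:nodedown, :"node2@127.0.0.1"})
IO.inspect(NodeWatcherSketch.seen())
```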
This finally brings us to the end of our implementation. There are some edge cases to consider:
- Avoiding deadlocks when refreshing neighbours (since we need to fetch the PID of the CRDTs from all nodes).
- Handling cases where the RateLimiter does not exist on a node (e.g., when it is being restarted).
Here is the full implementation. Let's take it for a spin:
    # Node 1
    ➜ iex --name node1@127.0.0.1 --cookie asdf -S mix
    iex(node1@127.0.0.1)1>

    # Node 2
    ➜ iex --name node2@127.0.0.1 --cookie asdf -S mix
    iex(node2@127.0.0.1)1> Node.connect(:"node1@127.0.0.1")
    true
    iex(node2@127.0.0.1)2> MyApp.TokenBucketRateLimiter.allow?("some client id")
    true
    iex(node2@127.0.0.1)3> MyApp.TokenBucketRateLimiter.crdt() |> DeltaCrdt.get("some client id")
    %{tokens: 9, last_checked: 1725358705}

    # Back on Node 1
    iex(node1@127.0.0.1)1> MyApp.TokenBucketRateLimiter.crdt() |> DeltaCrdt.get("some client id")
    %{tokens: 9, last_checked: 1725358705}
And that's it for part one of this series!
Wrapping Up
In this first part of our Distributed Phoenix series, we've taken an in-depth look at how GenServers and Delta CRDTs can be leveraged to manage distributed state in a Phoenix application. We've implemented a token bucket rate limiter across multiple nodes, with eventually consistent, replicated state that tolerates the loss of individual nodes. This foundational knowledge is crucial for building scalable, resilient systems that can handle high traffic and maintain data integrity across multiple servers.
In part two, we'll dive deeper into advanced techniques for optimizing distributed systems, including strategies for deploying and scaling distributed applications.
Stay tuned for more insights and practical examples that will help you master the art of distributed applications with Phoenix and Elixir.