
Communication between processes in the Erlang VM happens through message passing. In fact, all communication across processes occurs through the same mechanism: all you need is an address and you can send a message to it, with `send` and `receive` as the lower-level building blocks. This is true even if that address is on another node in a cluster of Elixir nodes that are aware of each other, and it works in exactly the same way. There's no special RPC syntax and no special error handling; you can write your code without knowing whether a message is destined for another node or for a process running on the same node. In `send(address, message)`, the `address` could be a local pid, a "via tuple", or a pid from another node. It could even be a registered name combined with a node in a tuple like `{MyProcess, node}`; `send` will figure out what to do. This is also true when using the higher-level versions `GenServer.cast/2` and `GenServer.call/3`.
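To make those building blocks concrete, here's a minimal sketch of raw `send` and `receive` between two local processes. The `:echo_server` name is made up for illustration; the point is that the sending code looks identical whether the registered process lives on this node or another one:

```elixir
# A tiny process that echoes any message back to the sender.
echo =
  spawn(fn ->
    receive do
      {from, message} -> send(from, {:echo, message})
    end
  end)

# Register the process under a name. With a connected remote node, you
# could address it as {:echo_server, :"b@127.0.0.1"} instead.
Process.register(echo, :echo_server)

send(:echo_server, {self(), :hello})

receive do
  {:echo, message} -> IO.inspect(message)
after
  1_000 -> IO.puts("no reply")
end
```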
It might not be obvious at first glance, but this is a superpower. Let’s take a simple example of some code that runs on a single node and then upgrade it to share state across a cluster of nodes. To follow along, all you’ll need is Elixir installed.
Note: This blog post is an extended version of an example from my Code BEAM Mexico keynote.
A Basic In-Memory Rate Limiter
```elixir
defmodule RateLimiter do
  def check(ip) do
    Hammer.check_rate(ip, 60_000, 10)
  end
end
```
In my example, I'm wrapping the Hammer rate limiting library, although you could use any other library or roll your own. With our `RateLimiter` module, we can now lock down the endpoints in our imaginary web app and protect our service. We're very strict, only allowing 10 requests per minute per IP.
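As a sketch of how you might wire this into a web app, here's a hypothetical plug (the `RateLimitPlug` name and conn handling are my own illustration, assuming a Plug-based app and Hammer 6.x, where `Hammer.check_rate/3` returns `{:allow, count}` or `{:deny, limit}`):

```elixir
defmodule RateLimitPlug do
  import Plug.Conn

  def init(opts), do: opts

  def call(conn, _opts) do
    ip = conn.remote_ip |> :inet.ntoa() |> to_string()

    case RateLimiter.check(ip) do
      # Within the quota: let the request through.
      {:allow, _count} ->
        conn

      # Quota exhausted: reject with 429 and stop the pipeline.
      {:deny, _limit} ->
        conn
        |> send_resp(429, "Too Many Requests")
        |> halt()
    end
  end
end
```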
But as we scale up and add more nodes, this basic rate limiting module doesn’t work anymore because it’s not sharing state across the nodes. The same IP is now able to make more than 10 requests per minute, as the requests are randomly assigned to different nodes by the load balancer.
There are, of course, tons of ways of solving this problem. Hammer itself comes with an adapter for Redis that allows sharing state across multiple nodes. Don't get me wrong — Redis is a great piece of software, but you might not need it. Maybe there’s a way to solve this, using Elixir and nothing else.
Interlude: Connecting Nodes
To try this out on your own machine, you'll need to connect two Elixir nodes together. To get started, assuming you've got Elixir installed locally, you'll want to start `iex` with a special flag:
```shell
iex --name a@127.0.0.1
```
Then, in a second terminal/tab:
```shell
iex --name b@127.0.0.1
```
Now, in either terminal, type in `Node.list` and verify that it returns an empty list, and then run `Node.ping(:"x@127.0.0.1")`, replacing `x` with the name of the other node, the one you're not currently using. You should see a `:pong` response. Now try typing in `Node.list` again and verify that it shows the other node in the list. That's it, you've made your first cluster!

Note that if you get a `:pang` response, you may need to run the command `epmd` in your terminal first, to start the service that connects the nodes.
Hash Rings
The first tool we'll need is a mechanism for associating keys with nodes in a cluster. I'm a big fan of Discord's ExHashRing library, so we'll use that, but there are alternative implementations out there. It's based on consistent hashing, an ingenious idea from the '90s: given the same inputs, look-ups return consistent results. Basically, we can add all of our node names to the ring, and when we look up keys, we can rely on getting the same result every time (assuming the node names in the ring are the same). Even when the memberships change, we should see minimal changes in our look-ups:
```elixir
iex(b@127.0.0.1)4> alias ExHashRing.Ring
ExHashRing.Ring
iex(b@127.0.0.1)5> {:ok, ring} = Ring.start_link()
{:ok, #PID<0.279.0>}
# The keys you add to the ring can be anything, but for our purposes
# we're interested in node names.
iex(b@127.0.0.1)6> Ring.add_node(ring, "a@127.0.0.1")
{:ok, [{"a@127.0.0.1", 512}]}
iex(b@127.0.0.1)7> Ring.add_node(ring, "b@127.0.0.1")
{:ok, [{"b@127.0.0.1", 512}, {"a@127.0.0.1", 512}]}
iex(b@127.0.0.1)16> Ring.add_node(ring, "c@127.0.0.1")
{:ok, [{"c@127.0.0.1", 512}, {"b@127.0.0.1", 512}, {"a@127.0.0.1", 512}]}
# Now that they're in there, we can take it for a spin. Let's try checking
# some different look ups.
iex(b@127.0.0.1)18> Ring.find_node(ring, "result1")
{:ok, "c@127.0.0.1"}
iex(b@127.0.0.1)19> Ring.find_node(ring, "result2")
{:ok, "a@127.0.0.1"}
iex(b@127.0.0.1)21> Ring.find_node(ring, "result4")
{:ok, "b@127.0.0.1"}
# Okay, but what if we remove a node?
iex(b@127.0.0.1)22> Ring.remove_node(ring, "c@127.0.0.1")
{:ok, [{"b@127.0.0.1", 512}, {"a@127.0.0.1", 512}]}
iex(b@127.0.0.1)23> Ring.find_node(ring, "result4")
{:ok, "b@127.0.0.1"}
iex(b@127.0.0.1)24> Ring.find_node(ring, "result1")
{:ok, "a@127.0.0.1"}
# As you can see, result4 still results in node b even though the
# memberships changed, and result1 has been moved over to a.
# But what if we add the node c back?
iex(b@127.0.0.1)25> Ring.add_node(ring, "c@127.0.0.1")
{:ok, [{"c@127.0.0.1", 512}, {"b@127.0.0.1", 512}, {"a@127.0.0.1", 512}]}
iex(b@127.0.0.1)26> Ring.find_node(ring, "result1")
{:ok, "c@127.0.0.1"}
# result1 is now back on c. It will always go to c; those are the
# behaviors we can rely on from the hash ring.
```
Okay! Now let’s hook into OTP to grab all the node names and add them to the ring, and additionally, use some events to keep the ring up to date as nodes connect and disconnect. You can use this code alongside manually connecting and disconnecting nodes from the terminal like I demonstrated above. However, in a production environment, you may want to use a library like libcluster to manage your cluster more efficiently.
First of all, add a hash ring to your `application.ex` like this: `{ExHashRing.Ring, name: DistributionRing}`. Then add the following module to your app:
```elixir
defmodule Distribution do
  use GenServer

  require Logger

  def start_link(opts) do
    GenServer.start_link(__MODULE__, opts, name: __MODULE__)
  end

  @impl GenServer
  def init(_opts) do
    :net_kernel.monitor_nodes(true)

    for node <- [Node.self() | Node.list()] do
      ExHashRing.Ring.add_node(DistributionRing, node)
    end

    {:ok, []}
  end

  @impl GenServer
  def handle_info({:nodeup, node}, state) do
    Logger.info("Node #{node} connected")
    ExHashRing.Ring.add_node(DistributionRing, node)
    {:noreply, state}
  end

  def handle_info({:nodedown, node}, state) do
    Logger.info("Node #{node} disconnected")
    ExHashRing.Ring.remove_node(DistributionRing, node)
    {:noreply, state}
  end
end
```
This is a GenServer responsible for maintaining that hash ring, adding and removing nodes as they connect and disconnect. Go ahead and try it out right now if you want. You should see the log messages show up as you add and remove nodes with `Node.ping`. Pretty neat.
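If you're following along in a Mix project, the supervision tree at this point might look something like the following sketch (`MyApp` is a placeholder for your own app name):

```elixir
defmodule MyApp.Application do
  use Application

  @impl true
  def start(_type, _args) do
    children = [
      # The hash ring that Distribution keeps in sync with the cluster.
      {ExHashRing.Ring, name: DistributionRing},
      # Started after the ring, since its init/1 adds nodes to it.
      Distribution
    ]

    Supervisor.start_link(children, strategy: :one_for_one, name: MyApp.Supervisor)
  end
end
```

The ordering matters: children start in order, and `Distribution.init/1` assumes the `DistributionRing` process already exists.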
Now we have everything we need to upgrade our simple rate limiting module to share data across the cluster.
Distributed Rate Limiter
Let’s do this in two steps. First, we need to turn our basic rate limiting module into a GenServer so that there’s a process on each node that can be reached from the other nodes. We’re doing it in two steps to really hammer home the difference between the version that only works locally and the version that returns consistent results across the cluster.
```elixir
defmodule RateLimiter do
  use GenServer

  # Public interface

  def check(ip) do
    check_internal(ip)
  end

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_opts) do
    {:ok, []}
  end

  # Internal implementation

  def handle_call({:check, ip}, _from, state) do
    {:reply, check_internal(ip), state}
  end

  defp check_internal(ip) do
    Hammer.check_rate(ip, 60_000, 10)
  end
end
```
Make sure you add that process to your application children, like `{RateLimiter, []}`. Then let's upgrade:
```elixir
defmodule RateLimiter do
  use GenServer

  require Logger

  # Public interface

  def check(ip) do
    {:ok, node} = ExHashRing.Ring.find_node(DistributionRing, ip)
    GenServer.call({__MODULE__, node}, {:check, ip})
  catch
    :exit, reason ->
      Logger.warning("Tried to check rate limit but failed", reason: inspect(reason))
      # As a fallback, do a local check. It's better than nothing!
      check_internal(ip)
  end

  def start_link(_opts) do
    GenServer.start_link(__MODULE__, [], name: __MODULE__)
  end

  def init(_opts) do
    {:ok, []}
  end

  # Internal implementation

  def handle_call({:check, ip}, _from, state) do
    {:reply, check_internal(ip), state}
  end

  defp check_internal(ip) do
    Hammer.check_rate(ip, 60_000, 10)
  end
end
```
We're taking a fairly laissez-faire approach to error handling here. This code is just something to get you started, but I do think it's worth having that `catch` in the `check` function. When `GenServer.call/3` fails, it exits and crashes the caller; this would happen, for example, when it's not able to reach the process on the other node. Because networks are not reliable, this will happen. Fortunately, short-interval rate limiting is a use case that doesn't demand absolutely perfect results. We can very easily fall back to just checking locally instead, and at worst, a few extra requests may be let through. As long as the network is mostly stable, we'll be fine.
So there it is. With those two magical lines, looking up the node and then passing it to `GenServer.call/3`, we took code that ran fully locally and turned it into code that transparently routes requests to the proper nodes in the cluster. Given a specific IP address, the lookup will always happen on the node responsible for that IP address, and even as nodes go up and down, it is likely to be handled by the same node anyway!
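You can see the transparent routing for yourself with both nodes connected and the app running. Calling the rate limiter for the same IP from either node hits the same bucket, so the count keeps climbing across nodes (the example IP is made up, and the return shape assumes Hammer's `{:allow, count}` tuple):

```elixir
# On node a:
iex(a@127.0.0.1)1> RateLimiter.check("203.0.113.7")
{:allow, 1}

# On node b, the same IP routes to the same process, so the
# counter continues rather than starting over:
iex(b@127.0.0.1)1> RateLimiter.check("203.0.113.7")
{:allow, 2}
```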
A Word of Warning
I do want to emphasize that this isn’t necessarily the “right” tool for every use case. In my opinion, it works well for short-lived rate limiting because it’s ok if we’re a little bit wrong sometimes. It’s also ok that we lose state on app restarts. The rate limit quotas reset every minute anyway!
Wrapping Up
If you’re concerned about routing these rate limit lookups through GenServers, remember that they can easily handle tens of thousands of messages per second. And if you need more than that, you’ll probably need to look at something more robust than this! But for the majority of apps that need multiple nodes, maybe for resiliency and nothing else, this can be a great alternative to Redis or other solutions, helping you keep your stack lean and not worry about maintaining extra services.
To summarize: you get the semantics of just calling code locally, transparent routing across your cluster, and you don't even need to add another service to maintain. And remember, with 3 nodes, one in every 3 lookups on average will happen locally, amortizing your overall network latency overhead!
You don't need to limit yourself to rate limiting either; there are tons of use cases where you can apply this pattern. Why not add some simple ephemeral caches? Or how about some high-cardinality metrics/event reporting? Your options are limitless, so why not bring a little bit of distributed Elixir magic into your app?
Happy coding!
P.S. If you'd like to read Elixir Alchemy posts as soon as they get off the press, subscribe to our Elixir Alchemy newsletter and never miss a single post!