
Cowboy is a small, fast, and modern HTTP server for Erlang/OTP. It particularly shines when handling multiple concurrent connections with minimal overhead. This makes it a perfect choice for building lightweight, real-time streaming services.
In this article, we’ll build a tiny real-time text pub/sub server using Cowboy. Our service supports HTTP-based publishing, allowing clients to subscribe using either WebSockets or Server-Sent Events (SSE). Think of it as a minimal message bus over the web, great for prototypes, internal dashboards, collaborative editors, or IoT telemetry applications.
Not only will you learn how to set up and use Cowboy, but you’ll also gain insights into designing a simple yet effective pub/sub architecture. By the end of this article, you’ll have a solid foundation to build upon for more complex real-time applications.
Why Cowboy for Elixir?
Phoenix is the go-to web framework in Elixir, but sometimes you don’t need the full stack. Cowboy shines when:
- You want tight control over connections and protocols
- You’re building a focused service (e.g., streaming telemetry or real-time feeds)
- You want to avoid framework overhead and keep things lean
Cowboy’s design is close to the metal and as barebones as it gets. It doesn’t come with routing helpers or request processing pipelines—but that’s a feature, not a bug. You get raw access to HTTP and WebSocket layers, with no abstraction in your way. Perfect for protocol gateways, edge servers, or streaming APIs.
In contrast, Phoenix is fantastic for full-featured web apps with rich templating, channels, and LiveView. However, if your application's main responsibility is pushing data to clients with minimal latency, Cowboy’s simplicity can improve performance and reduce resource usage.
As mentioned in the introduction, we will build a simple text pub/sub server that supports publishing and subscribing via HTTP, WebSockets, and Server-Sent Events (SSE), which is a great fit for Cowboy’s strengths.
Project Setup and Bootstrapping Cowboy
Let's get started by creating a new Mix project and adding Cowboy as a dependency.
mix new text_bus --sup cd text_bus
This will create a new Elixir project with a supervision tree. Next, open mix.exs and add Cowboy and Jason (for JSON encoding) to the dependencies:
defp deps do [ {:cowboy, "~> 2.10"}, {:jason, "~> 1.4"} ] end
Fetch the dependencies:
mix deps.get
With our dependencies in place, we can set up Cowboy to start an HTTP listener on port 4000. The first step is to create a module to start the Cowboy server. Create a new file at lib/text_bus/http_server.ex:
defmodule TextBus.HTTPServer do use GenServer def start_link(opts), do: GenServer.start_link(__MODULE__, opts, name: __MODULE__) def init(_opts) do dispatch = :cowboy_router.compile([ {:_, [ {"/health", TextBus.Handlers.Health, []} ]} ]) {:ok, _} = :cowboy.start_clear( :http, [port: 4000], %{env: %{dispatch: dispatch}} ) {:ok, %{}} end end
While this code might look a bit complex, here’s a breakdown:
- We define a
TextBus.HTTPServermodule that usesGenServerto manage the Cowboy server lifecycle. - In the
init/1function, we set up a simple router with a health check endpoint at/health. - We start the Cowboy server on port 4000, passing in our routing configuration.
- Finally, we return the initial server state, an empty map.
Next, we need to make sure the application starts the Cowboy server. Open lib/text_bus/application.ex and modify the start/2 function to include our TextBus.HTTPServer module:
defmodule TextBus.Application do use Application def start(_type, _args) do children = [ TextBus.HTTPServer ] Supervisor.start_link(children, strategy: :one_for_one, name: TextBus.Supervisor) end end
Now let's create the health check handler in lib/text_bus/handlers/health.ex:
defmodule TextBus.Handlers.Health do @behaviour :cowboy_handler def init(req, state) do {:ok, req} = :cowboy_req.reply(200, %{"content-type" => "text/plain"}, "OK", req) {:ok, req, state} end end
This handler returns a 200 OK response with the text "OK" as our health check. We can make sure our Cowboy scaffold is working by starting the application:
mix run --no-halt
And then using curl to hit the health endpoint:
curl -i http://localhost:4000/health
If everything is set up correctly, you should see a 200 OK response with the text "OK". Something like:
❯ curl -i http://localhost:4000/health HTTP/1.1 200 OK content-length: 2 content-type: text/plain date: Thu, 18 Sep 2025 13:08:21 GMT server: Cowboy OK⏎
Before we start the implementation, let's quickly outline what a pub/sub server does:
- Clients can publish messages to a topic via HTTP POST requests.
- Clients can subscribe to topics via WebSockets or Server-Sent Events (SSE) to receive real-time updates.
- The server maintains a buffer of recent messages for each topic to allow new subscribers to catch up.
Now, let's move on to the implementation.
Topic Model, Publish Flow, and Subscriptions
Let's define a Topic module to manage individual topics. Each topic will be a GenServer that maintains a ring buffer of messages and tracks subscribers. Create a new file at lib/text_bus/topic.ex:
defmodule TextBus.Topic do use GenServer @buffer_size 100 # Client API def start_link(name), do: GenServer.start_link(__MODULE__, name, name: via(name)) def publish(topic, msg), do: GenServer.cast(via(topic), {:publish, msg}) def subscribe(topic, pid), do: GenServer.cast(via(topic), {:subscribe, pid}) def unsubscribe(topic, pid), do: GenServer.cast(via(topic), {:unsubscribe, pid}) def replay(topic, n), do: GenServer.call(via(topic), {:replay, n}) defp via(name), do: {:via, Registry, {TextBus.TopicRegistry, name}} ## Server def init(name) do # Unnamed table: first arg can be any atom; result is a tid we keep in state. table = :ets.new(:topic_messages, [:ordered_set, :protected]) {:ok, %{name: name, seq: 0, table: table, subs: MapSet.new()}} end def handle_cast({:publish, msg}, state) do seq = state.seq + 1 :ets.insert(state.table, {seq, msg}) Enum.each(state.subs, fn pid -> send(pid, {:message, state.name, seq, msg}) end) {:noreply, %{state | seq: seq}} end def handle_cast({:subscribe, pid}, state), do: {:noreply, %{state | subs: MapSet.put(state.subs, pid)}} def handle_cast({:unsubscribe, pid}, state), do: {:noreply, %{state | subs: MapSet.delete(state.subs, pid)}} def handle_call({:replay, n}, _from, state) do msgs = :ets.tab2list(state.table) |> Enum.sort_by(&elem(&1, 0)) |> Enum.take(-n) {:reply, msgs, state} end end
This Topic module does the following:
- It uses
GenServerto manage the state of each topic. - It maintains an ETS table as a ring buffer to store messages, allowing efficient storage and retrieval.
- It tracks subscribers using a
MapSetto avoid duplicates. - It provides functions to publish messages, subscribe/unsubscribe clients, and replay recent messages.
We will also need to set up a registry for our topics. Create a new file at lib/text_bus/topic_sup.ex:
defmodule TextBus.TopicSup do use DynamicSupervisor def start_link(_), do: DynamicSupervisor.start_link(__MODULE__, :ok, name: __MODULE__) def init(:ok), do: DynamicSupervisor.init(strategy: :one_for_one) def start_topic(name) do child_spec = {TextBus.Topic, name} DynamicSupervisor.start_child(__MODULE__, child_spec) end end
Now, let's add a registry and the topic supervisor to our application supervision tree. Update lib/text_bus/application.ex:
defmodule TextBus.Application do use Application def start(_type, _args) do children = [ {Registry, keys: :unique, name: TextBus.TopicRegistry}, TextBus.TopicSup, TextBus.HTTPServer ] Supervisor.start_link(children, strategy: :one_for_one, name: TextBus.Supervisor) end end
The {Registry, keys: :unique, name: TextBus.TopicRegistry}, line sets up an in-memory registry to keep track of our topics. Keep in mind that this is just for our tutorial; in production, you will want to use something with more persistence.
Now that we have a working topic model, we can implement the publish endpoint. Create a new handler at lib/text_bus/handlers/publish.ex:
defmodule TextBus.Handlers.Publish do @behaviour :cowboy_handler def init(req, state) do topic = :cowboy_req.binding(:topic, req) {:ok, body, req} = :cowboy_req.read_body(req) case Jason.decode(body) do {:ok, %{"message" => msg}} -> ensure_topic(topic) TextBus.Topic.publish(topic, msg) req = :cowboy_req.stream_reply(202, %{"content-type" => "application/json"}, req) :ok = :cowboy_req.stream_body(~s({"status":"accepted"}), :fin, req) {:ok, req, state} _ -> {:ok, req} = :cowboy_req.reply(400, %{}, "invalid", req) {:ok, req, state} end end defp ensure_topic(name) do case Registry.lookup(TextBus.TopicRegistry, name) do [] -> TextBus.TopicSup.start_topic(name) _ -> :ok end end end
Next, add the publish route to our Cowboy router in lib/text_bus/http_server.ex:
dispatch = :cowboy_router.compile([ {:_, [ {"/health", TextBus.Handlers.Health, []}, {"/publish/:topic", TextBus.Handlers.Publish, []}, ]} ])
The next component is the SSE endpoint. This endpoint allows clients to subscribe to topics and receive real-time updates. SSE is dead simple: send "content-type" => "text/event-stream", keep the connection open, and push lines like data: hello.
Create a new handler at lib/text_bus/handlers/sse.ex:
defmodule TextBus.Handlers.SSE do @behaviour :cowboy_handler def init(req, state) do topic = :cowboy_req.binding(:topic, req) ensure_topic(topic) TextBus.Topic.subscribe(topic, self()) req = :cowboy_req.stream_reply(200, %{ "content-type" => "text/event-stream", "cache-control" => "no-cache", "connection" => "keep-alive" }, req) loop(req, topic) {:ok, req, state} end defp loop(req, topic) do receive do {:message, ^topic, seq, msg} -> frame = "id: #{seq}\ndata: #{msg}\n\n" :ok = :cowboy_req.stream_body(frame, :nofin, req) loop(req, topic) {:cowboy_req, :terminate} -> TextBus.Topic.unsubscribe(topic, self()) :ok end end defp ensure_topic(name) do case Registry.lookup(TextBus.TopicRegistry, name) do [] -> TextBus.TopicSup.start_topic(name) _ -> :ok end end end
Our next handler will be for WebSocket subscriptions. WebSockets provide a full-duplex communication channel over a single TCP connection, making them ideal for real-time applications.
Create a new handler at lib/text_bus/handlers/websocket.ex:
defmodule TextBus.Handlers.WS do @behaviour :cowboy_websocket def init(req, state) do topic = :cowboy_req.binding(:topic, req) ensure_topic(topic) {:cowboy_websocket, req, %{topic: topic}} end def websocket_init(state) do TextBus.Topic.subscribe(state.topic, self()) {:ok, state} end def websocket_handle({:text, msg}, state) do TextBus.Topic.publish(state.topic, msg) {:ok, state} end def websocket_info({:message, _topic, seq, msg}, state) do {:reply, {:text, Jason.encode!(%{id: seq, data: msg})}, state} end def terminate(_reason, _req, state) do TextBus.Topic.unsubscribe(state.topic, self()) :ok end defp ensure_topic(name) do case Registry.lookup(TextBus.TopicRegistry, name) do [] -> TextBus.TopicSup.start_topic(name) _ -> :ok end end end
Finally, add the WebSocket route to our Cowboy router in lib/text_bus/http_server.ex:
dispatch = :cowboy_router.compile([ {:_, [ {"/health", TextBus.Handlers.Health, []}, {"/publish/:topic", TextBus.Handlers.Publish, []}, {"/sse/:topic", TextBus.Handlers.SSE, []}, {"/ws/:topic", TextBus.Handlers.WS, []} ]} ])
Let's test our progress so far by starting the application again:
mix run --no-halt
Start by subscribing to a topic:
curl -N http://localhost:4000/sse/demo
And then in another terminal, using curl to publish a message:
curl -X POST -d '{"message":"Hello, World!"}' http://localhost:4000/publish/demo
If everything is working correctly, you should see the message appear in the SSE subscriber terminal.
curl -N http://localhost:4000/sse/demo id: 1 data: Hello, World!
Replay, Stats, and Testing
Alright, although we technically have a working pub/sub server, it's only fully real-time. This means that if you are connected when a message is published, you will receive it. If you connect after a message is published, though, you will miss it. To solve this, we will add a replay endpoint that allows clients to request the last N messages for a topic.
Previously, we ensured that messages were stored in ETS (see TextBus.Topic). Now we will create a new handler to expose a replay endpoint. Create a new file at lib/text_bus/handlers/replay.ex:
defmodule TextBus.Handlers.Replay do @behaviour :cowboy_handler def init(req, state) do topic = :cowboy_req.binding(:topic, req) n = parse_n(:cowboy_req.qs_val("n", req, "10")) msgs = case Registry.lookup(TextBus.TopicRegistry, topic) do [] -> [] _ -> TextBus.Topic.replay(topic, n) end body = Jason.encode!(Enum.map(msgs, fn {id, msg} -> %{id: id, data: msg} end)) {:ok, req} = :cowboy_req.reply(200, %{"content-type" => "application/json"}, body, req) {:ok, req, state} end defp parse_n(val) do case Integer.parse(val) do {n, _} when n > 0 -> n _ -> 10 end end end
Don't forget to add the replay route to our Cowboy router in lib/text_bus/http_server.ex:
{"/replay/:topic", TextBus.Handlers.Replay, []}
Like we did before, we can test our progress so far by starting the application again and using curl:
# Start SSE subscription curl -N http://localhost:4000/sse/demo # In another shell, publish a few messages curl -XPOST http://localhost:4000/publish/demo \ -H "content-type: application/json" \ -d '{"message":"hello"}' curl -XPOST http://localhost:4000/publish/demo \ -H "content-type: application/json" \ -d '{"message":"world"}' # Replay last 2 messages curl "http://localhost:4000/replay/demo?n=2" # => [{"id":1,"data":"hello"},{"id":2,"data":"world"}]
The next and final component that we will add is a stats endpoint. This endpoint will provide some basic introspection into our server, such as the number of topics, subscribers, and messages in buffers. Create a new handler at lib/text_bus/handlers/stats.ex:
defmodule TextBus.Handlers.Stats do @behaviour :cowboy_handler def init(req, state) do topics = Registry.select(TextBus.TopicRegistry, [{{:"$1", :_, :_}, [], [:"$1"]}]) stats = Enum.map(topics, fn topic -> [{pid, _}] = Registry.lookup(TextBus.TopicRegistry, topic) %{topic: topic, subscribers: subscriber_count(pid)} end) body = Jason.encode!(%{topics: stats, total_topics: length(stats)}) {:ok, req} = :cowboy_req.reply(200, %{"content-type" => "application/json"}, body, req) {:ok, req, state} end defp subscriber_count(pid) do # Reach into process state (simplest for demo purposes) case :sys.get_state(pid) do %{subs: subs} -> MapSet.size(subs) _ -> 0 end end end
Let's add the stats route to our Cowboy router in lib/text_bus/http_server.ex:
{"/stats", TextBus.Handlers.Stats, []}
Now let's test our progress so far by starting the application again and using curl:
# Start SSE subscription curl -N http://localhost:4000/sse/demo # In another shell, publish a few messages curl -XPOST http://localhost:4000/publish/demo \ -H "content-type: application/json" \ -d '{"message":"hello"}' curl -XPOST http://localhost:4000/publish/demo \ -H "content-type: application/json" \ -d '{"message":"world"}' # Replay last 2 messages curl "http://localhost:4000/replay/demo?n=2" # => [{"id":1,"data":"hello"},{"id":2,"data":"world"}] # Stats endpoint curl http://localhost:4000/stats # => {"status":"accepted"}{"status":"accepted"}[{"data":"hello","id":1},{"data":"world","id":2}]{"topics":[{"topic":"demo","subscribers":1}],"total_topics":1}
When to Use Cowboy vs. Phoenix for Elixir
At this point, we have a basic but fully functional pub/sub server built with Cowboy. You may wonder when to choose Cowboy over Phoenix for your projects.
Cowboy vs. Phoenix is not an either/or decision. They serve different purposes:
Cowboy Shines When…
- You need full control over the protocol. Cowboy gives you raw access to HTTP, WebSockets, and even HTTP/2 without any framework abstractions in your way.
- You’re building focused services. Things like telemetry pipelines, IoT data ingestion services, internal dashboards, or custom protocol gateways often just need a way to push data around quickly, not templating engines or an ORM.
- You want to minimize overhead. Cowboy itself is small. No view layers, no router DSL, no middleware stack. That means fewer moving parts, less memory, and less latency.
- You care about lightweight deployment. For sidecars, edge servers, or single-purpose services, starting from Cowboy keeps the footprint minimal.
Phoenix Makes More Sense When…
- You’re building a full web application. If you need HTML templates, session management, static file serving, and a router DSL, Phoenix saves you a mountain of boilerplate.
- You need higher-level abstractions. Phoenix Channels, Presence, and LiveView remove the need to roll your own pub/sub, replay, and state management.
- You’re working in a team. Phoenix has a common structure, clear conventions, and rich documentation, which makes onboarding new developers easier.
- You need built-in scaling tools. Phoenix PubSub is cluster-aware out of the box, so you don’t have to hand-roll distributed registries or message propagation.
Nothing illustrates this point better than the fact that Phoenix uses Cowboy under the hood. Here is how that works:
- Cowboy is the HTTP/WebSocket server. It handles sockets, parses requests, and manages connections.
- Plug is a middleware abstraction (think “Rack” in Ruby or “Express” in Node). It defines a standard way to build request/response pipelines.
- Phoenix builds on Plug, adding routing, controllers, views, channels, LiveView, etc. Plug itself runs inside Cowboy.
Having a better understanding of Cowboy’s strengths will serve you well if you are working on the Elixir ecosystem. Plenty of production systems use Cowboy under the hood, for example:
- RabbitMQ management & Web STOMP plugin – RabbitMQ (written in Erlang) uses Cowboy for its HTTP APIs, the management console, and WebSocket/STOMP support.
- Elixir Language Server (ElixirLS) – the LSP server for editors like VS Code runs Cowboy directly to expose JSON-RPC endpoints over HTTP.
- VerneMQ – a high-performance MQTT broker built in Erlang; its HTTP status and plugin endpoints are served through Cowboy.
Wrapping Up
To recap, we built a lightweight real-time pub/sub server directly on top of Cowboy. Along the way, you learned how to:
- Set up Cowboy as a supervised OTP process and wire it into a simple Elixir app.
- Define custom handlers for HTTP, SSE, and WebSocket endpoints.
- Model topics as processes with an ETS-backed buffer for replay.
- Expose introspection endpoints like
/replayand/statsfor debugging and observability. - Test the system end-to-end using
curl,wscat, and a browser-based SSE client.
The result is a minimal but functional message bus: publishers post messages to a topic, subscribers receive them instantly, and operators can replay history or check system health. It’s a great foundation for prototypes, internal dashboards, IoT telemetry, or anywhere you need real-time streaming without the weight of a full framework.
Happy coding!
Wondering what you can do next?
Finished this article? Here are a few more things you can do:
- Share this article on social media
Most popular Elixir articles

A Complete Guide to Phoenix for Elixir Monitoring with AppSignal
Let's set up monitoring and error reporting for a Phoenix application using AppSignal.
See more
Enhancing Your Elixir Codebase with Gleam
Let's look at the benefits of using Gleam and then add Gleam code to an Elixir project.
See more
Using Dependency Injection in Elixir
Dependency injection can prove useful in Elixir. In this first part of a two-part series, we'll look at some basic concepts, core principles, and types of dependency injection.
See more

Allan MacGregor
Guest author Allan is a software engineer and entrepreneur based in Canada. He has 15 years of industry experience, and is passionate about functional programming and Elixir development.
All articles by Allan MacGregorBecome our next author!
AppSignal monitors your apps
AppSignal provides insights for Ruby, Rails, Elixir, Phoenix, Node.js, Express and many other frameworks and libraries. We are located in beautiful Amsterdam. We love stroopwafels. If you do too, let us know. We might send you some!

