Welcome to the last Ruby Magic article in our series about concurrency. In the previous editions we implemented a chat server using multiple processes and multiple threads. This time we're going to do the same thing using an event loop.
Recap
We're going to use the same client and the same server setup we used in the earlier articles. Our aim is to build a chat system that looks like this:
Please see the previous articles for more details on the basic setup. The full source code that is used in the examples in this article is available on GitHub, so you can experiment with it yourself.
Chat server using an event loop
Using an event loop for our chat server requires you to have a different mental model than using threads or processes. In the classic approach, a thread or process is responsible for handling a single connection. Using an event loop you have a single thread in a single process that handles multiple connections. Let's see how this works by breaking it down.
Event loop
An event loop used by EventMachine or NodeJS for example works as follows. We start with informing the operating system we're interested in certain events. For example, when a connection to a socket is opened. We do this by calling a function that registers interest on some IO object, such as a connection or socket.
When something happens on this IO object, the operating system sends an event to our program. We put these events on a queue. The event loop keeps popping events off the list and handles them one by one.
In a sense an event loop is not truly concurrent. It works sequentially in very small batches to simulate the effect.
To register interest and have the operating system pass IO events to us we'd have to write a C extension, as there is no API present for that in the Ruby standard library. Diving into that is outside of the scope of this article, so we're going to use IO.select
instead to generate events. IO.select
takes an array of IO
objects to monitor. It waits until one or more of the objects from the array are ready for reading or writing, and it returns an array with just those IO
objects.
The code that takes care of everything related to a connection is implemented as a Fiber
: we'll call this code the "handler" from now on. A Fiber
is a code block that can be paused and resumed. The Ruby VM doesn't do this automatically, so we have to resume and yield manually. We'll use the input from IO.select
to inform our handlers when their connections are ready for reading or writing.
Like in the threaded and multi-process examples from the previous posts, we need some storage to keep track of the clients and the messages that are sent. We don't need a Mutex
this time. Our event loop is running in a single thread, so there is no risk of objects being mutated at the same time by different threads.
client_handlers = {} messages = []
The client handler is implemented in the following Fiber
. When the socket can be read from or written to, an event is triggered to which the Fiber
responds. When the state is :readable
it reads a line from the socket and pushes this onto the messages
array. When the state is :writable
it writes any messages that have been received from other clients since the last write to the client. After handling an event it calls Fiber.yield
, so it will pause and wait for the next event.
def create_client_handler(nickname, socket) Fiber.new do last_write = Time.now loop do state = Fiber.yield if state == :readable # Read a message from the socket incoming = read_line_from(socket) # All good, add it to the list to write $messages.push( :time => Time.now, :nickname => nickname, :text => incoming ) elsif state == :writable # Write messages to the socket get_messages_to_send(last_write, nickname, $messages).each do |message| socket.puts "#{message[:nickname]}: #{message[:text]}" end last_write = Time.now end end end end
So how do we trigger the Fiber
to read or write at the right time when the Socket
is ready? We use an event loop that has four steps:
loop do # Step 1: Accept incoming connections accept_incoming_connections # Step 2: Get connections that are ready for reading or writing get_ready_connections # Step 3: Read from readable connections read_from_readable_connections # Step 4: Write to writable connections write_to_writable_connections end
Notice that there is no magic here. This is a normal Ruby loop.
Step 1: Accept incoming connections
See if we have any new incoming connections. We use accept_nonblock
, which will not wait for a client to connect. It will instead raise an error if there is no new client, and if that error occurs we catch it and go to the next step. If there is a new client we create the handler for it and put that on the clients
store. We'll use the socket object as the key of that Hash
so we can find the client handler later.
begin socket = server.accept_nonblock nickname = socket.gets.chomp $client_handlers[socket] = create_client_handler(nickname, socket) puts "Accepted connection from #{nickname}" rescue IO::WaitReadable, Errno::EINTR # No new incoming connections at the moment end
Step 2: Get connections that are ready for reading or writing
Next, we ask the OS to inform us when a connection is ready. We pass in the keys of the client_handlers
store for reading, writing and error handling. These keys are the socket objects we accepted in step 1. We wait for 10 milliseconds for this to happen.
readable, writable = IO.select( $client_handlers.keys, $client_handlers.keys, $client_handlers.keys, 0.01 )
Step 3: Read from readable connections
If any of our connections are readable, we'll trigger the client handlers and resume them with a readable
state. We can look up these client handlers because the Socket
object that is returned by IO.select
is used as the key of the handlers store.
if readable readable.each do |ready_socket| # Get the client from storage client = $client_handlers[ready_socket] client.resume(:readable) end end
Step 4: Write to writable connections
If any of our connections are writable, we'll trigger the client handlers and resume them with a writable
state.
if writable writable.each do |ready_socket| # Get the client from storage client = $client_handlers[ready_socket] next unless client client.resume(:writable) end end
By using these four steps in a loop which creates handlers, and calling readable
and writable
on these handlers at the right time, we have created a fully functional evented chat server. There's very little overhead per connection, and we could scale this up to a large number of concurrent clients.
This approach works very well as long as we keep the amount of work per tick of the loop small. This is especially important for work that involves calculations, since an event loop runs in a single thread and thus can only utilize a single CPU. In production systems there are often multiple processes running an event loop to work around this limitation.
Concluding
After all this you might ask, which of these three methods should I use?
- For most apps, threading makes sense. It's the simplest approach to work with.
- If you run highly concurrent apps with long-running streams, event loops allow you to scale.
- If you expect your processes to crash, go for good old multi-process, as it's the most robust approach.
This concludes our series on concurrency. If you want a full recap check the original mastering concurrency article as well as the detailed articles on using multiple processes and multiple threads.