Concurrency Deep Dive: Event loops

By Thijs Cadier

Welcome to the last Ruby Magic article in our series about concurrency. In the previous editions we implemented a chat server using multiple processes and multiple threads. This time we’re going to do the same thing using an event loop.

Recap

We’re going to use the same client and the same server setup we used in the earlier articles. Our aim is to build a chat system that looks like this:

Chat example

Please see the previous articles for more details on the basic setup. The full source code that is used in the examples in this article is available on GitHub, so you can experiment with it yourself.

Chat server using an event loop

Using an event loop for our chat server requires you to have a different mental model than using threads or processes. In the classic approach, a thread or process is responsible for handling a single connection. Using an event loop you have a single thread in a single process that handles multiple connections. Let’s see how this works by breaking it down.

Event loop

An event loop, such as the one used by EventMachine or Node.js, works as follows. We start by telling the operating system that we’re interested in certain events, for example when a connection to a socket is opened. We do this by calling a function that registers interest in some IO object, such as a connection or socket.

When something happens on this IO object, the operating system sends an event to our program. We put these events on a queue. The event loop keeps popping events off the queue and handles them one by one.
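The queue-and-pop behaviour can be sketched in a few lines of plain Ruby. This is a toy illustration only: the event names, the payloads and the `handlers` table are made up for the example, and the events are pushed by hand rather than delivered by the operating system.

```ruby
# Toy event loop: events are queued, then handled strictly one at a time.
events = []   # the queue of pending events
handled = []  # record of what the loop processed, in order

# Hypothetical handlers, keyed by event type
handlers = {
  connect: ->(payload) { handled << "connected: #{payload}" },
  data:    ->(payload) { handled << "received: #{payload}" }
}

# In a real event loop the operating system would produce these events
events.push([:connect, "alice"])
events.push([:data, "hello"])
events.push([:connect, "bob"])

# Pop events off the front of the queue until it is empty
until events.empty?
  type, payload = events.shift
  handlers.fetch(type).call(payload)
end

puts handled
```

Because each handler runs to completion before the next event is popped, no two handlers ever run at the same time.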

In a sense, an event loop is not truly concurrent. It handles events sequentially in very small batches, which simulates the effect of concurrency.

To register interest and have the operating system pass IO events to us we’d have to write a C extension, as there is no API present for that in the Ruby standard library. Diving into that is outside of the scope of this article, so we’re going to use IO.select instead to generate events. IO.select takes up to three arrays of IO objects to monitor (for reading, for writing and for errors), plus an optional timeout. It waits until one or more of those objects are ready, and it returns arrays containing just the ready IO objects, or nil if the timeout expires first.
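To get a feel for IO.select before using it on sockets, here is a minimal sketch that uses a pipe as a stand-in for a connection. The pipe and the variable names are assumptions made for the demonstration; the chat server itself monitors client sockets.

```ruby
# IO.pipe gives us a connected reader/writer pair without needing a network
reader, writer = IO.pipe

writer.puts "hello"
writer.flush

# The reader now has data waiting and the writer has room for more.
# IO.select returns three arrays: readable, writable and errored IO objects.
readable, writable, errored = IO.select([reader], [writer], [reader, writer], 0)

p readable == [reader]  # true
p writable == [writer]  # true
p errored               # []

# Reading is now guaranteed not to block
line = reader.gets
p line                  # "hello\n"
```

Only the objects that are actually ready come back, so the caller never has to guess which connection to service next.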

The code that takes care of everything related to a connection is implemented as a Fiber: we’ll call this code the “handler” from now on. A Fiber is a code block that can be paused and resumed. The Ruby VM doesn’t do this automatically, so we have to resume and yield manually. We’ll use the input from IO.select to inform our handlers when their connections are ready for reading or writing.
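As a small illustration of pausing and resuming, the sketch below creates a Fiber and drives it by hand. The `log` array and the `handler` name are only there for the demonstration.

```ruby
log = []

handler = Fiber.new do
  log << "started"
  loop do
    # Pause here; the argument given to the next resume becomes the return value
    state = Fiber.yield
    log << "resumed with #{state.inspect}"
  end
end

handler.resume             # runs the block up to the first Fiber.yield
handler.resume(:readable)  # continues after Fiber.yield with state = :readable
handler.resume(:writable)

puts log
# started
# resumed with :readable
# resumed with :writable
```

Note that the first resume only runs the block up to the first Fiber.yield. A handler that starts by calling Fiber.yield therefore does not act on the value passed to that first resume.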

Like in the threaded and multi-process examples from the previous posts, we need some storage to keep track of the clients and the messages that are sent. We don’t need a Mutex this time. Our event loop is running in a single thread, so there is no risk of objects being mutated at the same time by different threads.

# Globals, so the handler and the loop steps below can reach them
$client_handlers = {}
$messages = []

The client handler is implemented in the following Fiber. When the socket can be read from or written to, an event is triggered to which the Fiber responds. When the state is :readable it reads a line from the socket and pushes this onto the messages array. When the state is :writable it writes any messages that have been received from other clients since the last write to the client. After handling an event it calls Fiber.yield, so it will pause and wait for the next event.

def create_client_handler(nickname, socket)
  Fiber.new do
    last_write = Time.now
    loop do
      state = Fiber.yield

      if state == :readable
        # Read a message from the socket
        incoming = read_line_from(socket)
        # All good, add it to the list to write
        $messages.push(
          :time => Time.now,
          :nickname => nickname,
          :text => incoming
        )
      elsif state == :writable
        # Write messages to the socket
        get_messages_to_send(last_write, nickname, $messages).each do |message|
          socket.puts "#{message[:nickname]}: #{message[:text]}"
        end
        last_write = Time.now
      end
    end
  end
end
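The handler relies on two helpers, read_line_from and get_messages_to_send, that are not shown in this article. The following is one possible sketch of them, not necessarily what the full source on GitHub does: it assumes a message should be sent when it is newer than the last write and was posted by someone else. A StringIO stands in for the socket in the usage part.

```ruby
require "stringio"

# Hypothetical helper: read one line without its trailing newline.
# Returns nil when there is nothing left to read (the peer closed the connection).
def read_line_from(socket)
  line = socket.gets
  line && line.chomp
end

# Hypothetical helper: messages posted after last_write by other nicknames.
def get_messages_to_send(last_write, nickname, messages)
  messages.select do |message|
    message[:time] > last_write && message[:nickname] != nickname
  end
end

# Usage with stand-in data
now = Time.now
messages = [
  { :time => now - 10, :nickname => "alice", :text => "old message" },
  { :time => now + 1,  :nickname => "alice", :text => "hi bob" },
  { :time => now + 1,  :nickname => "bob",   :text => "my own message" }
]

pending = get_messages_to_send(now, "bob", messages)
p pending.map { |message| message[:text] }  # ["hi bob"]

p read_line_from(StringIO.new("hello\n"))    # "hello"
p read_line_from(StringIO.new(""))           # nil
```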

So how do we trigger the Fiber to read or write at the right time when the Socket is ready? We use an event loop that has four steps:

loop do
  # Step 1: Accept incoming connections
  accept_incoming_connections

  # Step 2: Get connections that are ready for reading or writing
  get_ready_connections

  # Step 3: Read from readable connections
  read_from_readable_connections

  # Step 4: Write to writable connections
  write_to_writable_connections
end

Notice that there is no magic here. This is a normal Ruby loop.

Step 1: Accept incoming connections

See if we have any new incoming connections. We use accept_nonblock, which does not wait for a client to connect. Instead it raises an error if there is no new client; if that happens we rescue the error and move on to the next step. If there is a new client we create a handler for it and put that in the client_handlers store. We use the socket object as the key of that Hash so we can find the client handler later.

begin
  socket = server.accept_nonblock
  nickname = socket.gets.chomp
  $client_handlers[socket] = create_client_handler(nickname, socket)
  puts "Accepted connection from #{nickname}"
rescue IO::WaitReadable, Errno::EINTR
  # No new incoming connections at the moment
end
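To see both outcomes of accept_nonblock in isolation, here is a small self-contained sketch. It assumes a throwaway server on 127.0.0.1 with a port picked by the operating system, and it plays the client role itself; none of these names come from the chat server.

```ruby
require "socket"

# Port 0 asks the operating system for any free port (an assumption for this demo)
server = TCPServer.new("127.0.0.1", 0)
port = server.addr[1]

# No client has connected yet, so accept_nonblock raises instead of waiting
begin
  server.accept_nonblock
  no_client_result = :accepted
rescue IO::WaitReadable, Errno::EINTR
  no_client_result = :nothing_pending
end

# Connect a client, wait until the server socket reports it, then accept
client = TCPSocket.new("127.0.0.1", port)
IO.select([server], nil, nil, 1)
socket = server.accept_nonblock

# The first line a client sends is its nickname, as in the chat server
client.puts "alice"
nickname = socket.gets.chomp

puts no_client_result  # nothing_pending
puts nickname          # alice

[client, socket, server].each(&:close)
```

One thing to be aware of: socket.gets waits until the client has sent its nickname, so a client that connects but stays silent would hold up the loop at that point.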

Step 2: Get connections that are ready for reading or writing

Next, we ask the OS which connections are ready. We pass in the keys of the client_handlers store for reading, writing and error handling. These keys are the socket objects we accepted in step 1. We wait up to 10 milliseconds for any of them to become ready; if none do, IO.select returns nil.

readable, writable = IO.select(
  $client_handlers.keys,
  $client_handlers.keys,
  $client_handlers.keys,
  0.01
)
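The timeout case is worth a quick look, because it explains the checks in the next two steps. This sketch again uses a pipe as a stand-in for a client socket, which is an assumption for the demonstration.

```ruby
reader, writer = IO.pipe

# Nothing is written to the pipe, so the reader never becomes ready
# and IO.select gives up after the 10 millisecond timeout.
result = IO.select([reader], nil, nil, 0.01)

# Destructuring nil leaves both variables nil, which is why the loop
# has to check them before iterating.
readable, writable = result

# With no clients connected the arrays are empty; IO.select then simply
# waits for the timeout and returns nil.
idle = IO.select([], [], [], 0.01)

p result    # nil
p readable  # nil
p writable  # nil
p idle      # nil
```

The timeout also keeps the loop from spinning at full speed while nothing is happening, and it bounds how long a new connection waits before step 1 runs again.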

Step 3: Read from readable connections

If any of our connections are readable, we’ll trigger the client handlers and resume them with a readable state. We can look up these client handlers because the Socket object that is returned by IO.select is used as the key of the handlers store.

if readable
  readable.each do |ready_socket|
    # Get the client from storage
    client = $client_handlers[ready_socket]

    client.resume(:readable)
  end
end

Step 4: Write to writable connections

If any of our connections are writable, we’ll trigger the client handlers and resume them with a writable state.

if writable
  writable.each do |ready_socket|
    # Get the client from storage
    client = $client_handlers[ready_socket]
    next unless client

    client.resume(:writable)
  end
end

By running these four steps in a loop that creates handlers and resumes them with :readable and :writable at the right time, we have created a functional evented chat server. There’s very little overhead per connection, and we could scale this up to a large number of concurrent clients.
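To watch a single tick of such a loop deliver a message, here is a condensed, self-contained sketch. It is a simplification rather than the server from this article: UNIXSocket.pair stands in for real client connections (so it assumes a Unix-like system), the nicknames are hard-coded instead of read from the socket, and a hypothetical `delivered` counter replaces the time-based bookkeeping.

```ruby
require "socket"

# Two connected socket pairs stand in for two chat clients
alice_client, alice_server = UNIXSocket.pair
bob_client, bob_server = UNIXSocket.pair

messages = []
delivered = Hash.new(0)  # nickname => number of messages already processed

def build_handler(nickname, socket, messages, delivered)
  # The block argument receives the value passed to the first resume
  Fiber.new do |state|
    loop do
      if state == :readable
        messages << { :nickname => nickname, :text => socket.gets.chomp }
      elsif state == :writable
        # Send everything new, except the client's own messages
        messages.drop(delivered[nickname]).each do |message|
          next if message[:nickname] == nickname
          socket.puts "#{message[:nickname]}: #{message[:text]}"
        end
        delivered[nickname] = messages.size
      end
      state = Fiber.yield
    end
  end
end

handlers = {
  alice_server => build_handler("alice", alice_server, messages, delivered),
  bob_server   => build_handler("bob", bob_server, messages, delivered)
}

alice_client.puts "hello bob"

# One tick: find the ready sockets, then resume their handlers
readable, writable = IO.select(handlers.keys, handlers.keys, nil, 0.01)
(readable || []).each { |socket| handlers[socket].resume(:readable) }
(writable || []).each { |socket| handlers[socket].resume(:writable) }

line = bob_client.gets
puts line  # alice: hello bob
```

Reads are handled before writes within the tick, so a message read from one client can reach the other clients in the same pass.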

This approach works very well as long as we keep the amount of work per tick of the loop small. This is especially important for work that involves calculations, since an event loop runs in a single thread and thus can only utilize a single CPU. In production systems there are often multiple processes running an event loop to work around this limitation.

Concluding

After all this you might ask, which of these three methods should I use?

This concludes our series on concurrency. If you want a full recap check the original mastering concurrency article as well as the detailed articles on using multiple processes and multiple threads.
