ruby

Fibers and Enumerators in Ruby - Turning Blocks Inside Out

Julik Tarkhanov on

Ruby has various ways of performing iteration—loops, blocks and enumerators. Most Ruby programmers are at least familiar with loops and blocks but Enumerator and Fiber often stay in the dark. In this edition of Ruby Magic, guest author Julik shines a light on Enumerable and Fiber to explain flow controlling enumerables and turning blocks inside out.

Suspending Blocks and Chained Iteration

We've discussed Enumerator in a previous edition of Ruby Magic, where we described how to return an Enumerator from your own #each method and what it can be used for. An even broader use case for Enumerator and Fiber is that they can "suspend a block" mid-flight. Not just the block given to #each or the entire call to #each, but any block!

This is a very powerful construct, which can be used to implement shims for methods that work by using blocks as a bridge to callers that expect sequential calls instead of taking a block. For example, imagine we want to open a database handle and read each item that we have retrieved:

db.with_each_row_of_result(sql_stmt) do |row|
yield row
end

The block API is great since it will potentially perform all kinds of cleanup for us when the block is terminated. However, some consumers might want to work with the database in this way:

@cursor = cursor

# later:
row = @cursor.next_row
send_row_to_event_stream(row)

In practice, it means we want to "suspend" the execution of the block "just for now" and carry on later within the block. Thus, the caller takes over the flow control instead of it being in the hands of the callee (the method performing the block).

Chaining Iterators

One of the most common uses of this pattern is chaining multiple iterators together. When we do so, the methods we are used to for iteration (like #each), return an Enumerator object instead, which we can use to "grab" the values that the block sends us using the yield statement:

range = 1..8
each_enum = range.each # => <Enumerator...>

The enumerators can then be chained which allows us to perform operations like "any iteration but with the index". In this example, we're calling #map on a range to get an Enumerable object. We then chain #with_index to iterate over the range with an index:

(1..3).map.with_index {|element_n, index| [element_n, index] }
#=> [[1, 0], [2, 1], [3, 2]]

This can be very useful, especially if your system uses events. Ruby provides a built-in method for wrapping any method with an Enumerator generator, which allows us to accomplish exactly this. Imagine we want to "pull" rows one by one from our with_each_row_of_result, instead of the method yielding them to us.

@cursor = db.to_enum(:with_each_row_of_result, sql_stmt)
schedule_for_later do
begin
row = @cursor.next
send_row_to_event_stream(row)
rescue StopIteration # the block has ended and the cursor is empty, the cleanup has taken place
end
end

If we were to implement this ourselves, this is how it would likely come about:

cursor = Enumerator.new do |yielder|
db.with_each_row_of_result(sql_stmt) do |row|
yielder.yield row
end
end

Turning Blocks Inside Out

Rails allows us to assign the response body to also be an Enumerator. It will call next on the Enumerator we assign as the response body and expect the returned value to be a string—which will be written out into the Rack response. For example, we can return a call to the #each method of a Range as a Rails response body:

class MyController < ApplicationController
def index
response.body = ('a'..'z').each
end
end

This is what I call turning a block inside out. In essence, it is a control flow helper that allows us to "freeze time" in a block (or a loop, which is also a block in Ruby) mid-flight.

However, Enumerators have a limiting property that makes them slightly less useful. Imagine we want to do something like this:

File.open('output.tmp', 'wb') do |f|
# Yield file for writing, continuously
loop { yield(f) }
end

Let's wrap it with an enumerator, and write into it

writer_enum = File.to_enum(:open, 'output.tmp', 'wb')
file = en.next
file << data
file << more_data

Everything works great. However, there is a hitch—how do we tell the enumerator that we are done writing, so that it can "finish" the block, close the file and exit? This will perform a number of important steps—for example, resource cleanup (the file will be closed), as well as ensuring all the buffered writes are flushed to disk. We do have access to the File object, and we can close it ourselves, but we would like the enumerator to manage the closing for us; we have to let the enumerator proceed past the block.

Another hurdle is that sometimes we want to pass arguments of what is happening within the suspended block. Imagine we have a block-accepting method with the following semantics:

write_file_through_encryptor(file_name) do |writable|
writable << "Some data"
writable << "Some more data"
writable << "Even more data"
end

but in our calling code we want to use it like this:

writable = write_file_through_encryptor(file_name)
writable << "Some data"
# ...later on
writable << "Some more data"
writable.finish

Ideally, we would wrap our method call into some structure that would permit us the following trick:

write_file_through_encryptor(file_name) do |writable|
loop do
yield_and_wait_for_next_call(writable)
# Then we somehow break out of this loop to let the block complete
end
end

What if we were to wrap our writes like this?

deferred_writable = write_file_through_encryptor(file_name)
deferred_writable.next("Some data")
deferred_writable.next("Some more data")
deferred_writable.next("Even more data")
deferred_writable.next(:terminate)

In this case, we will use the :terminate as a magic value that will tell our method that it can finish the block and return. This is where Enumerator won't really help us because we can't pass any arguments to Enumerator#next. If we could, we would be able to do:

deferred_writable = write_file_through_encryptor(file_name)
deferred_writable.next("Some data")
...
deferred_writable.next(:terminate)

Enter Ruby's Fibers

This is exactly what Fibers permit. A Fiber allows you to accept arguments on each reentry, so we can implement our wrapper like so:

deferred_writable = Fiber.new do |data_to_write_or_termination|
write_file_through_encryptor(filename) do |f|
# Here we enter the block context of the fiber, reentry will be to the start of this block
loop do
# When we call Fiber.yield our fiber will be suspended—we won't reach the
# "data_to_write_or_termination = " assignment before our fiber gets resumed
data_to_write_or_termination = Fiber.yield
end
end
end

This is how it works: When you first call .resume on your deferred_writable, it enters the fiber and goes all the way to the first Fiber.yield statement or to the end of the outermost Fiber block, whichever comes first. When you call Fiber.yield, it gives you back control. Remember the Enumerator? The block is going to be suspended, and the next time you call .resume, the argument to resume becomes the new data_to_write.

deferred_writes = Fiber.new do |data_to_write|
loop do
$stderr.puts "Received #{data_to_write} to work with" data_to_write = Fiber.yield end end # => #<Fiber:0x007f9f531783e8> deferred_writes.resume("Hello") #=> Received Hello to work with deferred_writes.resume("Goodbye") #=> Received Goodbye to work with  So, within the Fiber, the code flow is started on the first call to Fiber#resume, suspended at the first call to Fiber.yield, and then continued on subsequent calls to Fiber#resume, with the return value of Fiber.yield being the arguments to resume. The code continues running from the point where Fiber.yield was last called. This is a bit of a quirk of Fibers in that the initial arguments to the fiber will be passed to you as the block arguments, not via the return value of Fiber.yield. With that in mind, we know that by passing a special argument to resume, we can decide within the Fiber whether we should stop or not. Let's try that: deferred_writes = Fiber.new do |data_to_write| loop do$stderr.puts "Received #{data_to_write} to work with"
break if data_to_write == :terminate # Break out of the loop, or...
write_to_output(data_to_write)       # ...write to the output
data_to_write = Fiber.yield          # suspend ourselves and wait for the next resume
end
# We end up here if we break out of the loop above. There is no Fiber.yield
# statement anywhere, so the Fiber will terminate and become "dead".
end

deferred_writes.resume("Hello") #=> Received Hello to work with
deferred_writes.resume("Goodbye") #=> Received Goodbye to work with
deferred_writes.resume(:terminate)
deferred_writes.resume("Some more data after close") # FiberError: dead fiber called

There are a number of situations where these facilities can be very useful. Since a Fiber contains a suspended block of code that can be manually resumed, Fibers can be used for implementing event reactors and for dealing with concurrent operations within a single thread. They are lightweight, so you can implement a server using Fibers by assigning a single client to a single Fiber and switching between these Fiber objects as necessary.

client_fiber = Fiber.new do |socket|
loop do
sent_to_client = socket.write_nonblock("OK")
Fiber.yield # Return control back to the caller and wait for it to call 'resume' on us
end
end

client_fibers << client_fiber

# and then in your main webserver loop
client_fibers.each do |client_fiber|
client_fiber.resume # Receive data from the client if any, and send it an OK
end

Ruby has an additional standard library called fiber which allows you to explicitly transfer control from one fiber to another, which can be a bonus facility for these uses.

Controlling Data Emission Rates

Another great use for fibers and enumerators can arise when you want to be able to control the rate at which a Ruby block emits data. For example, in zip_tricks we support the following block use as the primary way of using the library:

ZipTricks::Streamer.open(output_io) do |z|
z.write_deflated_file("big.csv") do |destination|
columns.each do |col|
destination << column
end
end
end

We therefore allow "push" control on the part of the code that creates the ZIP archive, and it is impossible to control how much data it outputs and how often. If we want to write our ZIP in chunks of, say, 5 MB—which would be a limitation on AWS S3 object storage—we would have to create a custom output_io object which would somehow "refuse" to accept << method calls when the segment needs to be split off into an S3 multipart part. We can, however, invert the control and make it "pull". We will still use the same block for writing our big CSV file, but we will be resuming and halting it based on the output it provides. We therefore make the following use possible:

output_enum = ZipTricks::Streamer.output_enum do |z|
z.write_deflated_file("big.csv") do |destination|
columns.each do |col|
destination << column
end
end
end

# At this point nothing has been generated or written yet
enum = output_enum.each # Create an Enumerator
bin_str = enum.next # Let the block generate some binary data and then suspend it
output.write(bin_str) # Our block is suspended and waiting for the next invocation of next

This allows us to control at which rate our ZIP file generator emits data.

Enumerator and Fiber are, therefore, a control flow mechanism for turning "push" blocks into "pull" objects that accept method calls.

There is only one pitfall with Fibers and Enumerators—if you have something like ensure in your block, or something that needs to be done after the block completes, it is now up to the caller to call you enough times. In a way, it is comparable to the constraints you have when using Promises in JavaScript.

Conclusion

This concludes our look into flow-controlled enumerables in Ruby. Along the way, Julik shone light on the similarities and differences between the Enumerable and Fiber classes, and dove into examples where caller determined the flow of data. We’ve also learned about Fiber’s additional magic to allow passing arguments on each block reentry. Happy flow-controlling!

To get a steady dose of magic, subscribe to Ruby Magic and we'll deliver our monthly edition straight to your inbox.