Rails Threading Gains

Introduction

Here at Cloud 66 we do loads of work with external resources (i.e. users' servers and generic infrastructure components like load balancers, hardware firewalls, disks etc.), and we often need to operate on these components as part of a synchronous workflow.

Consider the following (extremely simplified) workflow:

1) Create 2 new cloud servers (somewhere)  
2) Wait for them to be created  
3) Start an arbitrary process on both of them  

Normally we would use sidekiq for our background processing. However, in workflow scenarios like the one above, where we want to wait synchronously on operations that run in parallel, sidekiq doesn't quite fit the bill: we end up with a proliferation of jobs and batches that becomes very complex as the number of parallel operations in the same workflow increases. So, in this case, we'll roll our own.

What about the GIL?

The one thing these operations have in common is that they are all I/O bound (and NOT compute bound). An I/O-bound operation is not CPU intensive; it spends most of its time waiting on some other resource, in this case network comms from our cloud provider. This matters because of Ruby's ever-present Global Interpreter Lock (GIL) (ps. check out this really good read on the GIL): the GIL lets only one thread run Ruby code at a time, but it is released while a thread waits on I/O. By focusing on I/O-bound operations we can still benefit from concurrency performance gains in Ruby.
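To make the point about I/O-bound work concrete, here is a minimal sketch that compares running a few blocking calls one after another with running them in threads. The `fake_io_call` helper is hypothetical; it uses `sleep` as a stand-in for waiting on a cloud provider, and the timings you see will vary by machine.

```ruby
require "benchmark"

# Hypothetical stand-in for a blocking call to a cloud provider.
# sleep releases the GIL, much like waiting on a network socket does.
def fake_io_call(seconds)
  sleep(seconds)
end

# Runs `count` fake calls one after another; returns elapsed seconds.
def run_sequential(count, seconds)
  Benchmark.realtime { count.times { fake_io_call(seconds) } }
end

# Runs `count` fake calls in separate threads; returns elapsed seconds.
def run_threaded(count, seconds)
  Benchmark.realtime do
    threads = Array.new(count) { Thread.new { fake_io_call(seconds) } }
    threads.each(&:join)
  end
end

puts format("sequential: %.2fs", run_sequential(4, 0.2))
puts format("threaded:   %.2fs", run_threaded(4, 0.2))
```

The threaded run should finish in roughly the time of a single call, because the waits overlap. A CPU-heavy block in place of `sleep` would not show the same gain under the GIL.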

Implementation

The basic implementation revolves around a few core concepts:
1. We use our own lambda context object to define our lambdas and to store their run results after the operation
2. We use Ruby's native Thread Local Storage (TLS) to store our operation states and run results during the operation
3. We use a new ActiveRecord DB connection for each thread. This ensures that we don't illegally re-use or close active DB connections
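The thread-local storage idea in point 2 can be sketched in isolation. This is a simplified illustration rather than the runner itself: the `run_and_record` helper is hypothetical, and it only reuses the `:stdout`, `:stderr` and `:exit_code` key names from this post.

```ruby
# Hypothetical helper: runs the given block in a new thread and records
# the outcome in that thread's local storage.
def run_and_record(&work)
  Thread.new do
    begin
      Thread.current[:stdout] = work.call
      Thread.current[:exit_code] = 0
    rescue => exc
      # Store the failure instead of letting it escape the thread.
      Thread.current[:stderr] = exc.message
      Thread.current[:exit_code] = -1
    end
  end
end

ok = run_and_record { "server-0 created" }
failed = run_and_record { raise "provider timed out" }

# Thread-local values remain readable from outside once the thread is joined.
[ok, failed].each(&:join)

puts ok[:stdout]        # server-0 created
puts failed[:stderr]    # provider timed out
puts failed[:exit_code] # -1
```

Reading the values with `thread[:key]` after `join` is what lets the caller collect results without sharing any mutable state between the threads while they run.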

Class: LambdaContext

First we need to define a class called LambdaContext which serves simply to store our lambda definition and run-results:

class LambdaContext  
 attr_accessor :lambda_definition,
  :parameters,   
  :stdout,
  :stderr,
  :stderr_backtrace,
  :exit_code,
  :executed

 def initialize(lambda_definition = nil, *args)
  self.lambda_definition = lambda_definition
  self.parameters = args
  self.executed = false
 end
end  

Now we can simply create a collection of these lambda_context objects to run in parallel later on. From our sample workflow (assuming of course that the method create_server with parameter server_index exists):

lambda_contexts = []  
2.times do |idx|  
 lambda_def = lambda {|svr_idx| create_server(svr_idx) }
 lambda_contexts << LambdaContext.new(lambda_def, idx)
end  

Note: we are passing in our svr_idx argument ourselves (instead of defining the lambda directly with the value of idx). In the above example it makes little difference, but a lambda captures the variable itself rather than its value at definition time. In a case where idx could be reassigned before the lambda executes, passing the argument explicitly ensures that we use the value we originally wanted.
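The difference between capturing a variable and passing a value is easier to see in a case where the variable really is reassigned. The following is an illustrative sketch only; the `capture_vs_pass` method and the server names are made up for the example.

```ruby
# Returns [result of the capturing lambda, result of the parameterised lambda].
def capture_vs_pass
  name = "server-0"
  captured = lambda { name }        # closes over the variable itself
  passed = lambda { |value| value } # only sees what it is handed at call time
  original = name                   # the value we store alongside the lambda
  name = "server-1"                 # reassigned before either lambda runs
  [captured.call, passed.call(original)]
end

p capture_vs_pass # ["server-1", "server-0"]
```

Storing the arguments in the LambdaContext plays the role of `original` here: the value is fixed when the context is built, not when the thread eventually runs.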

Class: ThreadRunner

We can now define a class to perform the operations in parallel using Ruby's native Thread class:

class ThreadRunner  
 attr_accessor :lambda_contexts

 def initialize
  @lambda_contexts = []
 end

 # executes and returns true or false if success or fail
 def execute
  # define the threads container
  threads = []
  # loop through the lambdas
  @lambda_contexts.each do |lambda_context|
   # create a thread and start
   threads << run_thread(lambda_context)
  end
  # join all the threads remaining and capture the output
  return join_thread_batch(threads)
 end

 # returns errors after execution is complete
 def after_execution_get_full_errors
  errors =  @lambda_contexts.select do |lc| 
   !lc.stderr_backtrace.blank?
  end            
  errors.map do |lc| 
    "EXCEPTION: #{lc.stderr} AT #{lc.stderr_backtrace}"
  end
 end

 private

 # executes the lambda definition and stores the results 
 # in TLS for retrieval later
 def run_thread(lambda_context)
  lc = lambda_context
  lc.executed = true
  thread = Thread.new do
   with_connection do
    begin
     params = lc.parameters
     if params.blank?
      Thread.current[:stdout] = lc.lambda_definition.call
     else
      Thread.current[:stdout] = lc.lambda_definition.call(*params)
     end
     Thread.current[:exit_code] = 0
    rescue => exc
     Thread.current[:stderr] = exc.message
     Thread.current[:exit_code] = -1
     Thread.current[:stderr_backtrace] = exc.backtrace.join("\n")
    end
   end
  end
  return thread
 end 

 # ensures that connections are handled in the case of 
 # Ruby Timeouts (evil)
 def with_connection
  ActiveRecord::Base.connection_pool.with_connection do
   # run the caller's block while a pooled connection is checked out
   yield
  end
 rescue Timeout::Error => exc
  ActiveRecord::Base.clear_active_connections!
  ActiveRecord::Base.connection.close
  raise exc
 end

 # joins all threads in the existing batch and stores their 
 # results against the lambda_contexts collection
 def join_thread_batch(threads)
  all_ok = true
  threads.each_with_index do |thread, index|
   lambda_context = @lambda_contexts[index]
   thread.join
   lambda_context.stdout = thread[:stdout]
   lambda_context.stderr = thread[:stderr]
   lambda_context.stderr_backtrace = thread[:stderr_backtrace]
   lambda_context.exit_code = thread[:exit_code]
   all_ok = lambda_context.stderr.nil? if all_ok
  end
  return all_ok
 end
end  

Example Usage

Using the classes above, we can now implement our sample workflow as follows:

# create thread_runner object
thread_runner = ThreadRunner.new

# add lambdas to the thread_runner
2.times do |idx|  
 # define the method lambda
 lambda_def = lambda {|svr_idx| create_server(svr_idx) }
 thread_runner.lambda_contexts << LambdaContext.new(lambda_def, idx)
end

# execute all lambdas in parallel
# and wait for them to complete
all_ok = thread_runner.execute  
# spit out errors if any failed
unless all_ok  
 # do something
 puts thread_runner.after_execution_get_full_errors 
end

... profit!

Conclusion

In this blog post, we looked at a relatively simple multithreading solution in Ruby. It can provide significant performance improvements when running I/O-bound operations in parallel, specifically when we need to join on their results before continuing. Next time we'll take a look at some improvements we can make to this class around limiting concurrency and using a thread pool.

Happy coding!


