Rails Threading Gains (Part 2)

Introduction

Last time we talked about a set of simple Ruby classes that can be used in your Rails application to perform operation in parallel on separate worker threads (Part 1). The classes handle re-joining when the workflow is complete as well taking care of ActiveRecord connections for you.

Today we'll discuss some small improvements we can apply to our other classes - specifically around the use of a ThreadPool (and the resulting inherent concurrency limiting).

Why use a ThreadPool?

The allocation (creation) of threads is not a free operation. Whenever a new thread is allocated, your system needs to create a whole bunch of memory structures internally (about ~50K for each thread for the stack copy). Then, when your thread is complete your system need to garbage collect those memory structures. These operations consume valuable time and memory resources on your system - and depending on your frequency usage could have a real impact.

To get around this we allocate pool(s) of X threads in advance - and re-use those threads whenever we need to. Whenever an operation is required, we pick a thread from the pool and use it to perform the operation and return the result.

What about Concurrency Limiting for a Task?

Well, now that you have a set of ThreadPools (each of which having as many threads as you like) - by simply allocating tasks to that specific ThreadPool you automatically rate-limit the concurrent jobs to the number of threads in that pool. For instance - want 100 jobs to run 3 at at time? Then simply allocated them to a ThreadPool of size 3!

Implementation

The basic implementation resolves around a few core concepts:
1. We add a simple ThreadPool class
2. We re-use the Classes from the last blog in this series. NOTE: that we'll make some changes to the ThreadRunner class!

Class: ThreadPool

First we need to define a class called ThreadPool which encapsulates our ThreadPool logic.

require 'thread'  
class ThreadPool  
 # mutex instance creation
 INSTANCE_MUTEX = Mutex.new

 # a pool of 15 threads
 def self.pool_of_15
  # initialize thread pool
  INSTANCE_MUTEX.synchronize do 
   @pool_of_15 ||= MultiThreading::ThreadPool.new(pool_size: 15)}
  end
 end

 # a pool of 3 threads
 def self.pool_of_3
  # initialize thread pool
  INSTANCE_MUTEX.synchronize do 
   @pool_of_3 ||= MultiThreading::ThreadPool.new(pool_size: 3)
  end
 end

 def initialize(pool_size:)
  @pool_size = pool_size
  @operations = Queue.new
  @thread_pool = Array.new(pool_size) do
   Thread.new do
    catch(:exit) do
     loop do
      block, lambda_context = @operations.pop
      block.call(lambda_context)
     end 
    end
   end
  end
 end

 def schedule(lambda_context, &block)
  lambda_context.exit_code = nil
  @operations << [block, lambda_context]
 end

 def shutdown
   @pool_size.times {schedule(nil) {throw :exit}}
   @thread_pool.map(&:join)
 end
end  
Class: ThreadRunner

We now update our previous ThreadRunner class:

 attr_accessor :lambda_contexts

 def initialize(lambda_contexts: [])
  @lambda_contexts = lambda_contexts
 end

 # executes and returns true or false if success or fail
 def execute(thread_pool: ThreadPool.pool_of_15)
  @lambda_contexts.each do |lambda_context|
   # enqueue the job in the thread pool
   thread_pool.schedule(lambda_context) do |context|
    with_connection do
     begin
      params = context.parameters
      context.stdout = context.lambda_definition.call(*params)
      context.exit_code = 0
     rescue => exc
      context.stderr = exc.message
      context.exit_code = -1
      context.stderr_backtrace = exc.backtrace.join("\n")
     end
    end
   end
  end
  # now we wait for all lambda_contexts to complete
  # lets run a tight loop here for now
  while @lambda_contexts.any? {|lambda_context| lambda_context.exit_code.nil?}
   # sleep for an arbitrary length
   # we can also introduce a timeout here
   sleep(0.02)
  end
  # return exit codes for all our operations
  @lambda_contexts.all? {|lambda_context| lambda_context.exit_code == 0}
 end

 # returns errors after execution is complete
 def after_execution_get_full_errors
  errors =  @lambda_contexts.select do |lc| 
   !lc.stderr_backtrace.blank?
  end            
  errors.map do |lc| 
    "EXCEPTION: #{lc.stderr} AT #{lc.stderr_backtrace}"
  end
 end

 private

 # ensures that connections are handled in the case of 
 # Ruby Timeouts (evil)
 def with_connection(&block)
  ActiveRecord::Base.connection_pool.with_connection do
   yield block
  end
 rescue Timeout::Error => exc
  ActiveRecord::Base.clear_active_connections!
  ActiveRecord::Base.connection.close
  raise exc
 end 
end  

Example Usage

Now, using the classes above for our workflow as defined above we can simply do the following:

# create thread_runner object
thread_runner = ThreadRunner.new

# add lambdas to the thread_runner
2.times do |idx|  
 # define the method lambda
 lambda_def = lambda {|svr_idx| create_server(svr_idx) 
 thread_runner.lambda_contexts << LambdaContext.new(lambda_def, idx)
end

# execute all lambdas in parallel
# and wait for them to complete (with max of 3 concurrency)
all_ok = thread_runner.execute(thread_pool: ThreadPool.pool_of_3)  
# OR wait for them to complete (with max of 15 concurrency)
all_ok = thread_runner.execute(thread_pool: ThreadPool.pool_of_15)  
# spit out errors if any failed
unless all_ok  
 # do something
 puts thread_runner.after_execution_get_full_errors 
end

... profit!

Conclusion

In this blog post, we looked at an enhancement to our previous multithreading solution in Ruby to provide the use of a ThreadPool and concurrency limiting.

Now go code!



Try Cloud 66 for Free, No credit card required