Introduction
Last time, in Part 1, we looked at a set of simple Ruby classes you can use in your Rails application to perform operations in parallel on separate worker threads. The classes handle re-joining when the workflow is complete, as well as taking care of ActiveRecord connections for you.
Today we'll discuss some small improvements we can make to those classes - specifically the use of a ThreadPool (and the concurrency limiting that comes with it).
Why use a ThreadPool?
The allocation (creation) of threads is not a free operation. Whenever a new thread is allocated, your system needs to create a whole bunch of memory structures internally (roughly 50K per thread for the stack). Then, when your thread completes, your system needs to garbage collect those structures. These operations consume valuable time and memory on your system - and depending on how frequently you spawn threads, they can have a real impact.
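To get a feel for that cost on your own machine, here is a quick (environment-dependent) benchmark that does nothing but spawn and join short-lived threads:

```ruby
require "benchmark"

# Spawn and join 1_000 empty threads; every millisecond measured here
# is pure thread allocation and teardown overhead.
elapsed = Benchmark.realtime do
  1_000.times { Thread.new {}.join }
end

puts format("1000 threads created and joined in %.3fs", elapsed)
```

The absolute number varies by OS and Ruby version, but it is never zero - which is exactly the overhead a thread pool amortizes away.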
To get around this, we allocate pool(s) of N threads in advance and re-use those threads whenever we need them. Whenever an operation is required, we pick a thread from the pool, use it to perform the operation, and return the result.
What about Concurrency Limiting for a Task?
Well, now that you have a set of ThreadPools (each with as many threads as you like), simply assigning tasks to a specific ThreadPool automatically limits the number of concurrent jobs to the number of threads in that pool. For instance - want 100 jobs to run 3 at a time? Then simply allocate them to a ThreadPool of size 3!
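As a quick sanity check of that claim, here is a self-contained sketch (separate from the classes below) that pushes 100 jobs through 3 worker threads and records the peak number of jobs running at once:

```ruby
require "thread"

jobs = Queue.new
100.times { |i| jobs << i }
3.times { jobs << :stop }   # one stop signal per worker

active = 0
peak   = 0
mutex  = Mutex.new

# Three workers drain the queue; at most 3 jobs ever run concurrently.
workers = Array.new(3) do
  Thread.new do
    loop do
      job = jobs.pop
      break if job == :stop
      mutex.synchronize { active += 1; peak = [peak, active].max }
      sleep(0.001)          # simulate a small unit of work
      mutex.synchronize { active -= 1 }
    end
  end
end

workers.each(&:join)
puts "peak concurrency: #{peak}"
```

No matter how many jobs you enqueue, `peak` can never exceed the number of worker threads - that is the entire rate-limiting mechanism.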
Implementation
The basic implementation revolves around a few core concepts:
- We add a simple ThreadPool class
- We re-use [the Classes](LINK TO PREVIOUS POST) from the last blog in this series. NOTE: we'll make some changes to the ThreadRunner class!
Class: ThreadPool
First we need to define a class called ThreadPool which encapsulates our thread-pool logic.
require 'thread'

class ThreadPool
  # guards lazy creation of the shared pool instances
  INSTANCE_MUTEX = Mutex.new

  # a pool of 15 threads
  def self.pool_of_15
    # initialize thread pool (at most once)
    INSTANCE_MUTEX.synchronize do
      @pool_of_15 ||= ThreadPool.new(pool_size: 15)
    end
  end

  # a pool of 3 threads
  def self.pool_of_3
    # initialize thread pool (at most once)
    INSTANCE_MUTEX.synchronize do
      @pool_of_3 ||= ThreadPool.new(pool_size: 3)
    end
  end

  def initialize(pool_size:)
    @pool_size = pool_size
    @operations = Queue.new
    @thread_pool = Array.new(pool_size) do
      Thread.new do
        catch(:exit) do
          loop do
            block, lambda_context = @operations.pop
            block.call(lambda_context)
          end
        end
      end
    end
  end

  def schedule(lambda_context = nil, &block)
    # reset the exit code so callers can poll for completion;
    # shutdown schedules without a context, so guard against nil
    lambda_context.exit_code = nil if lambda_context
    @operations << [block, lambda_context]
  end

  def shutdown
    @pool_size.times { schedule { throw :exit } }
    @thread_pool.map(&:join)
  end
end
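To see the pool in action outside Rails, here is a compact, self-contained variant of the class above (DemoPool and Ctx are illustrative names; Ctx is a stand-in for the LambdaContext class from Part 1):

```ruby
require "thread"

# Minimal stand-in for the LambdaContext class from Part 1.
Ctx = Struct.new(:exit_code, :result)

class DemoPool
  def initialize(pool_size:)
    @pool_size  = pool_size
    @operations = Queue.new
    @thread_pool = Array.new(pool_size) do
      Thread.new do
        catch(:exit) do
          loop do
            block, ctx = @operations.pop
            block.call(ctx)
          end
        end
      end
    end
  end

  def schedule(ctx = nil, &block)
    ctx.exit_code = nil if ctx
    @operations << [block, ctx]
  end

  def shutdown
    @pool_size.times { schedule { throw :exit } }
    @thread_pool.each(&:join)
  end
end

pool = DemoPool.new(pool_size: 3)
contexts = Array.new(10) { Ctx.new }

# ten jobs, but never more than three in flight at once
contexts.each_with_index do |ctx, i|
  pool.schedule(ctx) do |c|
    c.result    = i * i
    c.exit_code = 0
  end
end

pool.shutdown                 # joins after all queued work has drained
puts contexts.map(&:result).inspect
```

Because the `:exit` markers are enqueued after the real jobs and the queue is FIFO, `shutdown` only returns once every scheduled job has finished.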
Class: ThreadRunner
We now update our previous ThreadRunner class:
class ThreadRunner
  attr_accessor :lambda_contexts

  def initialize(lambda_contexts: [])
    @lambda_contexts = lambda_contexts
  end

  # executes all lambda_contexts; returns true on success, false on failure
  def execute(thread_pool: ThreadPool.pool_of_15)
    @lambda_contexts.each do |lambda_context|
      # enqueue the job in the thread pool
      thread_pool.schedule(lambda_context) do |context|
        with_connection do
          begin
            params = context.parameters
            context.stdout = context.lambda_definition.call(*params)
            context.exit_code = 0
          rescue => exc
            context.stderr = exc.message
            context.exit_code = -1
            context.stderr_backtrace = exc.backtrace.join("\n")
          end
        end
      end
    end
    # now we wait for all lambda_contexts to complete
    # let's run a tight loop here for now
    while @lambda_contexts.any? { |lambda_context| lambda_context.exit_code.nil? }
      # sleep for an arbitrary length
      # we could also introduce a timeout here
      sleep(0.02)
    end
    # true only if every operation exited cleanly
    @lambda_contexts.all? { |lambda_context| lambda_context.exit_code == 0 }
  end

  # returns errors after execution is complete
  def after_execution_get_full_errors
    errors = @lambda_contexts.select do |lc|
      !lc.stderr_backtrace.blank?
    end
    errors.map do |lc|
      "EXCEPTION: #{lc.stderr} AT #{lc.stderr_backtrace}"
    end
  end

  private

  # ensures that connections are handled in the case of
  # Ruby Timeouts (evil)
  def with_connection
    ActiveRecord::Base.connection_pool.with_connection do
      yield
    end
  rescue Timeout::Error => exc
    ActiveRecord::Base.clear_active_connections!
    ActiveRecord::Base.connection.close
    raise exc
  end
end
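The sleep-based wait loop in execute is simple and works well enough; an alternative worth knowing (shown here as a standalone sketch, not part of the original classes) is a countdown latch built on a ConditionVariable, which blocks until all jobs report in without polling:

```ruby
require "thread"

# A tiny countdown latch: wait blocks until count_down has been
# called `count` times.
class CountdownLatch
  def initialize(count)
    @count = count
    @mutex = Mutex.new
    @cond  = ConditionVariable.new
  end

  def count_down
    @mutex.synchronize do
      @count -= 1
      @cond.broadcast if @count <= 0
    end
  end

  def wait
    # re-check the count on every wakeup to guard against spurious wakeups
    @mutex.synchronize { @cond.wait(@mutex) while @count > 0 }
  end
end

latch   = CountdownLatch.new(5)
results = Queue.new

5.times do |i|
  Thread.new do
    results << i * 2
    latch.count_down
  end
end

latch.wait                  # returns only once all 5 jobs have finished
puts "completed jobs: #{results.size}"
```

The polling loop is easier to extend with a timeout, though, which is why the class above keeps it.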
Example Usage
Now, using the classes above, our workflow becomes:
# create the thread_runner object
thread_runner = ThreadRunner.new

# add lambdas to the thread_runner
2.times do |idx|
  # define the method lambda
  lambda_def = lambda { |svr_idx| create_server(svr_idx) }
  thread_runner.lambda_contexts << LambdaContext.new(lambda_def, idx)
end

# execute all lambdas in parallel
# and wait for them to complete (with a max concurrency of 3)
all_ok = thread_runner.execute(thread_pool: ThreadPool.pool_of_3)

# OR wait for them to complete (with a max concurrency of 15)
all_ok = thread_runner.execute(thread_pool: ThreadPool.pool_of_15)

# spit out errors if any failed
unless all_ok
  # do something
  puts thread_runner.after_execution_get_full_errors
end
... profit!
Conclusion
In this blog post, we looked at an enhancement to our previous multithreading solution in Ruby: a ThreadPool and, with it, concurrency limiting.
Now go code!
- Check out 'Ruby Mutex Mayhem'.
- Also, read Part 1 of this series, 'Rails Threading Gains'.