Rails Threading Gains (Part 2)

Vic van Gool
Oct 10th 2017, Updated Jul 6th 2022
Ruby on Rails


Introduction

Last time, in Part 1, we talked about a set of simple Ruby classes you can use in your Rails application to perform operations in parallel on separate worker threads. The classes handle re-joining when the workflow is complete, as well as taking care of ActiveRecord connections for you.

Today we'll discuss some small improvements we can apply to those classes - specifically around the use of a ThreadPool (and the concurrency limiting it gives us for free).

Why use a ThreadPool?

The allocation (creation) of threads is not a free operation. Whenever a new thread is allocated, your system needs to create a whole bunch of memory structures internally (roughly 50K per thread for the stack copy). Then, when your thread is complete, your system needs to garbage collect those memory structures. These operations consume valuable time and memory on your system - and depending on how frequently you create threads, this could have a real impact.

To get around this, we allocate one or more pools of N threads in advance and re-use those threads whenever we need to. Whenever an operation is required, we pick a thread from the pool, use it to perform the operation, and return the result.
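The reuse idea can be sketched in a few lines of plain Ruby (no Rails needed): one long-lived worker thread pulls jobs off a Queue, so many operations share a single thread allocation. The job values and the `:stop` sentinel here are purely illustrative - they are not part of the classes below.

```ruby
require 'thread'

# One long-lived worker thread, reused for many operations,
# so we pay the thread-allocation cost only once.
jobs    = Queue.new
results = Queue.new

worker = Thread.new do
  loop do
    job = jobs.pop
    break if job == :stop    # sentinel value shuts the worker down
    results << job * 2       # "perform" the operation
  end
end

# schedule three operations on the same thread
[1, 2, 3].each { |n| jobs << n }
jobs << :stop
worker.join

collected = []
collected << results.pop until results.empty?
puts collected.sort.inspect  # => [2, 4, 6]
```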

What about Concurrency Limiting for a Task?

Well, now that you have a set of ThreadPools (each with as many threads as you like), simply allocating tasks to a specific ThreadPool automatically limits the concurrent jobs to the number of threads in that pool. For instance - want 100 jobs to run 3 at a time? Then simply allocate them to a ThreadPool of size 3!
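To convince ourselves this actually caps concurrency, here is a small self-contained sketch (the pool is hand-rolled for illustration, not the ThreadPool class below): 9 jobs are scheduled on 3 worker threads while a counter tracks how many run at once.

```ruby
require 'thread'

POOL_SIZE = 3
jobs = Queue.new
9.times { |i| jobs << i }
POOL_SIZE.times { jobs << :stop }   # one sentinel per worker

mutex    = Mutex.new
running  = 0
max_seen = 0

workers = Array.new(POOL_SIZE) do
  Thread.new do
    loop do
      break if jobs.pop == :stop
      mutex.synchronize do
        running += 1
        max_seen = running if running > max_seen
      end
      sleep(0.01)                   # simulate some work
      mutex.synchronize { running -= 1 }
    end
  end
end
workers.each(&:join)

puts "max concurrent jobs: #{max_seen}"  # never more than 3
```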

Implementation

The basic implementation revolves around a few core concepts:

  1. We add a simple ThreadPool class
  2. We re-use [the Classes](LINK TO PREVIOUS POST) from the last blog in this series. Note that we'll make some changes to the ThreadRunner class!

Class: ThreadPool

First we need to define a class called ThreadPool which encapsulates our ThreadPool logic.

```ruby
require 'thread'

class ThreadPool
  # mutex guarding singleton pool creation
  INSTANCE_MUTEX = Mutex.new

  # a shared pool of 15 threads
  def self.pool_of_15
    # initialize thread pool (once)
    INSTANCE_MUTEX.synchronize do
      @pool_of_15 ||= ThreadPool.new(pool_size: 15)
    end
  end

  # a shared pool of 3 threads
  def self.pool_of_3
    # initialize thread pool (once)
    INSTANCE_MUTEX.synchronize do
      @pool_of_3 ||= ThreadPool.new(pool_size: 3)
    end
  end

  def initialize(pool_size:)
    @pool_size = pool_size
    @operations = Queue.new
    @thread_pool = Array.new(pool_size) do
      Thread.new do
        catch(:exit) do
          loop do
            block, lambda_context = @operations.pop
            block.call(lambda_context)
          end
        end
      end
    end
  end

  def schedule(lambda_context, &block)
    # shutdown schedules with a nil context, so guard the reset
    lambda_context.exit_code = nil if lambda_context
    @operations << [block, lambda_context]
  end

  def shutdown
    @pool_size.times { schedule(nil) { throw :exit } }
    @thread_pool.map(&:join)
  end
end
```
Class: ThreadRunner

We now update our previous ThreadRunner class:

```ruby
class ThreadRunner
  attr_accessor :lambda_contexts

  def initialize(lambda_contexts: [])
    @lambda_contexts = lambda_contexts
  end

  # executes all lambdas and returns true or false for success or fail
  def execute(thread_pool: ThreadPool.pool_of_15)
    @lambda_contexts.each do |lambda_context|
      # enqueue the job in the thread pool
      thread_pool.schedule(lambda_context) do |context|
        with_connection do
          begin
            params = context.parameters
            context.stdout = context.lambda_definition.call(*params)
            context.exit_code = 0
          rescue => exc
            context.stderr = exc.message
            context.exit_code = -1
            context.stderr_backtrace = exc.backtrace.join("\n")
          end
        end
      end
    end

    # now we wait for all lambda_contexts to complete
    # lets run a tight loop here for now
    while @lambda_contexts.any? { |lambda_context| lambda_context.exit_code.nil? }
      # sleep for an arbitrary length
      # we can also introduce a timeout here
      sleep(0.02)
    end

    # return exit codes for all our operations
    @lambda_contexts.all? { |lambda_context| lambda_context.exit_code == 0 }
  end

  # returns errors after execution is complete
  def after_execution_get_full_errors
    errors = @lambda_contexts.select do |lc|
      !lc.stderr_backtrace.blank?
    end
    errors.map do |lc|
      "EXCEPTION: #{lc.stderr} AT #{lc.stderr_backtrace}"
    end
  end

  private

  # ensures that connections are handled in the case of
  # Ruby Timeouts (evil)
  def with_connection(&block)
    ActiveRecord::Base.connection_pool.with_connection do
      yield block
    end
  rescue Timeout::Error => exc
    ActiveRecord::Base.clear_active_connections!
    ActiveRecord::Base.connection.close
    raise exc
  end
end
```
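The ThreadRunner expects each LambdaContext to expose the lambda, its parameters, and writable stdout/stderr/exit_code fields. That class comes from Part 1; as a reminder - and as an assumption about its shape, so check Part 1 for the real definition - a minimal compatible sketch looks like this:

```ruby
# Minimal sketch of the LambdaContext class from Part 1.
# The attribute names match what ThreadRunner reads and writes;
# anything beyond that is an assumption.
class LambdaContext
  attr_accessor :lambda_definition, :parameters,
                :stdout, :stderr, :stderr_backtrace, :exit_code

  def initialize(lambda_definition, *parameters)
    @lambda_definition = lambda_definition
    @parameters = parameters
  end
end

# what ThreadRunner#execute does with a context, inlined:
ctx = LambdaContext.new(->(n) { n + 1 }, 41)
ctx.stdout    = ctx.lambda_definition.call(*ctx.parameters)
ctx.exit_code = 0
puts ctx.stdout  # => 42
```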

Example Usage

Now, using the classes above, our workflow can be as simple as:

```ruby
# create thread_runner object
thread_runner = ThreadRunner.new

# add lambdas to the thread_runner
2.times do |idx|
  # define the method lambda
  lambda_def = lambda { |svr_idx| create_server(svr_idx) }
  thread_runner.lambda_contexts << LambdaContext.new(lambda_def, idx)
end

# execute all lambdas in parallel
# and wait for them to complete (with a max concurrency of 3)
all_ok = thread_runner.execute(thread_pool: ThreadPool.pool_of_3)

# OR wait for them to complete (with a max concurrency of 15)
all_ok = thread_runner.execute(thread_pool: ThreadPool.pool_of_15)

# spit out errors if any failed
unless all_ok
  # do something
  puts thread_runner.after_execution_get_full_errors
end

# ... profit!
```

Conclusion

In this blog post, we looked at an enhancement to our previous multithreading solution in Ruby: the use of a ThreadPool, and the concurrency limiting it gives us.

Now go code!



