← All Articles

Kubernetes: (Graceful) Sidekiq Worker Lifecycle

Vic van GoolVic van Gool
Jul 11th 17Updated Jul 6th 22
Ruby on Rails

Kubernetes: (Graceful) Sidekiq Worker Lifecycle

Introduction

With our recent release of Container Stacks v2 into public beta we're totally loving Kubernetes. But as with all love affairs, there are some bothersome aspects that we have to accept and work with. One such aspect is in the inflexibility of the vanilla shutdown sequence provided by Kubernetes.

We're also prolific users of Sidekiq for the parts of our backend that are ruby-based (we're running a bunch of other technologies, but we think Sidekiq is hands-down the best for running ruby jobs). As with any background workers, Sidekiq is sensitive to its shutdown sequence. We need to have more control over this.

TL;DR

This article provides a solution to achieve graceful shutdown of Sidekiq workers via the Kubernetes pod shutdown lifecycle.

Problem

There is a lot of documentation out there around the current Kubernetes pod shutdown sequence (see appendices for some starting points). NOTE: I say current as this information is only valid as of now... this might change (though I think at this point that is fairly unlikely). The current shutdown sequence looks like the following:

1. POD marked as *terminating* 2. Optional: PreStop hook called synchronously 3. SIGTERM sent to container process if still present 4. Kubernetes waits up-to *grace-period* for container to exit 5. SIGKILL sent to container process if still present

Kubernetes allows you to specify the terminationGracePeriodSeconds (ie. how long it will wait for shutdown after SIGTERM sent) in your spec. Unfortunately Kubernetes doesn't allow you to specify the shutdown sequence itself.

At Cloud 66 we were previously lucky enough to be controlling the shutdown process via our own homegrown scheduler, this enabled us to expose the shutdown sequence to our users directly (in the form of USR1;1h;TERM;10s;KILL for example). But now we need another solution.

Furthermore (and specific to Sidekiq) as we have some very long running jobs (dependent on external resources), we want to have a long wait time; but also want to terminate the workers as soon as they are no longer busy. So our ideal Sidekiq shutdown sequence looks like the following:

1. Send USR1 (or TSTP for sidekiq > 5.0.0) to workers 2. Wait until they are no longer processing jobs 3. Send TERM

Solution: Use a Pre-Stop Hook

Looking at the shutdown sequence above, you'll see that there is a PreStop hook point called during the sequence. More on this in the Kubernetes Container Lifecycle Hooks documentation. The salient bit of information is essentially that kubernetes will execute some command of your choosing at that hook point, and it will execute it synchronously, waiting for the command to complete before resuming the shutdown sequence.

Using this hook point, we can inject the graceful shutdown behaviour we want for our Sidekiq workers. And because we need ths ourselves (and given that Sidekiq is ruby-based) I put together the following ruby script to do just that!

#! /usr/bin/env ruby # encoding: utf-8 ## PURPOSE: this script will quiet any sidekiq workers it finds, ## and then shut them down when they are no longer handling jobs # utility class for logging and running commands class Utils require 'open3' attr_accessor :output def initialize(output) @output = output end def run_command(command) log("RUNNING: \"#{command}\"") stdout, stderr, status = Open3.capture3(command) return stdout.strip if status.success? # handle errors stderr = stderr.strip if stderr.empty? log("FATAL:\nCommand: \"#{command}\"") else log("FATAL:\nCommand: \"#{command}\"\nError: #{stderr}") end exit(-1) end def log(message) line = "[#{Time.now}] #{message}" @output == 'stdout' ? puts(line) : File.open(@output, 'a') {|file| file.puts(line)} end def log_underline log('-' * 70) end end # class to encapsulate the worker manager class WorkerManager attr_accessor :timeout, :utils STATUS_WAITING_THREADS = :waiting_threads STATUS_CAN_BE_TERMINATED = :can_be_terminated STATUS_CAN_BE_QUIETED = :can_be_quieted POLL_FREQUENCY = 10 def initialize(timeout, utils) @timeout = timeout @utils = utils end def initiate_shutdown @utils.log(' *******************************') @utils.log(' **STARTED SHUTDOWN SEQUENCE**') @utils.log(' *******************************') # figure out the timeout time current_time = Time.now timeout_time = current_time + @timeout # fetch latest worker info workers = materialize_workers while Time.now <= timeout_time && !workers.empty? # do what is needed for each worker workers.each {|worker| worker.handle_shutdown(false)} # sleep for the poll time @utils.log("...sleeping for #{POLL_FREQUENCY} seconds...") sleep(POLL_FREQUENCY) # fetch latest worker info workers = materialize_workers end if Time.now > timeout_time && !workers.empty? @utils.log('[[TIMED-OUT]]') # fetch latest worker info workers = materialize_workers # do what is needed for each worker workers.each {|worker| worker.handle_shutdown(true)} # give process time to respond to the signals @utils.log("...sleeping for #{POLL_FREQUENCY} seconds...") sleep(POLL_FREQUENCY) end end private def materialize_workers workers = [] stdout = @utils.run_command('ps aux | grep [s]idekiq | grep busy\] || true') stdout.lines.each do |line| line = line.strip if line =~ Worker::WORKER_REGEX pid = $~[:pid].to_i version = $~[:version] active_threads = $~[:worker_count].to_i total_threads = $~[:total_threads].to_i is_quiet = line =~ /stopping$/ worker = Worker.new(pid, version, active_threads, total_threads, is_quiet, @utils) workers << worker end end @utils.log_underline if workers.empty? @utils.log('CURRENT STATE: No workers found!') else @utils.log('CURRENT STATE:') workers.each {|worker| @utils.log(worker.status_text)} end @utils.log_underline return workers end end # class to encapsulate workers class Worker attr_accessor :utils, :pid, :status, :active_threads, :total_threads, :version STATUS_WAITING_THREADS = :waiting_threads STATUS_CAN_BE_TERMINATED = :can_be_terminated STATUS_CAN_BE_QUIETED = :can_be_quieted WORKER_REGEX = /^.*?\s+(?<pid>\d+).*sidekiq\s+(?<version>[\d\.]+).*?\[(?<worker_count>\d+)\sof\s(?<total_threads>\d+) busy\]/ def initialize(pid, version, active_threads, total_threads, is_quiet, utils) @utils = utils @pid = pid @version = version @active_threads = active_threads @total_threads = total_threads @status = parse_status(active_threads, is_quiet) end def status_text output = @status == STATUS_CAN_BE_QUIETED ? '[ACTIVE]' : '[QUIET]' output = "#{output} [PID:#{@pid}] [VERSION:#{@version}] [#{@active_threads} of #{@total_threads}]" return "#{output} - waiting for threads to complete" if status == STATUS_WAITING_THREADS return "#{output} - can be terminated" if status == STATUS_CAN_BE_TERMINATED return "#{output} - can be quieted" if status == STATUS_CAN_BE_QUIETED end def handle_shutdown(aggressive) if aggressive # kill worker @utils.run_command("kill -9 #{@pid}") else if @status == STATUS_CAN_BE_QUIETED major_version = @version.gsub(/\..*/,'').to_i if major_version < 5 # quiet worker @utils.run_command("kill -USR1 #{@pid}") else # quiet worker @utils.run_command("kill -TSTP #{@pid}") end elsif @status == STATUS_CAN_BE_TERMINATED # stop worker @utils.run_command("kill -TERM #{@pid}") end end end private def parse_status(active_threads, is_quiet) return STATUS_CAN_BE_QUIETED unless is_quiet return STATUS_WAITING_THREADS if active_threads > 0 STATUS_CAN_BE_TERMINATED end end # parse arguments require 'optparse' require 'ostruct' options = OpenStruct.new options.timeout = 120 options.output = 'stdout' OptionParser.new do |opts| opts.banner = 'Usage: sidekiq_safe_shutdown.rb [options]' opts.on('-o [ARG]', '--output [ARG]', 'File-path or stdout (default: stdout)') {|v| options.output = v} opts.on('-t [ARG]', '--timeout [ARG]', 'Timeout in seconds (default: 120)') {|v| options.timeout = v} opts.on('-h', '--help', 'Display this help') do puts opts exit end end.parse! # handle timeou utils = Utils.new(options.output) options.timeout = options.timeout.to_i if options.timeout < 10 utils.log("FATAL:\nTimeout #{options.timeout} too short!") exit(-1) end # initiate shutdown WorkerManager.new(options.timeout, utils).initiate_shutdown

As the hook command executes in context of your image, you'll need to include this script inside your image (simply put it in your source code if you're using Cloud 66 Skycap). Note that the script is executed with the following arguments:

Usage: sidekiq_safe_shutdown.rb [options] -o, --output [ARG] File-path or stdout (default: stdout) -t, --timeout [ARG] Timeout in seconds (default: 120) -h, --help Display this help

For the example below we're putting this script in our image in the path:
/tmp/sidekiq_safe_shutdown.rb

And don't forget to make it executable with:
chmod +x /tmp/sidekiq_safe_shutdown.rb)

Invoking via Kubernetes Manually

If you're running Kubes directly, then you'll need to manually modify your Pod spec to include terminationGracePeriodSeconds and invoking the PreStop hook:

spec: #with default timeout terminationGracePeriodSeconds: 15 #or with specific timeout terminationGracePeriodSeconds: 3605 lifecycle: preStop: exec: #with default timeout command: ["/tmp/sidekiq_safe_shutdown.rb"] #or with specific timeout command: ["/tmp/sidekiq_safe_shutdown.rb", "-t", "3600"]
Invoking via Cloud 66

If you're running via our awesome Container Stacks v2 (shameless plug :) then simply add this script to your service.yml with the following line:

#with default timeout pre_stop_command: /tmp/sidekiq_safe_shutdown.rb stop_grace: 15s #or with specific timeout pre_stop_command: /tmp/sidekiq_safe_shutdown.rb -t 3600 stop_grace: 3605s

And that should be all you need - now when your Sidekiq workers shut down they will do so gracefully!

Appendices (Further Reading)



Try Cloud 66 for Free, No credit card required