Better Way to Log Your Background Jobs
There are several use cases for background jobs, some of which might be: periodic tasks, tasks too heavy to return a result synchronously, or tasks that can live outside the main flow, whose failure should not affect the currently processed request.
Fortunately for us developers, because background processing is so widely used in Rails, we have some awesome tools and gems to help us out! The most popular one is Sidekiq, and that's the one I'll use in the code examples.
Great, so pretty much everyone is now using background processing, but what about logging it?
Step 1 - Create a table for logs
Yes, we're going to save logs into the database. The structure might depend on your needs. What I wanted to have for sure were: job type, the job's state, jid (Job ID), an associated user_id (optional) and created_at / updated_at timestamps. Let's also add an index on jid:
class CreateAsyncJobLogs < ActiveRecord::Migration[5.0]
  def change
    create_table :async_job_logs do |t|
      t.string :jid
      # integers backing the enums we'll define on the model
      t.integer :state
      t.integer :job_type
      t.references :user, foreign_key: true

      t.timestamps
    end

    # we'll look logs up by jid all the time, so index it
    add_index :async_job_logs, :jid
  end
end
Step 2 - Create the model
Now, let's quickly create the AsyncJobLog model:
class AsyncJobLog < ApplicationRecord
  enum state: {
    started: 1,
    finished: 2,
  }

  enum job_type: {
    reconciliation: 1,
    user_verification: 2,
    financial_data_import: 3,
  }
end
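Rails enums give us predicate and bang methods plus scopes for free. A quick console sketch of what that means in practice (the jid value is made up):

log = AsyncJobLog.create(jid: 'abc123', state: 'started', job_type: 'reconciliation')
log.started?   # => true
log.finished!  # updates the state to 'finished' (and touches updated_at)
AsyncJobLog.finished.count  # scope generated by the enum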
Step 3 - Add logging to your workers
Let's start with the module that will be responsible for most of the dirty work - meaning creating and updating logs:
module WithJobLogging
  # wraps a job that is allowed to retry: the log is created once
  # (the "unless" guard keeps retries from creating duplicates) and
  # marked as finished only when the block succeeds
  def log_retryable_job(jid, job_type, user_id = nil)
    create_async_job_log(jid, job_type, user_id) unless async_job_log(jid)
    yield
    mark_job_as_finished(jid)
  end

  def create_async_job_log(jid, job_type, user_id = nil)
    AsyncJobLog.create(
      jid: jid,
      state: 'started',
      job_type: job_type,
      user_id: user_id
    )
  end

  def async_job_log(jid)
    AsyncJobLog.find_by(jid: jid)
  end

  def mark_job_as_finished(jid)
    async_job_log(jid).finished!
  end
end
Now let's use this module in the actual worker:
require 'sidekiq'

class FinancialDataImportWorker
  include Sidekiq::Worker
  include WithJobLogging

  sidekiq_options retry: 5

  def perform(user_id)
    log_retryable_job(jid, 'financial_data_import', user_id) { do_stuff }
  end

  private

  def do_stuff
    # logic
  end
end
The flow is pretty straightforward - before the worker does its job, it first creates a record in the table providing some information about itself. At this point we have a record saying that the worker responsible for financial data import (job_type) and associated with a certain user (user_id) has started processing at a given time (created_at) - that's already a lot of nicely aggregated information. After finishing whatever it is responsible for, the worker simply finds its own log (by JID) and marks it as finished.

But what if my worker fails multiple times, up to the point where it reaches the maximum number of retries? Well, we can handle that in many ways - it all depends on what you need and what the convention is. Let's go with the simplest example and update our worker so it marks the log as finished when it has retried for the last time:
require 'sidekiq'

class FinancialDataImportWorker
  include Sidekiq::Worker
  include WithJobLogging

  sidekiq_options retry: 5

  sidekiq_retries_exhausted do |msg|
    async_job_log = AsyncJobLog.find_by(jid: msg['jid'])
    async_job_log.finished!
  end

  def perform(user_id)
    log_retryable_job(jid, 'financial_data_import', user_id) { do_stuff }
  end

  private

  def do_stuff
    # logic
  end
end
If your application can make use of the information about a job being unsuccessful, you might want to update the job log with a new state, such as dead (don't forget to add it to your enum hash). A minimal sketch of what that could look like - the dead state is my addition on top of the setup above:
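class AsyncJobLog < ApplicationRecord
  enum state: {
    started: 1,
    finished: 2,
    dead: 3, # hypothetical extra state for jobs that exhausted their retries
  }
  # ...
end

class FinancialDataImportWorker
  # ...
  sidekiq_retries_exhausted do |msg|
    # mark the log as dead instead of finished
    AsyncJobLog.find_by(jid: msg['jid'])&.dead!
  end
end

While we're at it - what about the jobs that are not allowed to retry at all? Here is another approach to the problem. Let's add a second logging method to our module: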
module WithJobLogging
  def log_retryable_job(jid, job_type, user_id = nil)
    create_async_job_log(jid, job_type, user_id) unless async_job_log(jid)
    yield
    mark_job_as_finished(jid)
  end

  # for jobs with retries disabled: whatever happens in the block,
  # the log always ends up marked as finished
  def log_nonretryable_job(jid, job_type, user_id = nil)
    create_async_job_log(jid, job_type, user_id) unless async_job_log(jid)
    yield
  rescue
    # handle/report the error if needed
  ensure
    mark_job_as_finished(jid)
  end

  def create_async_job_log(jid, job_type, user_id = nil)
    AsyncJobLog.create(
      jid: jid,
      state: 'started',
      job_type: job_type,
      user_id: user_id
    )
  end

  def async_job_log(jid)
    AsyncJobLog.find_by(jid: jid)
  end

  def mark_job_as_finished(jid)
    async_job_log(jid).finished!
  end
end
Now we can use it in the actual worker:
require 'sidekiq'

class UserVerificationWorker
  include Sidekiq::Worker
  include WithJobLogging

  sidekiq_options retry: false

  def perform(user_id)
    log_nonretryable_job(jid, 'user_verification', user_id) { do_stuff }
  end

  private

  def do_stuff
    # logic
  end
end
We simply make sure that whatever happens in the main block of the worker, we'll end up with the log marked as finished. It's crucial to have the right state of the job reflected in the log if you want to make decisions based on the gathered information.
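To see that guarantee in action, here is a quick console-style sketch (the jid value is made up):

include WithJobLogging

# the block blows up, the bare rescue swallows the error,
# and ensure still marks the log as finished
log_nonretryable_job('abc123', 'user_verification') { raise 'boom' }
AsyncJobLog.find_by(jid: 'abc123').finished? # => true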
Step 4 - Use the gathered information
Okay, we've got logs nicely structured and saved into the database. What now? There are tons of things you can use logs for. You now know which worker performed at what time and for which user. You can add all kinds of fields to those logs - the options are limitless (well, actually limited by the maximum number of columns) - such as calculated processing time or the number of processed records.
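In fact, even without extra columns you can derive a rough processing time, because updated_at is touched when the log is marked as finished:

AsyncJobLog.finished.find_each do |log|
  # rough duration: time between creating the log and marking it finished
  puts "#{log.job_type} (#{log.jid}): #{(log.updated_at - log.created_at).round(2)}s"
end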
You can probably imagine working with those logs manually - using them for debugging, or showing and filtering them in Active Admin. If you happen to use Active Admin, exposing the logs can be as simple as this sketch:
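# app/admin/async_job_logs.rb - assuming you use Active Admin
ActiveAdmin.register AsyncJobLog do
  actions :index, :show # logs are written by the workers, so read-only here

  filter :jid
  filter :state, as: :select, collection: AsyncJobLog.states
  filter :job_type, as: :select, collection: AsyncJobLog.job_types
  filter :user
end

Let's just go through one of the possible scenarios where the code uses those logs as part of the logic.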
Critical section
We previously defined a couple of job types. Let's assume it shouldn't be possible for more than one financial-related worker to run simultaneously for the same user. That would mean the reconciliation and financial data import workers cannot run at the same time. First, let's add a scope to the AsyncJobLog model that's going to be handy:
class AsyncJobLog < ApplicationRecord
  enum state: {
    started: 1,
    finished: 2,
  }

  enum job_type: {
    reconciliation: 1,
    user_verification: 2,
    financial_data_import: 3,
  }

  scope :financial_jobs, -> { where(job_type: %w[financial_data_import reconciliation]) }
end
Next, let's use it in the FinancialDataImportWorker:
require 'sidekiq'

class FinancialDataImportWorker
  include Sidekiq::Worker
  include WithJobLogging

  sidekiq_options retry: 5

  sidekiq_retries_exhausted do |msg|
    async_job_log = AsyncJobLog.find_by(jid: msg['jid'])
    async_job_log.finished!
  end

  def perform(user_id)
    user = User.find(user_id)
    return if financial_job_in_progress?(user)
    log_retryable_job(jid, 'financial_data_import', user_id) { do_stuff }
  end

  private

  # is any other financial job started for this user? we exclude our
  # own log, which already exists when the job is being retried
  def financial_job_in_progress?(user)
    user.async_job_logs.financial_jobs.started.where.not(jid: jid).any?
  end

  def do_stuff
    # logic
  end
end
The worker will simply quit without starting if one of the two financial workers is already in progress for this user. Of course, quitting is the easiest option - you can raise a Rollbar error or do anything else that makes sense for your application. Note the where.not(jid: jid) part: it excludes the worker's own log from the check, so a retried job (whose log already exists in the started state) won't block itself. To make it a real critical section, you can also make the worker retry later instead of quitting.
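A sketch of that retry-instead-of-quit variant - the 30-second delay is an arbitrary choice of mine:

def perform(user_id)
  user = User.find(user_id)
  if financial_job_in_progress?(user)
    # another financial job is running - re-enqueue and check again later
    self.class.perform_in(30.seconds, user_id)
    return
  end
  log_retryable_job(jid, 'financial_data_import', user_id) { do_stuff }
end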
PS. If the critical section is VERY critical, it might be worth creating a proper lock instead - the check above isn't atomic, so two jobs could, in theory, pass it at the same time.
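For example, a row-level database lock (with_lock is standard Active Record) could look like this - just be aware that it holds a transaction open for the duration of the block:

def perform(user_id)
  User.find(user_id).with_lock do
    # only one job can hold the row lock for this user at a time;
    # keep the block short, as it runs inside a transaction
    do_stuff
  end
end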
Conclusion
I hope that gave you an insight into how easily you can set up background job logging and what the advantages of even a simple logging mechanism are. Asynchronous processing definitely isn't easy, nor is debugging problems in this area - logging your workers' activity makes both a little more manageable.
Huge thanks to Maciej
Photo by Maciej Rusek on Unsplash