We recently ran into a problem with multiple workers running in parallel to sync resources to a third-party application.
That wouldn't be a problem by itself, but we synced in batches to avoid triggering too many HTTP requests, since a sync had to happen every time a resource was created in our database. The problem with that approach is that when multiple resources were added, there was a race condition between the jobs enqueued to sync them to the external service.
Without further explanation, let's see some code:
```ruby
# models/resource.rb
class Resource < ApplicationRecord
  after_create_commit :enqueue_sync_to_third_party

  def enqueue_sync_to_third_party
    SyncResourcesToThirdPartyJob.perform_async
  end
end
```
```ruby
# jobs/sync_resources_to_third_party_job.rb
class SyncResourcesToThirdPartyJob
  include Sidekiq::Job

  def perform
    resources_to_sync = Resource.where(synced_at: nil)

    # triggers HTTP request with new resources
    ThirdParty.new.sync(resources_to_sync)

    resources_to_sync.update!(synced_at: Time.current)
  end
end
```
As you can see, when a `Resource` is created, we enqueue a job to sync all fresh resources to the `ThirdParty`.
You might be thinking: why not make the job sync just the created resource instead of all the pending ones? Again, we want to do it in batches to avoid triggering too many HTTP requests (not only for performance reasons, but also because of API rate limits in place).
Have you spotted the problem yet?
The code above worked great for a couple of days, but then we started seeing some resources being duplicated in the `ThirdParty`.
Think about what would happen when multiple resources were created in a row. We would enqueue multiple jobs to sync, and if they run in parallel, we would end up with a `Resource` being synced multiple times.
Let's assume we have two Sidekiq workers, and we create two resources, A and B, in a row.
| Worker #1 | Worker #2 | ThirdParty state |
|---|---|---|
| `Resource.where(synced_at: nil)` => A and B | | `[]` |
| | `Resource.where(synced_at: nil)` => A and B | `[]` |
| Add Resource A and B to ThirdParty | | `[A, B]` |
| | Add Resource A and B to ThirdParty | `[A, B, A, B]` 🚨 |
| `resources_to_sync.update!(synced_at: Time.current)` | | `[A, B, A, B]` 🚨 |
| | `resources_to_sync.update!(synced_at: Time.current)` | `[A, B, A, B]` 🚨 |
As you can see, when we enqueue multiple jobs to sync the `Resource`, we have a race condition that can end up with resources being duplicated in the `ThirdParty`.
What is Advisory Locking?
An Advisory Lock is a cooperative lock. It is provided by the database engine (PostgreSQL, MySQL, etc.), and it is defined simply by a key (a string or a number, depending on the engine) that has meaning for the consumers.
That means an advisory lock does not prevent another consumer from accessing the same data; it only blocks consumers that ask for the same key.
This is what the official PostgreSQL docs say about advisory locks:
[...] provides a means for creating locks that have application-defined meanings. These are called advisory locks, because the system does not enforce their use — it is up to the application to use them correctly.
That means one consumer can take an advisory lock `foo`, and if another consumer asks for `foo` before it is released, that consumer will block until the lock is free.
However, if a consumer asks for `bar` instead, it will proceed immediately, because the advisory lock key is different.
This is very useful because with advisory locks you can prevent the same piece of code from running simultaneously across processes.
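To make this concrete, here is a minimal sketch of what an advisory lock looks like at the database level. It assumes PostgreSQL and an ActiveRecord connection, and uses `hashtext` as one way (among others) to turn a string key into the integer PostgreSQL expects; treat it as an illustration, not as what the gem below does internally.

```ruby
# Minimal sketch, assuming PostgreSQL and an ActiveRecord connection.
# PostgreSQL identifies advisory locks by a number, so we hash the string key.
conn = ActiveRecord::Base.connection

# This session takes the "foo" lock (session-level, non-blocking variant).
conn.select_value("SELECT pg_try_advisory_lock(hashtext('foo'))") # => true

# Another session asking for "foo" would get false here (or would wait if it
# used pg_advisory_lock instead), while asking for "bar" would succeed
# immediately because the key is different.

# Session-level advisory locks must be released explicitly.
conn.select_value("SELECT pg_advisory_unlock(hashtext('foo'))")
```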
How to use Advisory Locking in Ruby?
Even though we could implement advisory locking with raw SQL, I like using the with_advisory_lock gem, which already handles that for you so you can focus on the business side of things.
```ruby
# Gemfile
gem 'with_advisory_lock'
```
What we need to do is make sure that when a sync job instance starts, no other instance runs simultaneously until the former finishes the sync and updates the `synced_at` field.
```ruby
# jobs/sync_resources_to_third_party_job.rb
class SyncResourcesToThirdPartyJob
  include Sidekiq::Job

  def perform
    Resource.with_advisory_lock("third_party_sync_lock") do
      resources_to_sync = Resource.where(synced_at: nil)

      ThirdParty.new.sync(resources_to_sync)

      resources_to_sync.update!(synced_at: Time.current)
    end
  end
end
```
This way, an instance will take the lock with the key `third_party_sync_lock`, and when another worker runs the same code, it will not be able to proceed until the lock is released. That prevents the second worker from picking up resources that are already being synced.
| Worker #1 | Worker #2 | ThirdParty state |
|---|---|---|
| -> LOCK <- | | `[]` |
| | -> WAITING <- | `[]` |
| `Resource.where(synced_at: nil)` => A and B | | `[]` |
| Add Resource A and B to ThirdParty | | `[A, B]` |
| `resources_to_sync.update!(synced_at: Time.current)` | | `[A, B]` |
| -> UNLOCK <- | | `[A, B]` |
| | -> LOCK <- | `[A, B]` |
| | `Resource.where(synced_at: nil)` => EMPTY | `[A, B]` |
| | -> UNLOCK <- | `[A, B]` ✅ |
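Waiting on the lock is fine here, since the second run simply finds nothing left to sync. If you would rather have the second worker skip the run entirely instead of waiting, the gem also accepts a timeout. The sketch below is based on my reading of the gem's README (passing `timeout_seconds: 0` attempts the lock once and returns `false` if it is already held), so double-check it against the version you use:

```ruby
# Hypothetical non-blocking variant: try the lock once and bail out if another
# worker already holds it. Assumes with_advisory_lock's timeout_seconds option.
def perform
  locked = Resource.with_advisory_lock("third_party_sync_lock", timeout_seconds: 0) do
    resources_to_sync = Resource.where(synced_at: nil)
    ThirdParty.new.sync(resources_to_sync)
    resources_to_sync.update!(synced_at: Time.current)
  end

  # The gem returns false when the lock was not obtained, so we could log
  # or re-enqueue here if needed.
  Rails.logger.info("Sync skipped, another worker holds the lock") unless locked
end
```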
How to test it?
```ruby
# spec/jobs/sync_resources_to_third_party_job_spec.rb
RSpec.describe SyncResourcesToThirdPartyJob do
  it "does not sync same resource twice if job is triggered multiple times" do
    non_synced_resources = [create(:resource)]
    third_party_api = instance_double(ThirdParty, sync: true)
    allow(ThirdParty).to receive(:new).and_return(third_party_api)

    threads = Array.new(2).map do
      Thread.new do
        described_class.new.perform
      end
    end
    threads.each(&:join)

    # Should receive these resources only once, because the second time there
    # would be no resources left to sync.
    expect(third_party_api).to have_received(:sync).with(
      non_synced_resources
    ).once
  end
end
```
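One caveat worth flagging: this spec spawns real threads, and each thread checks out its own database connection. If your suite wraps every example in a transaction (the rspec-rails default), those threads will not see the resource created in the example and the job will find nothing to sync. A sketch of one way around that, assuming rspec-rails and a truncation-based cleanup strategy (e.g. database_cleaner) for these examples:

```ruby
# spec/rails_helper.rb (sketch, assuming rspec-rails)
RSpec.configure do |config|
  # Threads need their own connections to see the test data, so this kind of
  # spec cannot run inside a single wrapping transaction. Clean up with a
  # truncation strategy for these examples instead.
  config.use_transactional_fixtures = false
end
```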
Alternatives to Advisory Locking
I won't get into the details of each alternative, but I want to mention them:
- Row-level locking (possibly too many locks, and a bit more complex, but it works; see the sketch after this list)
- Make SyncResourcesToThirdPartyJob run per resource instead of batch (this could potentially cause many HTTP requests to ThirdParty, which means a performance hit and also a potential issue if your third party has an API rate-limit in place)
- Sidekiq Enterprise unique jobs (would prevent this issue in 99% of cases, but it's just a band-aid and would not prevent the race condition if you run the code from a console or add other triggers)
- A flag to prevent enqueuing the same job twice (adds quite a bit of logic and requires storing the flag somewhere, such as the database, which feels like overkill)
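For the curious, here is a minimal sketch of the row-level locking alternative, assuming PostgreSQL (`FOR UPDATE SKIP LOCKED` lets each worker claim only the rows nobody else is currently holding). The obvious trade-off is that the HTTP call now runs inside a database transaction.

```ruby
# Minimal sketch of the row-level locking alternative, assuming PostgreSQL.
# Each worker locks the rows it is about to sync; rows locked by another
# worker are skipped, so no resource is synced twice.
class SyncResourcesToThirdPartyJob
  include Sidekiq::Job

  def perform
    Resource.transaction do
      resources_to_sync = Resource.where(synced_at: nil)
                                  .lock("FOR UPDATE SKIP LOCKED")
                                  .to_a
      next if resources_to_sync.empty?

      # Trade-off: the HTTP request now happens inside a database transaction.
      ThirdParty.new.sync(resources_to_sync)

      Resource.where(id: resources_to_sync).update_all(synced_at: Time.current)
    end
  end
end
```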