Prevent race conditions with Advisory Locking

We recently had a problem with multiple workers running in parallel to sync resources to a third-party application.

Running in parallel wouldn't be a problem in itself, but we synced in batches to avoid triggering too many HTTP requests, since a sync had to happen every time a resource was created in our database. The problem with that approach is that when multiple resources were added, there was a race condition between the jobs enqueued to sync them to the external service.

Without further explanation, let's see some code:

# models/resource.rb

class Resource < ApplicationRecord
  after_create_commit :enqueue_sync_to_third_party

  def enqueue_sync_to_third_party
    SyncResourcesToThirdPartyJob.perform_async
  end
end

# jobs/sync_resources_to_third_party_job.rb
class SyncResourcesToThirdPartyJob
  include Sidekiq::Job

  def perform
    resources_to_sync = Resource.where(synced_at: nil)

    # triggers HTTP request with new resources
    ThirdParty.new.sync(resources_to_sync)

    resources_to_sync.update!(synced_at: Time.current)
  end
end

As you can see, when a Resource is created, we enqueue a job to sync all fresh resources to the ThirdParty.

You might be thinking: why not make the job sync just the created resource instead of all pending ones? Again, we want to sync in batches to avoid triggering too many HTTP requests (not only for performance reasons, but also because of the API rate limits in place).

Have you seen the problem already?

The code above worked great for a couple of days, but then we started seeing some resources being duplicated in the ThirdParty.

Think about what would happen when multiple resources were created in a row. We would enqueue multiple jobs to sync, and if they ran in parallel, we would end up with a Resource being synced multiple times.

Let's assume we have two Sidekiq workers and we create two resources, A and B, in a row.

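| Worker #1                                          | Worker #2                                          | ThirdParty state |
|----------------------------------------------------|----------------------------------------------------|------------------|
| Resource.where(synced_at: nil) => A and B          |                                                    | []               |
|                                                    | Resource.where(synced_at: nil) => A and B          | []               |
| Add Resource A and B to ThirdParty                 |                                                    | [A, B]           |
|                                                    | Add Resource A and B to ThirdParty                 | [A, B, A, B] 🚨  |
| resources_to_sync.update!(synced_at: Time.current) |                                                    | [A, B, A, B] 🚨  |
|                                                    | resources_to_sync.update!(synced_at: Time.current) | [A, B, A, B] 🚨  |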

As you can see, when we enqueue multiple jobs to sync the Resource, we have a race condition that can end up with resources being duplicated in the ThirdParty.
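
To make that interleaving concrete, here is a small simulation in plain Ruby with in-memory stand-ins. Everything in it (FakeResource, FakeThirdParty, Barrier, run_unsafe_job) is hypothetical and only mirrors the shape of the job. The barrier forces both threads to read before either one writes, which in production happens only by bad timing.

```ruby
# In-memory stand-ins for the model and the external API (hypothetical names).
FakeResource = Struct.new(:name, :synced_at)

class FakeThirdParty
  attr_reader :received

  def initialize
    @received = []
    @mutex = Mutex.new
  end

  # Records the names it was asked to sync, standing in for the HTTP call.
  def sync(resources)
    @mutex.synchronize { @received.concat(resources.map(&:name)) }
  end
end

# Makes each caller wait until `count` threads have arrived.
class Barrier
  def initialize(count)
    @remaining = count
    @mutex = Mutex.new
    @all_arrived = ConditionVariable.new
  end

  def wait
    @mutex.synchronize do
      @remaining -= 1
      @all_arrived.broadcast if @remaining.zero?
      @all_arrived.wait(@mutex) until @remaining.zero?
    end
  end
end

# Same three steps as the job: read pending, sync, mark as synced.
def run_unsafe_job(resources, third_party, barrier)
  pending = resources.select { |resource| resource.synced_at.nil? }
  barrier.wait # both workers have now read the same pending list
  third_party.sync(pending)
  pending.each { |resource| resource.synced_at = Time.now }
end

resources = [FakeResource.new("A"), FakeResource.new("B")]
third_party = FakeThirdParty.new
barrier = Barrier.new(2)

workers = Array.new(2) do
  Thread.new { run_unsafe_job(resources, third_party, barrier) }
end
workers.each(&:join)

p third_party.received # => ["A", "B", "A", "B"]
```

Both workers saw A and B as pending, so the fake third party received each resource twice, matching the last rows of the table.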

What is Advisory Locking?

An advisory lock is a cooperative lock. It is provided by the database engine (PostgreSQL, MySQL, etc.) and is identified only by a key that has a meaning for its consumers. The key is a string in MySQL and an integer in PostgreSQL, which libraries can derive from a string. Because the lock is not attached to any table or row, it cannot prevent another consumer from accessing the same data; it only blocks consumers that ask for the same key.

This is what the official PostgreSQL docs say about advisory locks:

[...] provides a means for creating locks that have application-defined meanings. These are called advisory locks, because the system does not enforce their use — it is up to the application to use them correctly.

That means one consumer can acquire an advisory lock foo, and if another consumer asks for foo before it is released, that consumer will block until the release happens. However, if a consumer asks for bar instead, it will proceed immediately because the advisory lock key is different.

This is very useful because with advisory locks you can prevent the same code from running simultaneously.
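
The foo/bar behaviour can be mimicked in plain Ruby. The sketch below is only an in-process analogy, not how the database implements it: KeyedLocks and the consumer names are hypothetical, and it returns false instead of waiting, which is closer to PostgreSQL's pg_try_advisory_lock than to the blocking pg_advisory_lock.

```ruby
# In-process analogy of an advisory lock registry (hypothetical class).
# The lock is attached to a key, not to any data the consumers touch.
class KeyedLocks
  def initialize
    @guard = Mutex.new # protects the @held hash itself
    @held = {}         # key => owner currently holding it
  end

  # Returns true if `owner` got the lock for `key`, false if it is taken.
  def try_acquire(key, owner)
    @guard.synchronize do
      return false if @held.key?(key)

      @held[key] = owner
      true
    end
  end

  # Only the current holder can release the key.
  def release(key, owner)
    @guard.synchronize do
      @held.delete(key) if @held[key] == owner
    end
  end
end

locks = KeyedLocks.new

p locks.try_acquire("foo", :consumer_1) # => true
p locks.try_acquire("foo", :consumer_2) # => false, foo is already held
p locks.try_acquire("bar", :consumer_2) # => true, different key

locks.release("foo", :consumer_1)
p locks.try_acquire("foo", :consumer_2) # => true, foo was released
```

Nothing stops a consumer from skipping try_acquire and touching the data anyway, which is exactly what "advisory" means.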

How to use Advisory Locking in Ruby?

Even though we can implement advisory locking with raw SQL, I like using the with_advisory_lock gem, which already handles that for you so you can focus on the business side of things.

# Gemfile

gem 'with_advisory_lock'
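
For comparison, this is roughly what the raw SQL route involves on PostgreSQL. It is a minimal sketch under assumptions: with_pg_advisory_lock is a hypothetical helper, deriving the integer key with Zlib.crc32 is an illustrative choice, and connection is anything that responds to execute, such as ActiveRecord::Base.connection. pg_advisory_lock and pg_advisory_unlock are the PostgreSQL functions that take and release a session-level lock.

```ruby
require "zlib"

# Hypothetical helper: runs the block while holding a PostgreSQL
# session-level advisory lock derived from `name`.
def with_pg_advisory_lock(connection, name)
  # PostgreSQL advisory lock keys are integers, so turn the name into one.
  # crc32 is an illustrative choice, not necessarily what any gem does.
  key = Zlib.crc32(name)

  # Blocks until no other session holds the same key.
  connection.execute("SELECT pg_advisory_lock(#{key})")
  begin
    yield
  ensure
    # Must run on the same connection (session) that took the lock.
    connection.execute("SELECT pg_advisory_unlock(#{key})")
  end
end

# Usage inside a Rails app (assumes a PostgreSQL connection):
#
#   with_pg_advisory_lock(ActiveRecord::Base.connection, "third_party_sync_lock") do
#     # sync resources here
#   end
```

The gem takes care of details this sketch leaves out, such as supporting other databases and lock timeouts.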

We need to make sure that when a sync job instance starts, no other instance runs until the first one finishes the sync and updates the synced_at field.

# jobs/sync_resources_to_third_party_job.rb
class SyncResourcesToThirdPartyJob
  include Sidekiq::Job

  def perform
    Resource.with_advisory_lock("third_party_sync_lock") do
      resources_to_sync = Resource.where(synced_at: nil)

      ThirdParty.new.sync(resources_to_sync)

      resources_to_sync.update!(synced_at: Time.current)
    end
  end
end

This way, one instance acquires the lock with the key third_party_sync_lock, and when another worker runs the same code, it cannot proceed until the lock is released. That prevents the second worker from fetching resources that are already being synced.

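| Worker #1                                          | Worker #2                                          | ThirdParty state |
|----------------------------------------------------|----------------------------------------------------|------------------|
| -> LOCK <-                                         |                                                    | []               |
|                                                    | -> WAITING <-                                      | []               |
| Resource.where(synced_at: nil) => A and B          |                                                    | []               |
| Add Resource A and B to ThirdParty                 |                                                    | [A, B]           |
| resources_to_sync.update!(synced_at: Time.current) |                                                    | [A, B]           |
| -> UNLOCK <-                                       |                                                    | [A, B]           |
|                                                    | -> LOCK <-                                         | [A, B]           |
|                                                    | Resource.where(synced_at: nil) => EMPTY            | [A, B]           |
|                                                    | -> UNLOCK <-                                       | [A, B] ✅        |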

How to test it?

# spec/jobs/sync_resources_to_third_party_job_spec.rb

RSpec.describe SyncResourcesToThirdPartyJob do
  it "does not sync same resource twice if job is triggered multiple times" do
    non_synced_resources = [create(:resource)]

    third_party_api = instance_double(ThirdParty, sync: true)

    allow(ThirdParty).to receive(:new).and_return(third_party_api)

    threads = Array.new(2).map do
      Thread.new do
        described_class.new.perform
      end
    end

    threads.each(&:join)

    # Should receive only once, because the second time there
    # would be no resources to sync.
    expect(third_party_api).to have_received(:sync).with(
      non_synced_resources
    ).once
  end
end

Alternatives to Advisory Locking

I won't get into the details of each alternative, but want to mention them:

  • Row-level locking (possibly too many locks and a bit more complex, but it works)
  • Make SyncResourcesToThirdPartyJob run per resource instead of batch (this could potentially cause many HTTP requests to ThirdParty, which means a performance hit and also a potential issue if your third party has an API rate-limit in place)
  • Sidekiq Enterprise unique jobs (would prevent this issue for 99% of cases, but it's just a band-aid and would not prevent the race condition if executing code from terminal or adding other triggers)
  • A flag to prevent enqueuing the same job twice (adds quite a bit of logic, and the flag would need to be stored somewhere, such as the database, which feels like overkill)