Integrations Engineering Best Practices #1 Cursors

22 Mar 2024 - Jorge Garcia

Last updated: 2024-03-22


This post presents an implementation of the Cursor concept from Data Engineering.

Cursors are meant to be used together with Incremental Synchronization.

A Cursor is an object that keeps track of the last parameters used to request data from an API. It is useful when you need to request data frequently.

Beyond the documentation I shared above, cursors can also help in other ways. Here, I’ll present a common use case: avoiding data loss.

Problem

You need to pull data from an API to create something in your app; in other words, you are building some form of Data Pipeline.

I’m going to present an example of how Cursors can be used with these pipelines.

A simple way to build this pipeline is with cron-style scheduled jobs, using (at least in the Rails world) a background job framework.

Example: let’s say we work on an app that pulls online messages from different platforms. We want to create a Slack integration to sync messages.

# Sidekiq scheduler (sidekiq-scheduler YAML config)
SyncMessagesJob:
  every: "5 minutes"

# Models
class Message < ApplicationRecord
end

class SlackIntegration < ApplicationRecord
end

# Job
class SyncMessagesJob
  include Sidekiq::Job

  def perform
    SlackIntegration.all.each do |slack_integration|
      to = Time.zone.now
      from = to - 5.minutes

      # `from` and `to` define a time range; we request messages created within it.
      messages = slack_integration.get_messages(from: from, to: to)

      # Bulk insert/update; `upsert_all` expects an array of attribute hashes.
      Message.upsert_all(messages)
    end
  end
end
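
For context, get_messages is left as a black box above. Below is a hypothetical sketch of what it could look like against Slack’s conversations.history endpoint (which accepts oldest and latest Unix timestamps); the token, channel_id, external_id, and body attributes are assumptions, and pagination is ignored.

require "net/http"
require "json"

class SlackIntegration < ApplicationRecord
  # Hypothetical sketch: assumes `token` and `channel_id` columns exist on this model.
  def get_messages(from:, to:)
    uri = URI("https://slack.com/api/conversations.history")
    uri.query = URI.encode_www_form(
      channel: channel_id,
      oldest: from.to_f, # Slack expects Unix timestamps
      latest: to.to_f
    )

    request = Net::HTTP::Get.new(uri)
    request["Authorization"] = "Bearer #{token}"

    response = Net::HTTP.start(uri.host, uri.port, use_ssl: true) { |http| http.request(request) }
    payload = JSON.parse(response.body)

    # Map Slack's payload into attribute hashes for the upsert.
    payload.fetch("messages", []).map do |message|
      { external_id: message["ts"], body: message["text"], slack_integration_id: id }
    end
  end
end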

Let’s set aside other implementation details for now.

This looks good enough, but after some analysis you find that data is missing, and you probably need a backfilling process. Why does this happen?

Let’s picture this with timestamps:

  1. The job starts running exactly at 10:05:00
  2. X Integration runs the job with the values from = 10:00:01 and to = 10:05:01
  3. Before the next run (10:05 -> 10:10), other jobs pack the queue, and now it is full.
  4. The next job is scheduled at 10:10, but it takes some time to finally be processed.
  5. X Integration runs the job, with the values from = 10:05:45 and to = 10:10:45
  6. You can see there’s a time range we’re not covering: 10:05:02 -> 10:05:44.
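
As an aside, the backfilling process mentioned above is, in this setup, just a one-off run of the same fetch/upsert logic over the missed window from step 6. A rough sketch (the dates are illustrative):

# One-off backfill over a known gap, e.g. run from a Rails console or a rake task.
from = Time.zone.parse("2024-03-22 10:05:02")
to   = Time.zone.parse("2024-03-22 10:05:44")

SlackIntegration.find_each do |slack_integration|
  messages = slack_integration.get_messages(from: from, to: to)
  Message.upsert_all(messages) if messages.any?
end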

Let’s discuss a solution.

Solution

A Cursor is a simple object that, at a minimum, stores a reference to the integration it belongs to and the next value to request with (here, a timestamp).

The Cursor object keeps track of the next params to use, and the job updates this value when it finishes.

Usage

Models:

class Cursor < ApplicationRecord
  belongs_to :slack_integration # or polymorphic if needed
end

class SlackIntegration < ApplicationRecord
  has_one :cursor # or has_many if you have multiple types of objects you need to integrate with
end
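
For reference, a minimal migration for the cursors table could look like the sketch below. The column names are assumptions based on the models above; next_value is stored as a datetime because this cursor tracks a point in time.

class CreateCursors < ActiveRecord::Migration[7.1]
  def change
    create_table :cursors do |t|
      t.references :slack_integration, null: false, foreign_key: true
      t.datetime :next_value, null: false # the `from` to use on the next run
      t.timestamps
    end
  end
end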

Job:

class SyncMessagesJob
  include Sidekiq::Job

  def perform
    SlackIntegration.eager_load(:cursor).each do |slack_integration|
      from = slack_integration.cursor.next_value # or use find_or_initialize_by if it doesn't exist yet
      to = Time.zone.now

      messages = slack_integration.get_messages(from: from, to: to)

      Message.upsert_all(messages)

      slack_integration.cursor.update(next_value: to)
      # NOTE: the cursor is updated after the upsert. If for some reason the cursor can't be
      #       updated, we will not lose data; at worst, the upsert will be executed again.
      #       Otherwise (updating before the upsert) we could lose data if the upsert fails.
    end
  end
end
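
The comment above mentions find_or_initialize_by for integrations that don’t have a cursor yet. One way to handle that is to create the cursor lazily with a default lookback; the helper name and the 5-minute default below are assumptions that mirror the original schedule.

class SlackIntegration < ApplicationRecord
  has_one :cursor

  # Hypothetical helper: returns the existing cursor, or creates one that starts
  # 5 minutes in the past so the first run behaves like the original `to - 5.minutes` window.
  def cursor_with_default
    cursor || create_cursor(next_value: 5.minutes.ago)
  end
end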

Let’s picture the previous example:

  1. The job starts running exactly at 10:05:00
  2. X Integration runs the job. The cursor.next_value is 10:00:02, and we use it as from. As in the previous example, to = 10:05:01.
  3. Before the next run (10:05 -> 10:10), other jobs pack the queue, and now it is full.
  4. The next job is scheduled at 10:10, but it takes some time to finally be processed.
  5. X Integration runs the job. The cursor.next_value is 10:05:01 (the previous run’s to), we use it as from, and to = 10:10:45.
  6. This time (because we save the previous to as next_value) we don’t lose data; the time range is always covered.

Cons: