Skip to Content Skip to Search

Active Job Continuation Step

Represents a step within a continuable job.

When a step is completed, it is recorded in the job’s continuation state. If the job is interrupted, it will be resumed from after the last completed step.

Steps also have an optional cursor that can be used to track progress within the step. If a job is interrupted during a step, the cursor will be saved and passed back when the job is resumed.

It is the responsibility of the code in the step to use the cursor correctly to resume from where it left off.

Methods
A
C
D
N
R
S
T

Attributes

[R] cursor

The cursor for the step.

[R] name

The name of the step.

Class Public methods

new(name, cursor, job:, resumed:)

# File activejob/lib/active_job/continuation/step.rb, line 25
def initialize(name, cursor, job:, resumed:)
  @name = name.to_sym
  @initial_cursor = cursor
  @cursor = cursor
  @resumed = resumed
  @job = job
end

Instance Public methods

advance!(from: nil)

Advance the cursor from the current or supplied value

The cursor will be advanced by calling the succ method on the cursor. An UnadvanceableCursorError error will be raised if the cursor does not implement succ.

# File activejob/lib/active_job/continuation/step.rb, line 49
def advance!(from: nil)
  from = cursor if from.nil?

  begin
    to = from.succ
  rescue NoMethodError
    raise UnadvanceableCursorError, "Cursor class '#{from.class}' does not implement 'succ'"
  end

  set! to
end

advanced?()

Has the cursor been advanced during this job execution?

# File activejob/lib/active_job/continuation/step.rb, line 67
def advanced?
  initial_cursor != cursor
end

checkpoint!()

Check if the job should be interrupted, and if so raise an Interrupt exception. The job will be requeued for retry.

# File activejob/lib/active_job/continuation/step.rb, line 35
def checkpoint!
  job.checkpoint!
end

description()

# File activejob/lib/active_job/continuation/step.rb, line 75
def description
  "at '#{name}', cursor '#{cursor.inspect}'"
end

resumed?()

Has this step been resumed from a previous job execution?

# File activejob/lib/active_job/continuation/step.rb, line 62
def resumed?
  @resumed
end

set!(cursor)

Set the cursor and interrupt the job if necessary.

# File activejob/lib/active_job/continuation/step.rb, line 40
def set!(cursor)
  @cursor = cursor
  checkpoint!
end

to_a()

# File activejob/lib/active_job/continuation/step.rb, line 71
def to_a
  [ name.to_s, cursor ]
end