Skip to content

Commit

Permalink
Add globals and kwargs to Workflow and Job (#113)
Browse files Browse the repository at this point in the history
* Add globals to Workflow and Job

For shared functionality across Workflow classes that depends on some variable
data, it's useful to have named parameters for the Workflow that are
automatically forwarded to all Job instances.

This commit adds a `globals` keyword arg to the Workflow initializer, which
should be a hash that is then stored in a `globals` hash attribute, is
persisted, and is merged into the `params` sent to each Job instance.

* Add kwargs to Workflow

For workflows that take a larger number of parameters or optional parameters,
it's useful to specify these as keyword arguments rather than positional ones.

This commit adds support to `Workflow#initialize` for kwargs, and stores them
in a new `kwargs` attribute.

---------

Co-authored-by: Noah Harrison <[email protected]>
  • Loading branch information
noahfpf and noahmiller authored Jul 17, 2024
1 parent 6e87f60 commit 68bc23f
Show file tree
Hide file tree
Showing 6 changed files with 87 additions and 16 deletions.
31 changes: 28 additions & 3 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,17 +159,19 @@ Let's assume we are writing a book publishing workflow which needs to know where

```ruby
class PublishBookWorkflow < Gush::Workflow
def configure(url, isbn)
def configure(url, isbn, publish: false)
run FetchBook, params: { url: url }
run PublishBook, params: { book_isbn: isbn }, after: FetchBook
if publish
run PublishBook, params: { book_isbn: isbn }, after: FetchBook
end
end
end
```

and then create your workflow with those arguments:

```ruby
PublishBookWorkflow.create("http://url.com/book.pdf", "978-0470081204")
PublishBookWorkflow.create("http://url.com/book.pdf", "978-0470081204", publish: true)
```

and that's basically it for defining workflows, see below on how to define jobs:
Expand Down Expand Up @@ -256,6 +258,29 @@ flow.status

## Advanced features

### Global parameters for jobs

Workflows can accept a hash of `globals` that are automatically forwarded as parameters to all jobs.

This is useful to have common functionality across workflow and job classes, such as tracking the creator id for all instances:

```ruby
class SimpleWorkflow < Gush::Workflow
def configure(url_to_fetch_from)
run DownloadJob, params: { url: url_to_fetch_from }
end
end

flow = SimpleWorkflow.create('http://foo.com', globals: { creator_id: 123 })
flow.globals
=> {:creator_id=>123}
flow.jobs.first.params
=> {:creator_id=>123, :url=>"http://foo.com"}
```

**Note:** job params with the same key as globals will take precedence over the globals.


### Pipelining

Gush offers a useful tool to pass results of a job to its dependencies, so they can act differently.
Expand Down
6 changes: 5 additions & 1 deletion lib/gush/client.rb
Original file line number Diff line number Diff line change
Expand Up @@ -183,7 +183,11 @@ def find_job_by_klass(workflow_id, job_name)
end

def workflow_from_hash(hash, nodes = [])
flow = hash[:klass].constantize.new(*hash[:arguments])
flow = hash[:klass].constantize.new(
*hash[:arguments],
**hash[:kwargs],
globals: hash[:globals]
)
flow.jobs = []
flow.stopped = hash.fetch(:stopped, false)
flow.id = hash[:id]
Expand Down
18 changes: 11 additions & 7 deletions lib/gush/workflow.rb
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@

module Gush
class Workflow
attr_accessor :id, :jobs, :stopped, :persisted, :arguments
attr_accessor :id, :jobs, :stopped, :persisted, :arguments, :kwargs, :globals

def initialize(*args)
def initialize(*args, globals: nil, **kwargs)
@id = id
@jobs = []
@dependencies = []
@persisted = false
@stopped = false
@arguments = args
@kwargs = kwargs
@globals = globals || {}

setup
end
Expand All @@ -19,8 +21,8 @@ def self.find(id)
Gush::Client.new.find_workflow(id)
end

def self.create(*args)
flow = new(*args)
def self.create(*args, **kwargs)
flow = new(*args, **kwargs)
flow.save
flow
end
Expand All @@ -38,7 +40,7 @@ def save
persist!
end

def configure(*args)
def configure(*args, **kwargs)
end

def mark_as_stopped
Expand Down Expand Up @@ -111,7 +113,7 @@ def run(klass, opts = {})
node = klass.new({
workflow_id: id,
id: client.next_free_job_id(id, klass.to_s),
params: opts.fetch(:params, {}),
params: (@globals || {}).merge(opts.fetch(:params, {})),
queue: opts[:queue],
wait: opts[:wait]
})
Expand Down Expand Up @@ -175,6 +177,8 @@ def to_hash
name: name,
id: id,
arguments: @arguments,
kwargs: @kwargs,
globals: @globals,
total: jobs.count,
finished: jobs.count(&:finished?),
klass: name,
Expand All @@ -200,7 +204,7 @@ def id
private

def setup
configure(*@arguments)
configure(*@arguments, **@kwargs)
resolve_dependencies
end

Expand Down
14 changes: 13 additions & 1 deletion spec/gush/client_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -26,13 +26,25 @@

context "when workflow has parameters" do
it "returns Workflow object" do
expected_workflow = ParameterTestWorkflow.create(true)
expected_workflow = ParameterTestWorkflow.create(true, kwarg: 123)
workflow = client.find_workflow(expected_workflow.id)

expect(workflow.id).to eq(expected_workflow.id)
expect(workflow.arguments).to eq([true])
expect(workflow.kwargs).to eq({ kwarg: 123 })
expect(workflow.jobs.map(&:name)).to match_array(expected_workflow.jobs.map(&:name))
end
end

context "when workflow has globals" do
it "returns Workflow object" do
expected_workflow = TestWorkflow.create(globals: { global1: 'foo' })
workflow = client.find_workflow(expected_workflow.id)

expect(workflow.id).to eq(expected_workflow.id)
expect(workflow.globals[:global1]).to eq('foo')
end
end
end
end

Expand Down
30 changes: 28 additions & 2 deletions spec/gush/workflow_spec.rb
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,23 @@ def configure(*args)
expect_any_instance_of(klass).to receive(:configure).with("arg1", "arg2")
klass.new("arg1", "arg2")
end

it "passes constructor keyword arguments to the method" do
klass = Class.new(Gush::Workflow) do
def configure(*args, **kwargs)
run FetchFirstJob
run PersistFirstJob, after: FetchFirstJob
end
end

expect_any_instance_of(klass).to receive(:configure).with("arg1", "arg2", arg3: 123)
klass.new("arg1", "arg2", arg3: 123)
end

it "accepts globals" do
flow = TestWorkflow.new(globals: { global1: 'foo' })
expect(flow.globals[:global1]).to eq('foo')
end
end

describe "#status" do
Expand Down Expand Up @@ -90,7 +107,7 @@ def configure(*args)
end
end

result = JSON.parse(klass.create("arg1", "arg2").to_json)
result = JSON.parse(klass.create("arg1", "arg2", arg3: 123).to_json)
expected = {
"id" => an_instance_of(String),
"name" => klass.to_s,
Expand All @@ -101,7 +118,9 @@ def configure(*args)
"started_at" => nil,
"finished_at" => nil,
"stopped" => false,
"arguments" => ["arg1", "arg2"]
"arguments" => ["arg1", "arg2"],
"kwargs" => {"arg3" => 123},
"globals" => {}
}
expect(result).to match(expected)
end
Expand All @@ -121,6 +140,13 @@ def configure(*args)
expect(flow.jobs.first.params).to eq ({ something: 1 })
end

it "merges globals with params and passes them to the job, with job param taking precedence" do
flow = Gush::Workflow.new(globals: { something: 2, global1: 123 })
flow.run(Gush::Job, params: { something: 1 })
flow.save
expect(flow.jobs.first.params).to eq ({ something: 1, global1: 123 })
end

it "allows passing wait param to the job" do
flow = Gush::Workflow.new
flow.run(Gush::Job, wait: 5.seconds)
Expand Down
4 changes: 2 additions & 2 deletions spec/spec_helper.rb
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ def configure
end

class ParameterTestWorkflow < Gush::Workflow
def configure(param)
run Prepare if param
def configure(param, kwarg: false)
run Prepare if param || kwarg
end
end

Expand Down

0 comments on commit 68bc23f

Please sign in to comment.