diff --git a/README.md b/README.md index 73f9a74..e99b7d1 100644 --- a/README.md +++ b/README.md @@ -159,9 +159,11 @@ Let's assume we are writing a book publishing workflow which needs to know where ```ruby class PublishBookWorkflow < Gush::Workflow - def configure(url, isbn) + def configure(url, isbn, publish: false) run FetchBook, params: { url: url } - run PublishBook, params: { book_isbn: isbn }, after: FetchBook + if publish + run PublishBook, params: { book_isbn: isbn }, after: FetchBook + end end end ``` @@ -169,7 +171,7 @@ end and then create your workflow with those arguments: ```ruby -PublishBookWorkflow.create("http://url.com/book.pdf", "978-0470081204") +PublishBookWorkflow.create("http://url.com/book.pdf", "978-0470081204", publish: true) ``` and that's basically it for defining workflows, see below on how to define jobs: @@ -256,6 +258,29 @@ flow.status ## Advanced features +### Global parameters for jobs + +Workflows can accept a hash of `globals` that are automatically forwarded as parameters to all jobs. + +This is useful to have common functionality across workflow and job classes, such as tracking the creator id for all instances: + +```ruby +class SimpleWorkflow < Gush::Workflow + def configure(url_to_fetch_from) + run DownloadJob, params: { url: url_to_fetch_from } + end +end + +flow = SimpleWorkflow.create('http://foo.com', globals: { creator_id: 123 }) +flow.globals +=> {:creator_id=>123} +flow.jobs.first.params +=> {:creator_id=>123, :url=>"http://foo.com"} +``` + +**Note:** job params with the same key as globals will take precedence over the globals. + + ### Pipelining Gush offers a useful tool to pass results of a job to its dependencies, so they can act differently. diff --git a/lib/gush/client.rb b/lib/gush/client.rb index 012a858..c8e36d6 100644 --- a/lib/gush/client.rb +++ b/lib/gush/client.rb @@ -183,7 +183,11 @@ def find_job_by_klass(workflow_id, job_name) end def workflow_from_hash(hash, nodes = []) - flow = hash[:klass].constantize.new(*hash[:arguments]) + flow = hash[:klass].constantize.new( + *hash[:arguments], + **hash[:kwargs], + globals: hash[:globals] + ) flow.jobs = [] flow.stopped = hash.fetch(:stopped, false) flow.id = hash[:id] diff --git a/lib/gush/workflow.rb b/lib/gush/workflow.rb index f96bd77..e00c77f 100644 --- a/lib/gush/workflow.rb +++ b/lib/gush/workflow.rb @@ -2,15 +2,17 @@ module Gush class Workflow - attr_accessor :id, :jobs, :stopped, :persisted, :arguments + attr_accessor :id, :jobs, :stopped, :persisted, :arguments, :kwargs, :globals - def initialize(*args) + def initialize(*args, globals: nil, **kwargs) @id = id @jobs = [] @dependencies = [] @persisted = false @stopped = false @arguments = args + @kwargs = kwargs + @globals = globals || {} setup end @@ -19,8 +21,8 @@ def self.find(id) Gush::Client.new.find_workflow(id) end - def self.create(*args) - flow = new(*args) + def self.create(*args, **kwargs) + flow = new(*args, **kwargs) flow.save flow end @@ -38,7 +40,7 @@ def save persist! end - def configure(*args) + def configure(*args, **kwargs) end def mark_as_stopped @@ -111,7 +113,7 @@ def run(klass, opts = {}) node = klass.new({ workflow_id: id, id: client.next_free_job_id(id, klass.to_s), - params: opts.fetch(:params, {}), + params: (@globals || {}).merge(opts.fetch(:params, {})), queue: opts[:queue], wait: opts[:wait] }) @@ -175,6 +177,8 @@ def to_hash name: name, id: id, arguments: @arguments, + kwargs: @kwargs, + globals: @globals, total: jobs.count, finished: jobs.count(&:finished?), klass: name, @@ -200,7 +204,7 @@ def id private def setup - configure(*@arguments) + configure(*@arguments, **@kwargs) resolve_dependencies end diff --git a/spec/gush/client_spec.rb b/spec/gush/client_spec.rb index 9fa01e0..2fe72f4 100644 --- a/spec/gush/client_spec.rb +++ b/spec/gush/client_spec.rb @@ -26,13 +26,25 @@ context "when workflow has parameters" do it "returns Workflow object" do - expected_workflow = ParameterTestWorkflow.create(true) + expected_workflow = ParameterTestWorkflow.create(true, kwarg: 123) workflow = client.find_workflow(expected_workflow.id) expect(workflow.id).to eq(expected_workflow.id) + expect(workflow.arguments).to eq([true]) + expect(workflow.kwargs).to eq({ kwarg: 123 }) expect(workflow.jobs.map(&:name)).to match_array(expected_workflow.jobs.map(&:name)) end end + + context "when workflow has globals" do + it "returns Workflow object" do + expected_workflow = TestWorkflow.create(globals: { global1: 'foo' }) + workflow = client.find_workflow(expected_workflow.id) + + expect(workflow.id).to eq(expected_workflow.id) + expect(workflow.globals[:global1]).to eq('foo') + end + end end end diff --git a/spec/gush/workflow_spec.rb b/spec/gush/workflow_spec.rb index e97e987..720323d 100644 --- a/spec/gush/workflow_spec.rb +++ b/spec/gush/workflow_spec.rb @@ -15,6 +15,23 @@ def configure(*args) expect_any_instance_of(klass).to receive(:configure).with("arg1", "arg2") klass.new("arg1", "arg2") end + + it "passes constructor keyword arguments to the method" do + klass = Class.new(Gush::Workflow) do + def configure(*args, **kwargs) + run FetchFirstJob + run PersistFirstJob, after: FetchFirstJob + end + end + + expect_any_instance_of(klass).to receive(:configure).with("arg1", "arg2", arg3: 123) + klass.new("arg1", "arg2", arg3: 123) + end + + it "accepts globals" do + flow = TestWorkflow.new(globals: { global1: 'foo' }) + expect(flow.globals[:global1]).to eq('foo') + end end describe "#status" do @@ -90,7 +107,7 @@ def configure(*args) end end - result = JSON.parse(klass.create("arg1", "arg2").to_json) + result = JSON.parse(klass.create("arg1", "arg2", arg3: 123).to_json) expected = { "id" => an_instance_of(String), "name" => klass.to_s, @@ -101,7 +118,9 @@ def configure(*args) "started_at" => nil, "finished_at" => nil, "stopped" => false, - "arguments" => ["arg1", "arg2"] + "arguments" => ["arg1", "arg2"], + "kwargs" => {"arg3" => 123}, + "globals" => {} } expect(result).to match(expected) end @@ -121,6 +140,13 @@ def configure(*args) expect(flow.jobs.first.params).to eq ({ something: 1 }) end + it "merges globals with params and passes them to the job, with job param taking precedence" do + flow = Gush::Workflow.new(globals: { something: 2, global1: 123 }) + flow.run(Gush::Job, params: { something: 1 }) + flow.save + expect(flow.jobs.first.params).to eq ({ something: 1, global1: 123 }) + end + it "allows passing wait param to the job" do flow = Gush::Workflow.new flow.run(Gush::Job, wait: 5.seconds) diff --git a/spec/spec_helper.rb b/spec/spec_helper.rb index f64a892..2e75290 100644 --- a/spec/spec_helper.rb +++ b/spec/spec_helper.rb @@ -31,8 +31,8 @@ def configure end class ParameterTestWorkflow < Gush::Workflow - def configure(param) - run Prepare if param + def configure(param, kwarg: false) + run Prepare if param || kwarg end end